Skip to content
Snippets Groups Projects
slave.py 14.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • # ##### BEGIN GPL LICENSE BLOCK #####
    #
    #  This program is free software; you can redistribute it and/or
    #  modify it under the terms of the GNU General Public License
    #  as published by the Free Software Foundation; either version 2
    #  of the License, or (at your option) any later version.
    #
    #  This program is distributed in the hope that it will be useful,
    #  but WITHOUT ANY WARRANTY; without even the implied warranty of
    #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    #  GNU General Public License for more details.
    #
    #  You should have received a copy of the GNU General Public License
    #  along with this program; if not, write to the Free Software Foundation,
    #  Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
    #
    # ##### END GPL LICENSE BLOCK #####
    
    import sys, os, platform, shutil
    
    Campbell Barton's avatar
    Campbell Barton committed
    import http, http.client, http.server
    
    import subprocess, time
    import json
    
    import bpy
    
    from netrender.utils import *
    import netrender.model
    import netrender.repath
    import netrender.thumbnail as thumbnail
    
    # Path of the running executable (argv[0]); render_slave launches its
    # render subprocesses with it.
    BLENDER_PATH = sys.argv[0]

    # Minimum number of seconds between two log uploads / cancel checks while
    # a render subprocess is running.
    CANCEL_POLL_SPEED = 2

    # Values handed to BreakableIncrementedSleep by render_slave.
    MAX_TIMEOUT = 10
    INCREMENT_TIMEOUT = 1

    # Number of reconnection attempts made when the first connection to the
    # master fails.
    MAX_CONNECT_TRY = 10

    # platform.system() is guarded against UnicodeDecodeError; sys.platform is
    # used as the fallback system identifier in that case.
    try:
        system = platform.system()
    except UnicodeDecodeError:
        system = sys.platform
    
    # Error mode is only available on Win2k or higher, that's version 5.
    # The major version is compared numerically: comparing the version *string*
    # against '5' would reject "10.0.x" (Windows 10 and later) because '1' < '5'.
    if system in ('Windows', 'win32') and sys.getwindowsversion().major >= 5:
        import ctypes

        def SetErrorMode():
            """Add flag 0x0002 to the process error mode and return the previous mode.

            0x0002 is SEM_NOGPFAULTERRORBOX in the Win32 API. The returned value
            is meant to be handed back to RestoreErrorMode().
            """
            # SetErrorMode returns the previous mode, so the first call is used
            # to read it; the second call sets the old mode plus the new flag.
            val = ctypes.windll.kernel32.SetErrorMode(0x0002)
            ctypes.windll.kernel32.SetErrorMode(val | 0x0002)
            return val

        def RestoreErrorMode(val):
            """Set the process error mode back to `val` (as returned by SetErrorMode)."""
            ctypes.windll.kernel32.SetErrorMode(val)
    else:
        def SetErrorMode():
            """No-op stand-in for systems without an error mode; always returns 0."""
            return 0

        def RestoreErrorMode(val):
            """No-op stand-in for systems without an error mode."""
            pass
    
    def clearSlave(path):
        """Recursively delete the directory tree rooted at `path`."""
        shutil.rmtree(path)
    
    def slave_Info():
        """Return a RenderSlave describing this machine.

        The slave name is the node name reported by platform.uname(); its stats
        string is the system name, release, machine and processor separated by
        single spaces.
        """
        sysname, nodename, release, _version, machine, processor = platform.uname()

        info = netrender.model.RenderSlave()
        info.name = nodename
        info.stats = " ".join((sysname, release, machine, processor))
        return info
    
    def testCancel(conn, job_id, frame_number):
        """Ask the master for the status of a job frame and report whether it was cancelled.

        Sends a HEAD request to "/status" for the given job id and frame number
        and returns True when the answer is NO_CONTENT, False otherwise.
        """
        status_headers = {"job-id":job_id, "job-frame": str(frame_number)}

        with ConnectionContext():
            conn.request("HEAD", "/status", headers=status_headers)

        # canceled if job isn't found anymore
        return responseStatus(conn) == http.client.NO_CONTENT
    
    
    Martin Poirier's avatar
    Martin Poirier committed
    def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path=None):
        """Make sure a job file exists locally, downloading it from the master if needed.

        conn       -- open HTTP connection to the master.
        job_id     -- id of the job the file belongs to.
        slave_id   -- id of this slave, sent in the "slave-id" header.
        rfile      -- job file entry; its filepath, signature and index are read
                      and its filepath is rewritten to the local path on success.
        JOB_PREFIX -- local directory of the job.
        main_path  -- optional directory of the job's main file, forwarded to
                      prefixPath().

        Returns the local path of the file, or None when the master did not
        answer the download request with OK.
        """
        job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path)

        found = os.path.exists(job_full_path)

        if found and rfile.signature is not None:
            # An existing file only counts if its hash matches the expected one.
            found = hashFile(job_full_path) == rfile.signature

            if not found:
                print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path))
                # Drop the stale copy; the forced path is computed below.
                os.remove(job_full_path)

        if not found:
            # Force prefix path if not found
            job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force=True)

            # Download into a temporary file first and move it into place once
            # the transfer is complete.
            temp_path = os.path.join(JOB_PREFIX, "slave.temp")

            with ConnectionContext():
                conn.request("GET", fileURL(job_id, rfile.index), headers={"slave-id":slave_id})

            response = conn.getresponse()

            if response.status != http.client.OK:
                # Drain the body so the connection can be used for the next request.
                response.read()
                return None # file for job not returned by server, need to return an error code to server

            with open(temp_path, "wb") as f:
                shutil.copyfileobj(response, f)

            os.renames(temp_path, job_full_path)

        rfile.filepath = job_full_path

        return job_full_path
    
    def breakable_timeout(timeout, test_break=None):
        """Sleep for up to `timeout` seconds, one second at a time, stopping early on a break.

        timeout    -- maximum number of one-second sleeps.
        test_break -- optional zero-argument callable polled after every second;
                      a true result ends the wait early.

        The original implementation polled `engine.test_break()`, but `engine`
        is not a parameter and no module-level `engine` is visible in this file.
        When `test_break` is omitted, a module-level `engine` is still used if
        one exists; otherwise the full timeout is slept.
        """
        if test_break is None:
            fallback_engine = globals().get("engine")
            if fallback_engine is not None:
                test_break = fallback_engine.test_break

        for _ in range(timeout):
            time.sleep(1)
            if test_break is not None and test_break():
                break
    
    def render_slave(engine, netsettings, threads):
        """Run the network render slave loop until the render engine requests a break.

        Connects to the master at netsettings.server_address /
        netsettings.server_port, registers this machine, then repeatedly asks
        for a job, makes its files available locally, runs it in a subprocess
        while forwarding the subprocess output to the master, and finally
        reports a result (or an error) for every frame of the job.

        engine      -- render engine object; test_break() is polled to stop and
                       update_stats() is used for status messages.
        netsettings -- network render settings; path, server_address,
                       server_port, use_slave_output_log, use_slave_thumb and
                       use_slave_clear are read.
        threads     -- value passed to the Blender subprocess "-t" option.

        Returns None. Returns early when the working path does not exist or is
        not writable, or when no connection to the master could be made.
        """
        bisleep = BreakableIncrementedSleep(INCREMENT_TIMEOUT, 1, MAX_TIMEOUT, engine.test_break)

        engine.update_stats("", "Network render node initiation")

        slave_path = bpy.path.abspath(netsettings.path)

        if not os.path.exists(slave_path):
            print("Slave working path ( %s ) doesn't exist" % netsettings.path)
            return

        if not os.access(slave_path, os.W_OK):
            print("Slave working path ( %s ) is not writable" % netsettings.path)
            return

        conn = clientConnection(netsettings.server_address, netsettings.server_port)

        if not conn:
            print("Connection failed, will try connecting again at most %i times" % MAX_CONNECT_TRY)
            bisleep.reset()

            for i in range(MAX_CONNECT_TRY):
                bisleep.sleep()

                conn = clientConnection(netsettings.server_address, netsettings.server_port)

                if conn or engine.test_break():
                    break

                print("Retry %i failed, waiting %is before retrying" % (i + 1, bisleep.current))

        if not conn:
            return

        # register this slave with the master
        with ConnectionContext():
            conn.request("POST", "/slave", json.dumps(slave_Info().serialize()))

        response = conn.getresponse()
        response.read()

        slave_id = response.getheader("slave-id")

        NODE_PREFIX = os.path.join(slave_path, "slave_" + slave_id)

        if not os.path.exists(NODE_PREFIX):
            os.mkdir(NODE_PREFIX)

        engine.update_stats("", "Network render connected to master, waiting for jobs")

        while not engine.test_break():
            with ConnectionContext():
                conn.request("GET", "/job", headers={"slave-id":slave_id})

            response = conn.getresponse()

            if response.status != http.client.OK:
                # No job handed out: drain the answer so the connection can be
                # reused, then wait before asking again.
                response.read()
                bisleep.sleep()
                continue

            bisleep.reset()

            job = netrender.model.RenderJob.materialize(json.loads(str(response.read(), encoding='utf8')))
            engine.update_stats("", "Network render processing job from master")

            JOB_PREFIX = os.path.join(NODE_PREFIX, "job_" + job.id)
            if not os.path.exists(JOB_PREFIX):
                os.mkdir(JOB_PREFIX)

            # set tempdir for fsaa temp files
            # have to set environ var because render is done in a subprocess and that's the easiest way to propagate the setting
            os.environ["TMP"] = JOB_PREFIX

            if job.type == netrender.model.JOB_BLENDER:
                job_path = job.files[0].filepath # path of main file
                main_path, main_file = os.path.split(job_path)

                job_full_path = testFile(conn, job.id, slave_id, job.files[0], JOB_PREFIX)
                print("Fullpath", job_full_path)
                print("File:", main_file, "and %i other files" % (len(job.files) - 1,))

                for rfile in job.files[1:]:
                    testFile(conn, job.id, slave_id, rfile, JOB_PREFIX, main_path)
                    print("\t", rfile.filepath)

                netrender.repath.update(job)

                engine.update_stats("", "Render File " + main_file + " for job " + job.id)

            elif job.type == netrender.model.JOB_VCS:
                if not job.version_info:
                    # Need to return an error to server, incorrect job type
                    # NOTE(review): nothing is reported yet, and the code below
                    # still dereferences job.version_info.
                    pass

                job_path = job.files[0].filepath # path of main file
                main_path, main_file = os.path.split(job_path)

                job.version_info.update()

                # For VCS jobs, file path is relative to the working copy path
                job_full_path = os.path.join(job.version_info.wpath, job_path)

                engine.update_stats("", "Render File " + main_file + " for job " + job.id)

            # announce log to master
            logfile = netrender.model.LogFile(job.id, slave_id, [frame.number for frame in job.frames])

            with ConnectionContext():
                conn.request("POST", "/log", bytes(json.dumps(logfile.serialize()), encoding='utf8'))

            response = conn.getresponse()
            response.read()

            first_frame = job.frames[0].number

            # start render
            start_t = time.time()

            # NOTE(review): a job that neither renders with Blender nor is a
            # JOB_PROCESS starts no subprocess here, leaving `process` unset
            # (or pointing at the previous job's process).
            if job.rendersWithBlender():
                frame_args = []

                for frame in job.frames:
                    print("frame", frame.number)
                    frame_args += ["-f", str(frame.number)]

                val = SetErrorMode()
                try:
                    process = subprocess.Popen([BLENDER_PATH, "-b", "-noaudio", job_full_path, "-t", str(threads), "-o", os.path.join(JOB_PREFIX, "######"), "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
                finally:
                    # restore the error mode even if the process could not be started
                    RestoreErrorMode(val)
            elif job.type == netrender.model.JOB_PROCESS:
                command = job.frames[0].command
                val = SetErrorMode()
                try:
                    process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
                finally:
                    RestoreErrorMode(val)

            headers = {"slave-id":slave_id}

            cancelled = False
            stdout = bytes()
            run_t = time.time()
            while not cancelled and process.poll() is None:
                stdout += process.stdout.read(1024)
                current_t = time.time()
                cancelled = engine.test_break()
                if current_t - run_t > CANCEL_POLL_SPEED:

                    # update logs if needed
                    if stdout:
                        # (only need to update on one frame, they are linked
                        with ConnectionContext():
                            conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)

                        responseStatus(conn)

                        # Also output on console
                        if netsettings.use_slave_output_log:
                            # a chunk boundary may split a multi-byte character,
                            # so undecodable bytes are replaced instead of raising
                            print(str(stdout, encoding='utf8', errors='replace'), end="")

                        stdout = bytes()

                    run_t = current_t
                    if testCancel(conn, job.id, first_frame):
                        cancelled = True

            if job.type == netrender.model.JOB_BLENDER:
                netrender.repath.reset(job)

            if cancelled:
                # kill process if needed
                # This happens before draining the pipe: read() only returns at
                # end of file, i.e. it would wait for the render to finish.
                if process.poll() is None:
                    try:
                        process.terminate()
                    except OSError:
                        pass
                continue # to next frame

            # read leftovers if needed
            stdout += process.stdout.read()

            # flush the rest of the logs
            if stdout:
                # Also output on console
                if netsettings.use_slave_output_log:
                    print(str(stdout, encoding='utf8', errors='replace'), end="")

                # (only need to update on one frame, they are linked
                with ConnectionContext():
                    conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)

                if responseStatus(conn) == http.client.NO_CONTENT:
                    continue

            total_t = time.time() - start_t

            avg_t = total_t / len(job.frames)

            status = process.returncode

            print("status", status)

            headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)}

            # In the per-frame loops below the answer of the master is read
            # with responseStatus() but not acted upon; the remaining frames
            # are always reported.
            if status == 0: # non zero status is error
                headers["job-result"] = str(DONE)
                for frame in job.frames:
                    headers["job-frame"] = str(frame.number)
                    if job.hasRenderResult():
                        # send image back to server

                        filename = os.path.join(JOB_PREFIX, "%06d.exr" % frame.number)

                        # thumbnail first
                        if netsettings.use_slave_thumb:
                            thumbname = thumbnail.generate(filename)

                            if thumbname:
                                with open(thumbname, 'rb') as f:
                                    with ConnectionContext():
                                        conn.request("PUT", "/thumb", f, headers=headers)

                                responseStatus(conn)

                        with open(filename, 'rb') as f:
                            with ConnectionContext():
                                conn.request("PUT", "/render", f, headers=headers)

                        responseStatus(conn)

                    elif job.type == netrender.model.JOB_PROCESS:
                        with ConnectionContext():
                            conn.request("PUT", "/render", headers=headers)

                        responseStatus(conn)
            else:
                headers["job-result"] = str(ERROR)
                for frame in job.frames:
                    headers["job-frame"] = str(frame.number)
                    # send error result back to server
                    with ConnectionContext():
                        conn.request("PUT", "/render", headers=headers)

                    responseStatus(conn)

            engine.update_stats("", "Network render connected to master, waiting for jobs")

        conn.close()

        if netsettings.use_slave_clear:
            clearSlave(NODE_PREFIX)
    
    # Nothing is done when this module is executed directly as a script.
    if __name__ == "__main__":
        pass