Skip to content
Snippets Groups Projects
slave.py 14.1 KiB
Newer Older
# ##### BEGIN GPL LICENSE BLOCK #####
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License
#  as published by the Free Software Foundation; either version 2
#  of the License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software Foundation,
#  Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####

import sys, os, platform, shutil
Campbell Barton's avatar
Campbell Barton committed
import http, http.client, http.server
import subprocess, time
import json

import bpy

from netrender.utils import *
import netrender.model
import netrender.repath
import netrender.thumbnail as thumbnail

# Executable launched (with "-b" and render arguments) for Blender jobs:
# the argv[0] of the process running this module.
BLENDER_PATH = sys.argv[0]

# Seconds that must elapse between log uploads / cancel checks while a
# render subprocess is running.
CANCEL_POLL_SPEED = 2

# Number of extra connection attempts made when the first connection to the
# master fails.
MAX_CONNECT_TRY = 10

# Tuning for BreakableIncrementedSleep (used both for connection retries and
# for idling when no job is available). NOTE(review): presumably the step
# increment and the upper bound of the sleep, in seconds — confirm against
# netrender.utils.
INCREMENT_TIMEOUT = 1
MAX_TIMEOUT = 10
try:
    system = platform.system()
except UnicodeDecodeError:
    # platform.system() can fail to decode here; fall back to the
    # interpreter's platform tag (e.g. 'win32').
    system = sys.platform


def windowsMajorVersion(version=None):
    """Return the leading (major) number of a Windows version string.

    ``version`` defaults to ``platform.version()``. Windows version strings
    look like '5.1.2600' or '10.0.19041'. They must be compared numerically,
    because a plain string comparison ranks '10.0.19041' below '5'.

    Returns 0 when the major component is not an integer.
    """
    if version is None:
        version = platform.version()
    try:
        return int(version.split(".", 1)[0])
    except ValueError:
        return 0


# Error mode is only available on Win2k or higher, that's major version 5.
if system in ('Windows', 'win32') and windowsMajorVersion() >= 5:
    import ctypes

    def SetErrorMode():
        """Add flag 0x0002 (SEM_NOGPFAULTERRORBOX) to the process error mode.

        Returns the previous error mode so it can be handed back to
        RestoreErrorMode() once the child process has been started.
        """
        # SetErrorMode returns the previous mode; re-apply it OR'ed with the
        # flag so existing bits are kept.
        val = ctypes.windll.kernel32.SetErrorMode(0x0002)
        ctypes.windll.kernel32.SetErrorMode(val | 0x0002)
        return val

    def RestoreErrorMode(val):
        """Restore an error mode previously returned by SetErrorMode()."""
        ctypes.windll.kernel32.SetErrorMode(val)
else:
    def SetErrorMode():
        """No-op on platforms without SetErrorMode; always returns 0."""
        return 0

    def RestoreErrorMode(val):
        """No-op counterpart of SetErrorMode() on other platforms."""
        pass

def clearSlave(path):
    """Delete the directory tree rooted at ``path``.

    Any error raised by the removal (for example a missing directory)
    propagates to the caller.
    """
    remove_tree = shutil.rmtree
    remove_tree(path, ignore_errors=False)

def slave_Info():
    """Build a RenderSlave describing the local machine.

    The slave is named after the host's network node name. Its ``stats``
    string is "<system> <release> <machine> <processor>", as reported by
    platform.uname().
    """
    host = platform.uname()
    stats_parts = (host.system, host.release, host.machine, host.processor)

    slave = netrender.model.RenderSlave()
    slave.name = host.node
    slave.stats = " ".join(stats_parts)
    return slave

def testCancel(conn, job_id, frame_number):
    """Ask the master whether a job has been cancelled.

    Sends a HEAD request to /status for ``job_id`` / ``frame_number``. A
    204 NO_CONTENT answer means the master no longer knows the job, which the
    slave treats as a cancellation.

    Returns True when the job should be considered cancelled, False otherwise.
    """
    with ConnectionContext():
        conn.request("HEAD", "/status", headers={"job-id": job_id, "job-frame": str(frame_number)})

    # canceled if job isn't found anymore
    return responseStatus(conn) == http.client.NO_CONTENT

def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path=None):
    """Make sure a job file exists locally, downloading it from the master if needed.

    The file is first looked up at its prefixed path. If it exists but its
    hashFile() result differs from ``rfile.signature``, the stale copy is
    deleted. When no valid local copy exists, the file is fetched from the
    master into a temporary file inside JOB_PREFIX. It is then moved to its
    forced prefixed path.

    On success ``rfile.filepath`` is updated to the local path, and that path
    is returned. If the master does not answer 200 OK, None is returned and
    ``rfile`` is left unchanged.
    """
    job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path)

    found = os.path.exists(job_full_path)

    if found and rfile.signature is not None:
        found = hashFile(job_full_path) == rfile.signature

        if not found:
            print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path))
            os.remove(job_full_path)

    if not found:
        # Force prefix path if not found
        job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force=True)
        temp_path = os.path.join(JOB_PREFIX, "slave.temp")

        with ConnectionContext():
            conn.request("GET", fileURL(job_id, rfile.index), headers={"slave-id": slave_id})
        response = conn.getresponse()

        if response.status != http.client.OK:
            # http.client requires the whole response to be read before the
            # connection can be used for the next request, so drain it.
            response.read()
            return None # file for job not returned by server, need to return an error code to server

        with open(temp_path, "wb") as f:
            buf = response.read(1024)
            while buf:
                f.write(buf)
                buf = response.read(1024)

        os.renames(temp_path, job_full_path)

    rfile.filepath = job_full_path

    return job_full_path

def breakable_timeout(timeout):
    for i in range(timeout):
        time.sleep(1)
        if engine.test_break():
            break

def render_slave(engine, netsettings, threads):
    """Run the network render slave loop until the user breaks.

    Steps:
    - Connect to the master at ``netsettings.server_address`` /
      ``server_port``, retrying up to MAX_CONNECT_TRY times.
    - Register this machine (POST /slave).
    - Repeatedly request a job (GET /job) and fetch its files.
    - Run the job in a subprocess and send the process output to the master
      as the job log.
    - Upload the results, or an error status, for every frame of the job.

    Returns early, without connecting, when the working path is missing or
    not writable. When ``netsettings.use_slave_clear`` is set, the slave's
    working directory is deleted once the loop ends.

    :arg engine: render engine. ``test_break()`` is polled to stop the loop,
        and ``update_stats()`` reports progress.
    :arg netsettings: network settings providing the server address/port, the
        working ``path`` and the ``use_slave_*`` options.
    :arg threads: thread count passed to Blender with ``-t``.
    """
    bisleep = BreakableIncrementedSleep(INCREMENT_TIMEOUT, 1, MAX_TIMEOUT, engine.test_break)

    engine.update_stats("", "Network render node initiation")

    slave_path = bpy.path.abspath(netsettings.path)

    if not os.path.exists(slave_path):
        print("Slave working path ( %s ) doesn't exist" % netsettings.path)
        return

    if not os.access(slave_path, os.W_OK):
        print("Slave working path ( %s ) is not writable" % netsettings.path)
        return

    conn = clientConnection(netsettings.server_address, netsettings.server_port)

    if not conn:
        print("Connection failed, will try connecting again at most %i times" % MAX_CONNECT_TRY)
        bisleep.reset()

        for i in range(MAX_CONNECT_TRY):
            bisleep.sleep()

            conn = clientConnection(netsettings.server_address, netsettings.server_port)

            if conn or engine.test_break():
                break

            print("Retry %i failed, waiting %is before retrying" % (i + 1, bisleep.current))

    if not conn:
        return

    with ConnectionContext():
        conn.request("POST", "/slave", json.dumps(slave_Info().serialize()))
    response = conn.getresponse()
    response.read()

    slave_id = response.getheader("slave-id")

    NODE_PREFIX = os.path.join(slave_path, "slave_" + slave_id)
    if not os.path.exists(NODE_PREFIX):
        os.mkdir(NODE_PREFIX)

    engine.update_stats("", "Network render connected to master, waiting for jobs")

    try:
        while not engine.test_break():
            with ConnectionContext():
                conn.request("GET", "/job", headers={"slave-id":slave_id})
            response = conn.getresponse()

            if response.status != http.client.OK:
                # No job for us: drain the reply so the connection can be
                # reused, then back off.
                response.read()
                bisleep.sleep()
                continue

            bisleep.reset()

            job = netrender.model.RenderJob.materialize(json.loads(str(response.read(), encoding='utf8')))
            engine.update_stats("", "Network render processing job from master")

            JOB_PREFIX = os.path.join(NODE_PREFIX, "job_" + job.id)
            if not os.path.exists(JOB_PREFIX):
                os.mkdir(JOB_PREFIX)

            # set tempdir for fsaa temp files
            # have to set environ var because render is done in a subprocess and that's the easiest way to propagate the setting
            os.environ["TMP"] = JOB_PREFIX

            if job.type == netrender.model.JOB_BLENDER:
                job_path = job.files[0].filepath # path of main file
                main_path, main_file = os.path.split(job_path)

                job_full_path = testFile(conn, job.id, slave_id, job.files[0], JOB_PREFIX)
                print("Fullpath", job_full_path)
                print("File:", main_file, "and %i other files" % (len(job.files) - 1,))

                for rfile in job.files[1:]:
                    testFile(conn, job.id, slave_id, rfile, JOB_PREFIX, main_path)
                    print("\t", rfile.filepath)

                netrender.repath.update(job)

                engine.update_stats("", "Render File " + main_file + " for job " + job.id)
            elif job.type == netrender.model.JOB_VCS:
                if not job.version_info:
                    # Need to return an error to server, incorrect job type
                    # NOTE(review): execution falls through and the
                    # version_info.update() call below will then fail.
                    pass

                job_path = job.files[0].filepath # path of main file
                main_path, main_file = os.path.split(job_path)

                job.version_info.update()

                # For VCS jobs, file path is relative to the working copy path
                job_full_path = os.path.join(job.version_info.wpath, job_path)

                engine.update_stats("", "Render File " + main_file + " for job " + job.id)

            # announce log to master
            logfile = netrender.model.LogFile(job.id, slave_id, [frame.number for frame in job.frames])
            with ConnectionContext():
                conn.request("POST", "/log", bytes(json.dumps(logfile.serialize()), encoding='utf8'))
            response = conn.getresponse()
            response.read()

            first_frame = job.frames[0].number

            # start render
            start_t = time.time()

            # NOTE(review): if the job neither renders with Blender nor is a
            # JOB_PROCESS job, ``process`` is left unbound below.
            if job.rendersWithBlender():
                frame_args = []

                for frame in job.frames:
                    print("frame", frame.number)
                    frame_args += ["-f", str(frame.number)]

                val = SetErrorMode()
                process = subprocess.Popen([BLENDER_PATH, "-b", "-noaudio", job_full_path, "-t", str(threads), "-o", os.path.join(JOB_PREFIX, "######"), "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
                RestoreErrorMode(val)
            elif job.type == netrender.model.JOB_PROCESS:
                command = job.frames[0].command
                val = SetErrorMode()
                process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
                RestoreErrorMode(val)

            headers = {"slave-id":slave_id}

            cancelled = False
            stdout = bytes()
            run_t = time.time()
            while not cancelled and process.poll() is None:
                stdout += process.stdout.read(1024)
                current_t = time.time()
                cancelled = engine.test_break()
                if current_t - run_t > CANCEL_POLL_SPEED:

                    # update logs if needed
                    if stdout:
                        # (only need to update on one frame, they are linked
                        with ConnectionContext():
                            conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)
                        responseStatus(conn)

                        # Also output on console.
                        # Chunks are cut at arbitrary byte offsets and may split
                        # a multi-byte UTF-8 sequence, so never decode strictly.
                        if netsettings.use_slave_output_log:
                            print(str(stdout, encoding='utf8', errors='replace'), end="")

                        stdout = bytes()

                    run_t = current_t
                    if testCancel(conn, job.id, first_frame):
                        cancelled = True

            if job.type == netrender.model.JOB_BLENDER:
                netrender.repath.reset(job)

            if cancelled:
                # Kill the process BEFORE touching its output: reading the pipe
                # to EOF would block until a still-running render finishes.
                if process.poll() is None:
                    try:
                        process.terminate()
                    except OSError:
                        pass
                continue # to next job

            # read leftovers if needed
            stdout += process.stdout.read()

            # flush the rest of the logs
            if stdout:
                # Also output on console
                if netsettings.use_slave_output_log:
                    print(str(stdout, encoding='utf8', errors='replace'), end="")

                # (only need to update on one frame, they are linked
                with ConnectionContext():
                    conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)

                if responseStatus(conn) == http.client.NO_CONTENT:
                    continue

            total_t = time.time() - start_t

            avg_t = total_t / len(job.frames)

            status = process.returncode

            print("status", status)

            headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)}

            # NOTE(review): the ``continue`` statements in the frame loops below
            # are the last statement of each iteration, so they skip nothing.
            if status == 0: # non zero status is error
                headers["job-result"] = str(DONE)
                for frame in job.frames:
                    headers["job-frame"] = str(frame.number)
                    if job.hasRenderResult():
                        # send image back to server

                        filename = os.path.join(JOB_PREFIX, "%06d.exr" % frame.number)

                        # thumbnail first
                        if netsettings.use_slave_thumb:
                            thumbname = thumbnail.generate(filename)

                            if thumbname:
                                with open(thumbname, 'rb') as f:
                                    with ConnectionContext():
                                        conn.request("PUT", "/thumb", f, headers=headers)
                                responseStatus(conn)

                        with open(filename, 'rb') as f:
                            with ConnectionContext():
                                conn.request("PUT", "/render", f, headers=headers)
                        if responseStatus(conn) == http.client.NO_CONTENT:
                            continue

                    elif job.type == netrender.model.JOB_PROCESS:
                        with ConnectionContext():
                            conn.request("PUT", "/render", headers=headers)
                        if responseStatus(conn) == http.client.NO_CONTENT:
                            continue
            else:
                headers["job-result"] = str(ERROR)
                for frame in job.frames:
                    headers["job-frame"] = str(frame.number)
                    # send error result back to server
                    with ConnectionContext():
                        conn.request("PUT", "/render", headers=headers)
                    if responseStatus(conn) == http.client.NO_CONTENT:
                        continue

            engine.update_stats("", "Network render connected to master, waiting for jobs")
    finally:
        # Always release the connection, even if the loop raised.
        conn.close()

    if netsettings.use_slave_clear:
        clearSlave(NODE_PREFIX)

if __name__ == "__main__":
    pass  # running this module directly performs no action