Skip to content
Snippets Groups Projects
master.py 39.7 KiB
Newer Older
  • Learn to ignore specific revisions
  • # ##### BEGIN GPL LICENSE BLOCK #####
    #
    #  This program is free software; you can redistribute it and/or
    #  modify it under the terms of the GNU General Public License
    #  as published by the Free Software Foundation; either version 2
    #  of the License, or (at your option) any later version.
    #
    #  This program is distributed in the hope that it will be useful,
    #  but WITHOUT ANY WARRANTY; without even the implied warranty of
    #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    #  GNU General Public License for more details.
    #
    #  You should have received a copy of the GNU General Public License
    #  along with this program; if not, write to the Free Software Foundation,
    #  Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
    #
    # ##### END GPL LICENSE BLOCK #####
    
    import sys, os
    
    Campbell Barton's avatar
    Campbell Barton committed
    import http, http.client, http.server, socket, socketserver
    import shutil, time, hashlib
    
    import pickle
    import select # for select.error
    import json
    
    from netrender.utils import *
    import netrender.model
    import netrender.balancing
    import netrender.master_html
    import netrender.thumbnail as thumbnail
    
    class MRenderFile(netrender.model.RenderFile):
        def __init__(self, filepath, index, start, end, signature):
            super().__init__(filepath, index, start, end, signature)
            self.found = False
    
    
    Martin Poirier's avatar
    Martin Poirier committed
        def updateStatus(self):
    
            self.found = os.path.exists(self.filepath)
    
    Martin Poirier's avatar
    Martin Poirier committed
            
    
            if self.found and self.signature != None:
                found_signature = hashFile(self.filepath)
                self.found = self.signature == found_signature
    
    Martin Poirier's avatar
    Martin Poirier committed
                if not self.found:
                    print("Signature mismatch", self.signature, found_signature)
                
            return self.found
    
        def test(self):
            # don't check when forcing upload and only until found
            if not self.force and not self.found:
                self.updateStatus()
    
                
            return self.found
    
    
    class MRenderSlave(netrender.model.RenderSlave):
    
    Martin Poirier's avatar
    Martin Poirier committed
        def __init__(self, slave_info):
            super().__init__(slave_info)
            self.id = hashlib.md5(bytes(repr(slave_info.name) + repr(slave_info.address), encoding='utf8')).hexdigest()
    
            self.last_seen = time.time()
    
            self.job = None
            self.job_frames = []
    
            netrender.model.RenderSlave._slave_map[self.id] = self
    
        def seen(self):
            self.last_seen = time.time()
    
        def finishedFrame(self, frame_number):
    
            try:
                self.job_frames.remove(frame_number)
            except ValueError as e:
                print("Internal error: Frame %i not in job frames list" % frame_number)
                print(self.job_frames)
    
            if not self.job_frames:
                self.job = None
    
    class MRenderJob(netrender.model.RenderJob):
        def __init__(self, job_id, job_info):
            super().__init__(job_info)
            self.id = job_id
            self.last_dispatched = time.time()
    
            # force one chunk for process jobs
            if self.type == netrender.model.JOB_PROCESS:
                self.chunks = 1
    
            # Force WAITING status on creation
            self.status = JOB_WAITING
    
            # special server properties
            self.last_update = 0
            self.save_path = ""
            self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature) for rfile in job_info.files]
    
    Martin Poirier's avatar
    Martin Poirier committed
            
        def setForceUpload(self, force):
            for rfile in self.files:
                rfile.force = force
    
    
        def initInfo(self):
            if not self.resolution:
                self.resolution = tuple(getFileInfo(self.files[0].filepath, ["bpy.context.scene.render.resolution_x", "bpy.context.scene.render.resolution_y", "bpy.context.scene.render.resolution_percentage"]))
    
        def save(self):
            if self.save_path:
                f = open(os.path.join(self.save_path, "job.txt"), "w")
                f.write(json.dumps(self.serialize()))
                f.close()
    
        def edit(self, info_map):
            if "status" in info_map:
                self.status = info_map["status"]
    
            if "priority" in info_map:
                self.priority = info_map["priority"]
    
            if "chunks" in info_map:
                self.chunks = info_map["chunks"]
    
        def testStart(self):
            # Don't test files for versionned jobs
            if not self.version_info:
                for f in self.files:
                    if not f.test():
                        return False
    
            self.start()
            self.initInfo()
            return True
    
        def testFinished(self):
            for f in self.frames:
                if f.status == QUEUED or f.status == DISPATCHED:
                    break
            else:
                self.status = JOB_FINISHED
    
        def pause(self, status = None):
            if self.status not in {JOB_PAUSED, JOB_QUEUED}:
                return
    
            if status is None:
                self.status = JOB_PAUSED if self.status == JOB_QUEUED else JOB_QUEUED
            elif status:
                self.status = JOB_QUEUED
            else:
                self.status = JOB_PAUSED
    
        def start(self):
            self.status = JOB_QUEUED
    
        def addLog(self, frames):
    
    Martin Poirier's avatar
    Martin Poirier committed
            frames = sorted(frames)
            log_name = "%06d_%06d.log" % (frames[0], frames[-1])
    
            log_path = os.path.join(self.save_path, log_name)
    
            for number in frames:
                frame = self[number]
                if frame:
                    frame.log_path = log_path
    
        def addFrame(self, frame_number, command):
            frame = MRenderFrame(frame_number, command)
            self.frames.append(frame)
            return frame
    
        def reset(self, all):
            for f in self.frames:
                f.reset(all)
    
    Martin Poirier's avatar
    Martin Poirier committed
                
            if all:
                self.status = JOB_QUEUED
    
    
        def getFrames(self):
            frames = []
            for f in self.frames:
                if f.status == QUEUED:
                    self.last_dispatched = time.time()
                    frames.append(f)
                    if len(frames) >= self.chunks:
                        break
    
            return frames
    
    class MRenderFrame(netrender.model.RenderFrame):
        def __init__(self, frame, command):
            super().__init__()
            self.number = frame
            self.slave = None
            self.time = 0
            self.status = QUEUED
            self.command = command
    
            self.log_path = None
    
        def reset(self, all):
            if all or self.status == ERROR:
                self.log_path = None
                self.slave = None
                self.time = 0
                self.status = QUEUED
    
    
    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    file_pattern = re.compile("/file_([a-zA-Z0-9]+)_([0-9]+)")
    render_pattern = re.compile("/render_([a-zA-Z0-9]+)_([0-9]+).exr")
    thumb_pattern = re.compile("/thumb_([a-zA-Z0-9]+)_([0-9]+).jpg")
    log_pattern = re.compile("/log_([a-zA-Z0-9]+)_([0-9]+).log")
    reset_pattern = re.compile("/reset(all|)_([a-zA-Z0-9]+)_([0-9]+)")
    cancel_pattern = re.compile("/cancel_([a-zA-Z0-9]+)")
    pause_pattern = re.compile("/pause_([a-zA-Z0-9]+)")
    edit_pattern = re.compile("/edit_([a-zA-Z0-9]+)")
    
    class RenderHandler(http.server.BaseHTTPRequestHandler):
    
        def write_file(self, file_path, mode = 'wb'):
            length = int(self.headers['content-length'])
            f = open(file_path, mode)
            buf = self.rfile.read(length)
            f.write(buf)
            f.close()
            del buf
            
    
        def log_message(self, format, *args):
            # override because the original calls self.address_string(), which
            # is extremely slow due to some timeout..
            sys.stderr.write("[%s] %s\n" % (self.log_date_time_string(), format%args))
    
        def getInfoMap(self):
            length = int(self.headers['content-length'])
    
            if length > 0:
                msg = str(self.rfile.read(length), encoding='utf8')
                return json.loads(msg)
            else:
                return {}
    
        def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"):
            self.send_response(code)
    
    Martin Poirier's avatar
    Martin Poirier committed
            
            if code != http.client.OK and content:
                self.send_header("Content-type", content)
    
    
            for key, value in headers.items():
                self.send_header(key, value)
    
            self.end_headers()
    
        def do_HEAD(self):
    
            if self.path == "/status":
                job_id = self.headers.get('job-id', "")
                job_frame = int(self.headers.get('job-frame', -1))
    
                job = self.server.getJobID(job_id)
                if job:
                    frame = job[job_frame]
    
    
                    if frame:
                        self.send_head(http.client.OK)
                    else:
                        # no such frame
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # no such job id
                    self.send_head(http.client.NO_CONTENT)
    
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
    
        def do_GET(self):
    
            if self.path == "/version":
                self.send_head()
                self.server.stats("", "Version check")
                self.wfile.write(VERSION)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path.startswith("/render"):
                match = render_pattern.match(self.path)
    
                if match:
                    job_id = match.groups()[0]
                    frame_number = int(match.groups()[1])
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        frame = job[frame_number]
    
                        if frame:
                            if frame.status in (QUEUED, DISPATCHED):
                                self.send_head(http.client.ACCEPTED)
                            elif frame.status == DONE:
                                self.server.stats("", "Sending result to client")
    
                                filename = os.path.join(job.save_path, "%06d.exr" % frame_number)
    
                                f = open(filename, 'rb')
                                self.send_head(content = "image/x-exr")
                                shutil.copyfileobj(f, self.wfile)
                                f.close()
                            elif frame.status == ERROR:
                                self.send_head(http.client.PARTIAL_CONTENT)
                        else:
                            # no such frame
                            self.send_head(http.client.NO_CONTENT)
                    else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # invalid url
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path.startswith("/thumb"):
                match = thumb_pattern.match(self.path)
    
                if match:
                    job_id = match.groups()[0]
                    frame_number = int(match.groups()[1])
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        frame = job[frame_number]
    
                        if frame:
                            if frame.status in (QUEUED, DISPATCHED):
                                self.send_head(http.client.ACCEPTED)
                            elif frame.status == DONE:
                                filename = os.path.join(job.save_path, "%06d.exr" % frame_number)
    
                                thumbname = thumbnail.generate(filename)
    
                                if thumbname:
                                    f = open(thumbname, 'rb')
                                    self.send_head(content = "image/jpeg")
                                    shutil.copyfileobj(f, self.wfile)
                                    f.close()
                                else: # thumbnail couldn't be generated
                                    self.send_head(http.client.PARTIAL_CONTENT)
                                    return
                            elif frame.status == ERROR:
                                self.send_head(http.client.PARTIAL_CONTENT)
                        else:
                            # no such frame
                            self.send_head(http.client.NO_CONTENT)
                    else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # invalid url
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path.startswith("/log"):
                match = log_pattern.match(self.path)
    
                if match:
                    job_id = match.groups()[0]
                    frame_number = int(match.groups()[1])
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        frame = job[frame_number]
    
                        if frame:
                            if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
                                self.send_head(http.client.PROCESSING)
                            else:
                                self.server.stats("", "Sending log to client")
                                f = open(frame.log_path, 'rb')
    
                                self.send_head(content = "text/plain")
    
                                shutil.copyfileobj(f, self.wfile)
    
                                f.close()
                        else:
                            # no such frame
                            self.send_head(http.client.NO_CONTENT)
                    else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # invalid URL
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/status":
                job_id = self.headers.get('job-id', "")
                job_frame = int(self.headers.get('job-frame', -1))
    
                if job_id:
    
                    job = self.server.getJobID(job_id)
                    if job:
                        if job_frame != -1:
                            frame = job[frame]
    
                            if frame:
                                message = frame.serialize()
                            else:
                                # no such frame
                                self.send_heat(http.client.NO_CONTENT)
                                return
                        else:
                            message = job.serialize()
                    else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)
                        return
                else: # status of all jobs
                    message = []
    
                    for job in self.server:
                        message.append(job.serialize())
    
    
                self.server.stats("", "Sending status")
                self.send_head()
                self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
    
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/job":
                self.server.balance()
    
                slave_id = self.headers['slave-id']
    
                slave = self.server.getSeenSlave(slave_id)
    
                if slave: # only if slave id is valid
    
    Martin Poirier's avatar
    Martin Poirier committed
                    job, frames = self.server.newDispatch(slave)
    
    
                    if job and frames:
                        for f in frames:
                            print("dispatch", f.number)
                            f.status = DISPATCHED
                            f.slave = slave
    
                        slave.job = job
                        slave.job_frames = [f.number for f in frames]
    
                        self.send_head(headers={"job-id": job.id})
    
                        message = job.serialize(frames)
                        self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
    
                        self.server.stats("", "Sending job to slave")
                    else:
                        # no job available, return error code
                        slave.job = None
                        slave.job_frames = []
    
                        self.send_head(http.client.ACCEPTED)
                else: # invalid slave id
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path.startswith("/file"):
                match = file_pattern.match(self.path)
    
                if match:
                    slave_id = self.headers['slave-id']
                    slave = self.server.getSeenSlave(slave_id)
    
                    if not slave:
                        # invalid slave id
                        print("invalid slave id")
    
                    job_id = match.groups()[0]
                    file_index = int(match.groups()[1])
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        render_file = job.files[file_index]
    
                        if render_file:
                            self.server.stats("", "Sending file to slave")
                            f = open(render_file.filepath, 'rb')
    
                            self.send_head()
                            shutil.copyfileobj(f, self.wfile)
    
                            f.close()
                        else:
                            # no such file
                            self.send_head(http.client.NO_CONTENT)
                    else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)
                else: # invalid url
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/slaves":
                message = []
    
                self.server.stats("", "Sending slaves status")
    
                for slave in self.server.slaves:
                    message.append(slave.serialize())
    
                self.send_head()
    
                self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            else:
                # hand over the rest to the html section
                netrender.master_html.get(self)
    
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        def do_POST(self):
    
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            if self.path == "/job":
    
                length = int(self.headers['content-length'])
    
                job_info = netrender.model.RenderJob.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')))
                job_id = self.server.nextJobID()
    
                job = MRenderJob(job_id, job_info)
    
    Martin Poirier's avatar
    Martin Poirier committed
                
                job.setForceUpload(self.server.force)
    
    
                for frame in job_info.frames:
                    frame = job.addFrame(frame.number, frame.command)
    
                self.server.addJob(job)
    
                headers={"job-id": job_id}
    
                if job.testStart():
                    self.server.stats("", "New job, started")
    
    Martin Poirier's avatar
    Martin Poirier committed
                    self.send_head(headers=headers, content = None)
    
                else:
                    self.server.stats("", "New job, missing files (%i total)" % len(job.files))
                    self.send_head(http.client.ACCEPTED, headers=headers)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path.startswith("/edit"):
                match = edit_pattern.match(self.path)
    
                if match:
                    job_id = match.groups()[0]
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        info_map = self.getInfoMap()
    
                        job.edit(info_map)
    
    Martin Poirier's avatar
    Martin Poirier committed
                        self.send_head(content = None)
    
                    else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # invalid url
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/balance_limit":
                info_map = self.getInfoMap()
                for rule_id, limit in info_map.items():
                    try:
                        rule = self.server.balancer.ruleByID(rule_id)
                        if rule:
                            rule.setLimit(limit)
                    except:
                        pass # invalid type
    
    
    Martin Poirier's avatar
    Martin Poirier committed
                self.send_head(content = None)
    
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/balance_enable":
                info_map = self.getInfoMap()
                for rule_id, enabled in info_map.items():
                    rule = self.server.balancer.ruleByID(rule_id)
                    if rule:
                        rule.enabled = enabled
    
    
    Martin Poirier's avatar
    Martin Poirier committed
                self.send_head(content = None)
    
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path.startswith("/cancel"):
                match = cancel_pattern.match(self.path)
    
                if match:
                    info_map = self.getInfoMap()
                    clear = info_map.get("clear", False)
    
                    job_id = match.groups()[0]
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        self.server.stats("", "Cancelling job")
                        self.server.removeJob(job, clear)
    
    Martin Poirier's avatar
    Martin Poirier committed
                        self.send_head(content = None)
    
                    else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # invalid url
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path.startswith("/pause"):
                match = pause_pattern.match(self.path)
    
                if match:
                    info_map = self.getInfoMap()
                    status = info_map.get("status", None)
    
                    job_id = match.groups()[0]
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        self.server.stats("", "Pausing job")
                        job.pause(status)
    
    Martin Poirier's avatar
    Martin Poirier committed
                        self.send_head(content = None)
    
                    else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)
                else:
                    # invalid url
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/clear":
                # cancel all jobs
                info_map = self.getInfoMap()
                clear = info_map.get("clear", False)
    
                self.server.stats("", "Clearing jobs")
                self.server.clear(clear)
    
    
    Martin Poirier's avatar
    Martin Poirier committed
                self.send_head(content = None)
    
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path.startswith("/reset"):
                match = reset_pattern.match(self.path)
    
                if match:
                    all = match.groups()[0] == 'all'
                    job_id = match.groups()[1]
                    job_frame = int(match.groups()[2])
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        if job_frame != 0:
    
                            frame = job[job_frame]
                            if frame:
                                self.server.stats("", "Reset job frame")
                                frame.reset(all)
    
    Martin Poirier's avatar
    Martin Poirier committed
                                self.send_head(content = None)
    
                            else:
                                # no such frame
                                self.send_head(http.client.NO_CONTENT)
    
                        else:
                            self.server.stats("", "Reset job")
                            job.reset(all)
    
    Martin Poirier's avatar
    Martin Poirier committed
                            self.send_head(content = None)
    
    
                    else: # job not found
                        self.send_head(http.client.NO_CONTENT)
                else: # invalid url
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/slave":
                length = int(self.headers['content-length'])
    
    Campbell Barton's avatar
    Campbell Barton committed
                # job_frame_string = self.headers['job-frame']  # UNUSED
    
    
                self.server.stats("", "New slave connected")
    
                slave_info = netrender.model.RenderSlave.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')), cache = False)
    
    Martin Poirier's avatar
    Martin Poirier committed
                
                slave_info.address = self.client_address
    
    Martin Poirier's avatar
    Martin Poirier committed
                slave_id = self.server.addSlave(slave_info)
    
    Martin Poirier's avatar
    Martin Poirier committed
                self.send_head(headers = {"slave-id": slave_id}, content = None)
    
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/log":
                length = int(self.headers['content-length'])
    
                log_info = netrender.model.LogFile.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')))
    
                slave_id = log_info.slave_id
    
                slave = self.server.getSeenSlave(slave_id)
    
                if slave: # only if slave id is valid
                    job = self.server.getJobID(log_info.job_id)
    
                    if job:
                        self.server.stats("", "Log announcement")
                        job.addLog(log_info.frames)
    
    Martin Poirier's avatar
    Martin Poirier committed
                        self.send_head(content = None)
    
                    else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)
                else: # invalid slave id
                    self.send_head(http.client.NO_CONTENT)
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        def do_PUT(self):
    
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            if self.path.startswith("/file"):
                match = file_pattern.match(self.path)
    
                if match:
    
    Martin Poirier's avatar
    Martin Poirier committed
                    self.server.stats("", "Receiving job file")
    
    
                    job_id = match.groups()[0]
                    file_index = int(match.groups()[1])
    
                    job = self.server.getJobID(job_id)
    
                    if job:
    
                        render_file = job.files[file_index]
    
                        if render_file:
                            main_file = job.files[0].filepath # filename of the first file
    
                            main_path, main_name = os.path.split(main_file)
    
                            if file_index > 0:
    
    Martin Poirier's avatar
    Martin Poirier committed
                                file_path = prefixPath(job.save_path, render_file.filepath, main_path, force = True)
    
                            else:
                                file_path = os.path.join(job.save_path, main_name)
    
                            # add same temp file + renames as slave
                            
    
                            self.write_file(file_path)
                            
    
                            render_file.filepath = file_path # set the new path
    
    Martin Poirier's avatar
    Martin Poirier committed
                            found = render_file.updateStatus() # make sure we have the right file
                            
                            if not found: # checksum mismatch
                                self.server.stats("", "File upload but checksum mismatch, this shouldn't happen")
                                self.send_head(http.client.CONFLICT)
                            elif job.testStart(): # started correctly
    
                                self.server.stats("", "File upload, starting job")
    
    Martin Poirier's avatar
    Martin Poirier committed
                                self.send_head(content = None)
    
                            else:
    
    Martin Poirier's avatar
    Martin Poirier committed
                                self.server.stats("", "File upload, dependency files still missing")
    
                                self.send_head(http.client.ACCEPTED)
                        else: # invalid file
                            print("file not found", job_id, file_index)
                            self.send_head(http.client.NO_CONTENT)
                    else: # job not found
                        print("job not found", job_id, file_index)
                        self.send_head(http.client.NO_CONTENT)
                else: # invalid url
                    print("no match")
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/render":
                self.server.stats("", "Receiving render result")
    
                slave_id = self.headers['slave-id']
    
                slave = self.server.getSeenSlave(slave_id)
    
                if slave: # only if slave id is valid
                    job_id = self.headers['job-id']
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        job_frame = int(self.headers['job-frame'])
                        job_result = int(self.headers['job-result'])
                        job_time = float(self.headers['job-time'])
    
                        frame = job[job_frame]
    
                        if frame:
    
    Martin Poirier's avatar
    Martin Poirier committed
                            self.send_head(content = None)
    
    
                            if job.hasRenderResult():
                                if job_result == DONE:
    
                                    self.write_file(os.path.join(job.save_path, "%06d.exr" % job_frame))
    
    
                                elif job_result == ERROR:
                                    # blacklist slave on this job on error
                                    # slaves might already be in blacklist if errors on the whole chunk
                                    if not slave.id in job.blacklist:
                                        job.blacklist.append(slave.id)
    
                            slave.finishedFrame(job_frame)
    
                            frame.status = job_result
                            frame.time = job_time
    
                            job.testFinished()
    
                        else: # frame not found
                            self.send_head(http.client.NO_CONTENT)
                    else: # job not found
                        self.send_head(http.client.NO_CONTENT)
                else: # invalid slave id
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path == "/thumb":
                self.server.stats("", "Receiving thumbnail result")
    
                slave_id = self.headers['slave-id']
    
                slave = self.server.getSeenSlave(slave_id)
    
                if slave: # only if slave id is valid
                    job_id = self.headers['job-id']
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        job_frame = int(self.headers['job-frame'])
    
                        frame = job[job_frame]
    
                        if frame:
    
    Martin Poirier's avatar
    Martin Poirier committed
                            self.send_head(content = None)
                            
    
                            if job.hasRenderResult():
    
    Martin Poirier's avatar
    Martin Poirier committed
                                self.write_file(os.path.join(os.path.join(job.save_path, "%06d.jpg" % job_frame)))
    
    
                        else: # frame not found
                            self.send_head(http.client.NO_CONTENT)
                    else: # job not found
                        self.send_head(http.client.NO_CONTENT)
                else: # invalid slave id
                    self.send_head(http.client.NO_CONTENT)
            # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
            elif self.path.startswith("/log"):
                self.server.stats("", "Receiving log file")
    
                match = log_pattern.match(self.path)
    
                if match:
                    job_id = match.groups()[0]
    
                    job = self.server.getJobID(job_id)
    
                    if job:
                        job_frame = int(match.groups()[1])
    
                        frame = job[job_frame]
    
                        if frame and frame.log_path:
    
    Martin Poirier's avatar
    Martin Poirier committed
                            self.send_head(content = None)
    
    
                            self.write_file(frame.log_path, 'ab')
    
    
                            self.server.getSeenSlave(self.headers['slave-id'])
    
                        else: # frame not found
                            self.send_head(http.client.NO_CONTENT)
                    else: # job not found
                        self.send_head(http.client.NO_CONTENT)
                else: # invalid url
                    self.send_head(http.client.NO_CONTENT)
    
    class RenderMasterServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    
    Martin Poirier's avatar
    Martin Poirier committed
        def __init__(self, address, handler_class, path, force=False, subdir=True):
    
            self.jobs = []
            self.jobs_map = {}
            self.slaves = []
            self.slaves_map = {}
            self.job_id = 0
    
    Martin Poirier's avatar
    Martin Poirier committed
            self.force = force
    
    
            if subdir:
                self.path = os.path.join(path, "master_" + str(os.getpid()))
            else:
                self.path = path
    
    
            if not os.path.exists(self.path):
                os.mkdir(self.path)
    
    
            self.slave_timeout = 5 # 5 mins: need a parameter for that
    
            self.balancer = netrender.balancing.Balancer()
            self.balancer.addRule(netrender.balancing.RatingUsageByCategory(self.getJobs))
            self.balancer.addRule(netrender.balancing.RatingUsage())
            self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
            self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9))
            self.balancer.addPriority(netrender.balancing.NewJobPriority())
            self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2))
    
    
            super().__init__(address, handler_class)
    
    
        def restore(self, jobs, slaves, balancer = None):
            self.jobs = jobs
            self.jobs_map = {}
            
            for job in self.jobs:
                self.jobs_map[job.id] = job
                self.job_id = max(self.job_id, int(job.id))
    
            self.slaves = slaves
            for slave in self.slaves:
                self.slaves_map[slave.id] = slave
            
            if balancer:
                self.balancer = balancer
            
    
        def nextJobID(self):
            self.job_id += 1
            return str(self.job_id)
    
    
    Martin Poirier's avatar
    Martin Poirier committed
        def addSlave(self, slave_info):
            slave = MRenderSlave(slave_info)
    
            self.slaves.append(slave)
            self.slaves_map[slave.id] = slave
    
            return slave.id
    
        def removeSlave(self, slave):
            self.slaves.remove(slave)
            self.slaves_map.pop(slave.id)
    
        def getSlave(self, slave_id):
            return self.slaves_map.get(slave_id)
    
        def getSeenSlave(self, slave_id):
            slave = self.getSlave(slave_id)
            if slave:
                slave.seen()
    
            return slave
    
        def timeoutSlaves(self):
            removed = []
    
            t = time.time()
    
            for slave in self.slaves:
                if (t - slave.last_seen) / 60 > self.slave_timeout:
                    removed.append(slave)
    
                    if slave.job:
                        for f in slave.job_frames:
                            slave.job[f].status = ERROR
    
            for slave in removed:
                self.removeSlave(slave)
    
        def updateUsage(self):
            blend = 0.5
            for job in self.jobs:
                job.usage *= (1 - blend)
    
            if self.slaves:
                slave_usage = blend / self.countSlaves()
    
                for slave in self.slaves:
                    if slave.job:
                        slave.job.usage += slave_usage
    
    
        def clear(self, clear_files = False):
            removed = self.jobs[:]
    
            for job in removed:
                self.removeJob(job, clear_files)
    
        def balance(self):
            self.balancer.balance(self.jobs)
    
        def getJobs(self):
            return self.jobs
    
        def countJobs(self, status = JOB_QUEUED):
            total = 0
            for j in self.jobs:
                if j.status == status:
                    total += 1
    
            return total
    
        def countSlaves(self):
            return len(self.slaves)
    
        def removeJob(self, job, clear_files = False):
            self.jobs.remove(job)
            self.jobs_map.pop(job.id)