Commit 294fe2ad authored by Martin Poirier

netrender

- fix handling of point cache dependencies on the slave (correct repathing) and on the client (bad frame dependency range)
- add a Force Upload option on the master to force dependency upload and download (don't reuse local copies outside of the slave cache)
- add links back to the main page on the job web page
- limit client dependencies to the rendered frame range (it used to add all point cache frames)

Thanks to Philippe Van Hecke for raising the issue by email.
parent b99dae24
@@ -39,16 +39,14 @@ def addFluidFiles(job, path):
                 # fluid frames starts at 0, which explains the +1
                 # This is stupid
                 current_frame = int(match.groups()[1]) + 1
-                job.addFile(path + fluid_file, current_frame, current_frame)
+                job.addFile(os.path.join(path, fluid_file), current_frame, current_frame)
 
 def addPointCache(job, ob, point_cache, default_path):
     if not point_cache.use_disk_cache:
         return
 
-    name = point_cache.name
-    if name == "":
-        name = "".join(["%02X" % ord(c) for c in ob.name])
+    name = cacheName(ob, point_cache)
 
     cache_path = bpy.path.abspath(point_cache.filepath) if point_cache.use_external else default_path
@@ -70,7 +68,7 @@ def addPointCache(job, ob, point_cache, default_path):
     if len(cache_files) == 1:
         cache_frame, cache_file = cache_files[0]
-        job.addFile(cache_path + cache_file, cache_frame, cache_frame)
+        job.addFile(os.path.join(cache_path, cache_file), cache_frame, cache_frame)
     else:
         for i in range(len(cache_files)):
             current_item = cache_files[i]
@@ -79,18 +77,18 @@ def addPointCache(job, ob, point_cache, default_path):
             current_frame, current_file = current_item
 
             if not next_item and not previous_item:
-                job.addFile(cache_path + current_file, current_frame, current_frame)
+                job.addFile(os.path.join(cache_path, current_file), current_frame, current_frame)
             elif next_item and not previous_item:
                 next_frame = next_item[0]
-                job.addFile(cache_path + current_file, current_frame, next_frame - 1)
+                job.addFile(os.path.join(cache_path, current_file), current_frame, next_frame)
             elif not next_item and previous_item:
                 previous_frame = previous_item[0]
-                job.addFile(cache_path + current_file, previous_frame + 1, current_frame)
+                job.addFile(os.path.join(cache_path, current_file), previous_frame, current_frame)
             else:
                 next_frame = next_item[0]
                 previous_frame = previous_item[0]
-                job.addFile(cache_path + current_file, previous_frame + 1, next_frame - 1)
+                job.addFile(os.path.join(cache_path, current_file), previous_frame, next_frame)
 
 def fillCommonJobSettings(job, job_name, netsettings):
     job.name = job_name
@@ -219,8 +217,7 @@ def clientSendJobBlender(conn, scene, anim = False):
     ###########################
     # FLUID + POINT CACHE
     ###########################
-    root, ext = os.path.splitext(name)
-    default_path = path + os.sep + "blendcache_" + root + os.sep # need an API call for that
+    default_path = cachePath(filename)
 
     for object in bpy.data.objects:
         for modifier in object.modifiers:
@@ -294,7 +291,13 @@ class NetworkRenderEngine(bpy.types.RenderEngine):
         address = "" if netsettings.server_address == "[default]" else netsettings.server_address
 
-        master.runMaster((address, netsettings.server_port), netsettings.use_master_broadcast, netsettings.use_master_clear, bpy.path.abspath(netsettings.path), self.update_stats, self.test_break)
+        master.runMaster(address = (address, netsettings.server_port),
+                         broadcast = netsettings.use_master_broadcast,
+                         clear = netsettings.use_master_clear,
+                         force = netsettings.use_master_force_upload,
+                         path = bpy.path.abspath(netsettings.path),
+                         update_stats = self.update_stats,
+                         test_break = self.test_break)
 
     def render_slave(self, scene):
......
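To make the corrected point cache frame ranges above concrete: each cache file is now registered against the full span between its neighbouring cache frames (inclusive bounds), where the old code excluded the endpoints. A minimal standalone sketch with hypothetical cache frames, not part of the addon:

cache_frames = [1, 10, 20]  # hypothetical frames stored by point cache files on disk

for i, frame in enumerate(cache_frames):
    prev_frame = cache_frames[i - 1] if i > 0 else None
    next_frame = cache_frames[i + 1] if i + 1 < len(cache_frames) else None
    # new behaviour: inclusive bounds; the old code used prev_frame + 1 / next_frame - 1
    start = prev_frame if prev_frame is not None else frame
    end = next_frame if next_frame is not None else frame
    print(frame, "->", (start, end))  # 1 -> (1, 10), 10 -> (1, 20), 20 -> (10, 20)

So a slave rendering only frame 1 now also fetches the frame-10 cache file it may need, and, combined with the new range filter in RenderJob.addFile further down, cache files that overlap none of the rendered frames are not uploaded at all.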
@@ -34,11 +34,21 @@ class MRenderFile(netrender.model.RenderFile):
         super().__init__(filepath, index, start, end, signature)
         self.found = False
 
-    def test(self):
+    def updateStatus(self):
         self.found = os.path.exists(self.filepath)
 
         if self.found and self.signature != None:
             found_signature = hashFile(self.filepath)
             self.found = self.signature == found_signature
+            if not self.found:
+                print("Signature mismatch", self.signature, found_signature)
+
+        return self.found
+
+    def test(self):
+        # don't check when forcing upload and only until found
+        if not self.force and not self.found:
+            self.updateStatus()
 
         return self.found
@@ -86,6 +96,10 @@ class MRenderJob(netrender.model.RenderJob):
         self.last_update = 0
         self.save_path = ""
         self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature) for rfile in job_info.files]
 
+    def setForceUpload(self, force):
+        for rfile in self.files:
+            rfile.force = force
+
     def initInfo(self):
         if not self.resolution:
@@ -514,6 +528,8 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
             job_id = self.server.nextJobID()
 
             job = MRenderJob(job_id, job_info)
+            job.setForceUpload(self.server.force)
+
             for frame in job_info.frames:
                 frame = job.addFrame(frame.number, frame.command)
@@ -701,9 +717,8 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
         match = file_pattern.match(self.path)
 
         if match:
-            self.server.stats("", "Receiving job")
-            length = int(self.headers['content-length'])
+            self.server.stats("", "Receiving job file")
 
             job_id = match.groups()[0]
             file_index = int(match.groups()[1])
@@ -719,7 +734,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
             main_path, main_name = os.path.split(main_file)
 
             if file_index > 0:
-                file_path = prefixPath(job.save_path, render_file.filepath, main_path)
+                file_path = prefixPath(job.save_path, render_file.filepath, main_path, force = True)
             else:
                 file_path = os.path.join(job.save_path, main_name)
@@ -728,12 +743,16 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
             self.write_file(file_path)
 
             render_file.filepath = file_path # set the new path
+            found = render_file.updateStatus() # make sure we have the right file
 
-            if job.testStart():
+            if not found: # checksum mismatch
+                self.server.stats("", "File upload but checksum mismatch, this shouldn't happen")
+                self.send_head(http.client.CONFLICT)
+            elif job.testStart(): # started correctly
                 self.server.stats("", "File upload, starting job")
                 self.send_head(content = None)
             else:
-                self.server.stats("", "File upload, file missings")
+                self.server.stats("", "File upload, dependency files still missing")
                 self.send_head(http.client.ACCEPTED)
         else: # invalid file
             print("file not found", job_id, file_index)
@@ -851,12 +870,13 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
             self.send_head(http.client.NO_CONTENT)
 
 class RenderMasterServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
-    def __init__(self, address, handler_class, path, subdir=True):
+    def __init__(self, address, handler_class, path, force=False, subdir=True):
         self.jobs = []
         self.jobs_map = {}
         self.slaves = []
         self.slaves_map = {}
         self.job_id = 0
+        self.force = force
 
         if subdir:
             self.path = os.path.join(path, "master_" + str(os.getpid()))
@@ -1012,7 +1032,7 @@ class RenderMasterServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
 def clearMaster(path):
     shutil.rmtree(path)
 
-def createMaster(address, clear, path):
+def createMaster(address, clear, force, path):
     filepath = os.path.join(path, "blender_master.data")
 
     if not clear and os.path.exists(filepath):
@@ -1020,12 +1040,12 @@ def createMaster(address, clear, path):
         with open(filepath, 'rb') as f:
             path, jobs, slaves = pickle.load(f)
 
-        httpd = RenderMasterServer(address, RenderHandler, path, subdir=False)
+        httpd = RenderMasterServer(address, RenderHandler, path, force=force, subdir=False)
         httpd.restore(jobs, slaves)
 
         return httpd
 
-    return RenderMasterServer(address, RenderHandler, path)
+    return RenderMasterServer(address, RenderHandler, path, force=force)
 
 def saveMaster(path, httpd):
     filepath = os.path.join(path, "blender_master.data")
@@ -1033,8 +1053,8 @@ def saveMaster(path, httpd):
     with open(filepath, 'wb') as f:
         pickle.dump((httpd.path, httpd.jobs, httpd.slaves), f, pickle.HIGHEST_PROTOCOL)
 
-def runMaster(address, broadcast, clear, path, update_stats, test_break):
-    httpd = createMaster(address, clear, path)
+def runMaster(address, broadcast, clear, force, path, update_stats, test_break):
+    httpd = createMaster(address, clear, force, path)
     httpd.timeout = 1
     httpd.stats = update_stats
......
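Taken together, the master-side changes above implement Force Upload: the server stores the flag, every MRenderFile of a new job gets force = True via setForceUpload(), and test() then keeps reporting the file as missing until the client has uploaded it, with updateStatus() verifying the checksum on receipt. A rough sketch of that decision, using simplified inputs rather than the real MRenderFile state:

def needs_upload(force, found_locally, signature_matches):
    # force upload: never reuse a pre-existing copy in the master's cache
    if force:
        return True
    # otherwise reuse the local copy only if it exists and its hash matches
    return not (found_locally and signature_matches)

print(needs_upload(force=True, found_locally=True, signature_matches=True))    # True
print(needs_upload(force=False, found_locally=True, signature_matches=True))   # False
print(needs_upload(force=False, found_locally=True, signature_matches=False))  # True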
@@ -215,6 +215,8 @@ def get(handler):
         job_id = handler.path[9:]
 
         head("NetRender")
+
+        output(link("Back to Main Page", "/html"))
 
         job = handler.server.getJobID(job_id)
@@ -311,5 +313,7 @@ def get(handler):
         else:
             output("no such job")
 
+        output(link("Back to Main Page", "/html"))
+
         output("</body></html>")
@@ -154,13 +154,14 @@ class VersioningInfo:
 
 class RenderFile:
-    def __init__(self, filepath = "", index = 0, start = -1, end = -1, signature=0):
+    def __init__(self, filepath = "", index = 0, start = -1, end = -1, signature = 0):
         self.filepath = filepath
         self.original_path = filepath
         self.signature = signature
         self.index = index
         self.start = start
         self.end = end
+        self.force = False
 
     def serialize(self):
         return {
@@ -169,7 +170,8 @@ class RenderFile:
                     "index": self.index,
                     "start": self.start,
                     "end": self.end,
-                    "signature": self.signature
+                    "signature": self.signature,
+                    "force": self.force
                 }
 
     @staticmethod
@@ -179,6 +181,7 @@ class RenderFile:
         rfile = RenderFile(data["filepath"], data["index"], data["start"], data["end"], data["signature"])
         rfile.original_path = data["original_path"]
+        rfile.force = data["force"]
 
         return rfile
@@ -221,11 +224,23 @@ class RenderJob:
         return self.type in (JOB_BLENDER, JOB_VCS)
 
     def addFile(self, file_path, start=-1, end=-1, signed=True):
-        if signed:
-            signature = hashFile(file_path)
-        else:
-            signature = None
-
-        self.files.append(RenderFile(file_path, len(self.files), start, end, signature))
+        def isFileInFrames():
+            if start == end == -1:
+                return True
+
+            for rframe in self.frames:
+                if start <= rframe.number <= end:
+                    return True
+
+            return False
+
+        if isFileInFrames():
+            if signed:
+                signature = hashFile(file_path)
+            else:
+                signature = None
+
+            self.files.append(RenderFile(file_path, len(self.files), start, end, signature))
 
     def addFrame(self, frame_number, command = ""):
         frame = RenderFrame(frame_number, command)
......
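The new inner isFileInFrames() check in RenderJob.addFile is what the "limit dependencies to rendered frame range" item in the commit message refers to: a dependency whose frame range overlaps none of the requested frames is simply not added. A standalone sketch of the same test, with plain ints standing in for RenderFrame objects:

def file_in_frames(job_frames, start=-1, end=-1):
    # files without a frame range (start == end == -1) always apply
    if start == end == -1:
        return True
    # keep the file only if at least one rendered frame falls inside its range
    return any(start <= frame <= end for frame in job_frames)

print(file_in_frames([1, 2, 3], 50, 60))  # False: cache file outside the job, skipped
print(file_in_frames([1, 2, 3], 3, 10))   # True: overlaps frame 3, uploaded
print(file_in_frames([1, 2, 3]))          # True: unbounded dependency, always kept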
@@ -57,15 +57,23 @@ def update(job):
     # Only update if needed
     if paths:
-        process = subprocess.Popen([BLENDER_PATH, "-b", "-noaudio", job_full_path, "-P", __file__, "--", new_path] + paths, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        process = subprocess.Popen([BLENDER_PATH, "-b", "-noaudio", job_full_path, "-P", __file__, "--", new_path] + paths, stdout=sys.stdout, stderr=subprocess.STDOUT)
         process.wait()
 
         os.renames(job_full_path, job_full_path + ".bak")
         os.renames(new_path, job_full_path)
 
 def process(paths):
-    def processPointCache(point_cache):
-        point_cache.use_external = False
+    def processPointCache(ob, point_cache):
+        if not point_cache.use_disk_cache:
+            return
+
+        cache_name = cacheName(ob, point_cache)
+        new_path = path_map.get(cache_name, None)
+        if new_path:
+            point_cache.use_external = True
+            point_cache.filepath = new_path
+            point_cache.name = cache_name
 
     def processFluid(fluid):
         new_path = path_map.get(fluid.filepath, None)
@@ -76,15 +84,16 @@ def process(paths):
     for i in range(0, len(paths), 2):
         # special case for point cache
         if paths[i].endswith(".bphys"):
-            pass # Don't need them in the map, they all use the default external path
-            # NOTE: This is probably not correct all the time, need to be fixed.
+            path, filename = os.path.split(paths[i+1])
+            cache_name = filename.split("_")[0]
+            path_map[cache_name] = path
         # special case for fluids
         elif paths[i].endswith(".bobj.gz"):
             path_map[os.path.split(paths[i])[0]] = os.path.split(paths[i+1])[0]
         else:
             path_map[os.path.split(paths[i])[1]] = paths[i+1]
 
-    # TODO original paths aren't really the orignal path (they are the normalized path
+    # TODO original paths aren't really the original path, they are the normalized path
     # so we repath using the filenames only.
 
     ###########################
@@ -113,11 +122,11 @@ def process(paths):
     for object in bpy.data.objects:
         for modifier in object.modifiers:
             if modifier.type == 'FLUID_SIMULATION' and modifier.settings.type == "DOMAIN":
-                processFluid(modifier.settings)
+                processFluid(object, modifier.settings)
             elif modifier.type == "CLOTH":
-                processPointCache(modifier.point_cache)
+                processPointCache(object, modifier.point_cache)
             elif modifier.type == "SOFT_BODY":
-                processPointCache(modifier.point_cache)
+                processPointCache(object, modifier.point_cache)
             elif modifier.type == "SMOKE" and modifier.smoke_type == "TYPE_DOMAIN":
                 processPointCache(modifier.domain_settings.point_cache_low)
                 if modifier.domain_settings.use_high_resolution:
......
@@ -77,7 +77,7 @@ def testCancel(conn, job_id, frame_number):
         return False
 
 def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path=None):
-    job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path)
+    job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force=rfile.force)
 
     found = os.path.exists(job_full_path)
@@ -88,11 +88,11 @@ def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path=None):
         if not found:
             print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path))
             os.remove(job_full_path)
+            job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force=True)
 
     if not found:
         # Force prefix path if not found
         job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force=True)
+        print("Downloading", job_full_path)
 
         temp_path = os.path.join(JOB_PREFIX, "slave.temp")
 
         with ConnectionContext():
             conn.request("GET", fileURL(job_id, rfile.index), headers={"slave-id":slave_id})
......
@@ -184,6 +184,7 @@ class RENDER_PT_network_master_settings(NetRenderButtonsPanel, bpy.types.Panel):
         netsettings = context.scene.network_render
 
         layout.prop(netsettings, "use_master_broadcast")
+        layout.prop(netsettings, "use_master_force_upload")
         layout.prop(netsettings, "use_master_clear")
 
 class RENDER_PT_network_job(NetRenderButtonsPanel, bpy.types.Panel):
@@ -417,7 +418,12 @@ class NetRenderSettings(bpy.types.PropertyGroup):
 NetRenderSettings.use_master_clear = BoolProperty(
                 name="Clear on exit",
-                description="delete saved files on exit",
+                description="Delete saved files on exit",
+                default = False)
+
+NetRenderSettings.use_master_force_upload = BoolProperty(
+                name="Force Dependency Upload",
+                description="Force client to upload dependency files to master",
                 default = False)
 
 default_path = os.environ.get("TEMP")
......
@@ -230,7 +230,18 @@ def hashData(data):
     m = hashlib.md5()
     m.update(data)
     return m.hexdigest()
 
+def cacheName(ob, point_cache):
+    name = point_cache.name
+    if name == "":
+        name = "".join(["%02X" % ord(c) for c in ob.name])
+    return name
+
+def cachePath(file_path):
+    path, name = os.path.split(file_path)
+    root, ext = os.path.splitext(name)
+    return path + os.sep + "blendcache_" + root # need an API call for that
+
 def prefixPath(prefix_directory, file_path, prefix_path, force = False):
     if (os.path.isabs(file_path) or
......
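As a usage note for the two new helpers (the path and object name below are hypothetical): cachePath() reproduces the blendcache_<blendname> directory Blender creates next to a .blend file, and cacheName() falls back to the hex-encoded object name when a point cache is unnamed, which is the key the repath step stores in its path_map:

import os

def cachePath(file_path):
    # same computation as the helper added above
    path, name = os.path.split(file_path)
    root, ext = os.path.splitext(name)
    return path + os.sep + "blendcache_" + root

print(cachePath("/renders/scene.blend"))           # /renders/blendcache_scene (POSIX separator)
print("".join(["%02X" % ord(c) for c in "Cube"]))  # 43756265, the cacheName() fallback for "Cube"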