Skip to content
Snippets Groups Projects
Commit a82f4c03 authored by Martin Poirier's avatar Martin Poirier
Browse files

netrender

better stream handling when uploading files
Might solve the problem with OS X masters
parent 20c83a9d
No related branches found
No related tags found
No related merge requests found
...@@ -213,7 +213,9 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -213,7 +213,9 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"): def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"):
self.send_response(code) self.send_response(code)
self.send_header("Content-type", content)
if code != http.client.OK and content:
self.send_header("Content-type", content)
for key, value in headers.items(): for key, value in headers.items():
self.send_header(key, value) self.send_header(key, value)
...@@ -512,7 +514,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -512,7 +514,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
if job.testStart(): if job.testStart():
self.server.stats("", "New job, started") self.server.stats("", "New job, started")
self.send_head(headers=headers) self.send_head(headers=headers, content = None)
else: else:
self.server.stats("", "New job, missing files (%i total)" % len(job.files)) self.server.stats("", "New job, missing files (%i total)" % len(job.files))
self.send_head(http.client.ACCEPTED, headers=headers) self.send_head(http.client.ACCEPTED, headers=headers)
...@@ -529,7 +531,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -529,7 +531,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
info_map = self.getInfoMap() info_map = self.getInfoMap()
job.edit(info_map) job.edit(info_map)
self.send_head() self.send_head(content = None)
else: else:
# no such job id # no such job id
self.send_head(http.client.NO_CONTENT) self.send_head(http.client.NO_CONTENT)
...@@ -547,7 +549,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -547,7 +549,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
except: except:
pass # invalid type pass # invalid type
self.send_head() self.send_head(content = None)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "/balance_enable": elif self.path == "/balance_enable":
info_map = self.getInfoMap() info_map = self.getInfoMap()
...@@ -556,7 +558,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -556,7 +558,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
if rule: if rule:
rule.enabled = enabled rule.enabled = enabled
self.send_head() self.send_head(content = None)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path.startswith("/cancel"): elif self.path.startswith("/cancel"):
match = cancel_pattern.match(self.path) match = cancel_pattern.match(self.path)
...@@ -572,7 +574,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -572,7 +574,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
if job: if job:
self.server.stats("", "Cancelling job") self.server.stats("", "Cancelling job")
self.server.removeJob(job, clear) self.server.removeJob(job, clear)
self.send_head() self.send_head(content = None)
else: else:
# no such job id # no such job id
self.send_head(http.client.NO_CONTENT) self.send_head(http.client.NO_CONTENT)
...@@ -594,7 +596,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -594,7 +596,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
if job: if job:
self.server.stats("", "Pausing job") self.server.stats("", "Pausing job")
job.pause(status) job.pause(status)
self.send_head() self.send_head(content = None)
else: else:
# no such job id # no such job id
self.send_head(http.client.NO_CONTENT) self.send_head(http.client.NO_CONTENT)
...@@ -610,7 +612,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -610,7 +612,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
self.server.stats("", "Clearing jobs") self.server.stats("", "Clearing jobs")
self.server.clear(clear) self.server.clear(clear)
self.send_head() self.send_head(content = None)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path.startswith("/reset"): elif self.path.startswith("/reset"):
match = reset_pattern.match(self.path) match = reset_pattern.match(self.path)
...@@ -629,7 +631,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -629,7 +631,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
if frame: if frame:
self.server.stats("", "Reset job frame") self.server.stats("", "Reset job frame")
frame.reset(all) frame.reset(all)
self.send_head() self.send_head(content = None)
else: else:
# no such frame # no such frame
self.send_head(http.client.NO_CONTENT) self.send_head(http.client.NO_CONTENT)
...@@ -637,7 +639,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -637,7 +639,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
else: else:
self.server.stats("", "Reset job") self.server.stats("", "Reset job")
job.reset(all) job.reset(all)
self.send_head() self.send_head(content = None)
else: # job not found else: # job not found
self.send_head(http.client.NO_CONTENT) self.send_head(http.client.NO_CONTENT)
...@@ -654,7 +656,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -654,7 +656,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats) slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)
self.send_head(headers = {"slave-id": slave_id}) self.send_head(headers = {"slave-id": slave_id}, content = None)
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
elif self.path == "/log": elif self.path == "/log":
length = int(self.headers['content-length']) length = int(self.headers['content-length'])
...@@ -671,7 +673,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -671,7 +673,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
if job: if job:
self.server.stats("", "Log announcement") self.server.stats("", "Log announcement")
job.addLog(log_info.frames) job.addLog(log_info.frames)
self.send_head(http.client.OK) self.send_head(content = None)
else: else:
# no such job id # no such job id
self.send_head(http.client.NO_CONTENT) self.send_head(http.client.NO_CONTENT)
...@@ -691,7 +693,6 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -691,7 +693,6 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
if match: if match:
self.server.stats("", "Receiving job") self.server.stats("", "Receiving job")
length = int(self.headers['content-length'])
job_id = match.groups()[0] job_id = match.groups()[0]
file_index = int(match.groups()[1]) file_index = int(match.groups()[1])
...@@ -711,20 +712,17 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -711,20 +712,17 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
else: else:
file_path = os.path.join(job.save_path, main_name) file_path = os.path.join(job.save_path, main_name)
buf = self.rfile.read(length)
# add same temp file + renames as slave # add same temp file + renames as slave
f = open(file_path, "wb") f = open(file_path, "wb")
f.write(buf) shutil.copyfileobj(self.rfile, f)
f.close() f.close()
del buf
render_file.filepath = file_path # set the new path render_file.filepath = file_path # set the new path
if job.testStart(): if job.testStart():
self.server.stats("", "File upload, starting job") self.server.stats("", "File upload, starting job")
self.send_head(http.client.OK) self.send_head(content = None)
else: else:
self.server.stats("", "File upload, file missings") self.server.stats("", "File upload, file missings")
self.send_head(http.client.ACCEPTED) self.send_head(http.client.ACCEPTED)
...@@ -758,15 +756,14 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -758,15 +756,14 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
frame = job[job_frame] frame = job[job_frame]
if frame: if frame:
self.send_head(content = None)
if job.hasRenderResult(): if job.hasRenderResult():
if job_result == DONE: if job_result == DONE:
length = int(self.headers['content-length'])
buf = self.rfile.read(length)
f = open(os.path.join(job.save_path, "%06d.exr" % job_frame), 'wb') f = open(os.path.join(job.save_path, "%06d.exr" % job_frame), 'wb')
f.write(buf) shutil.copyfileobj(self.rfile, f)
f.close() f.close()
del buf
elif job_result == ERROR: elif job_result == ERROR:
# blacklist slave on this job on error # blacklist slave on this job on error
# slaves might already be in blacklist if errors on the whole chunk # slaves might already be in blacklist if errors on the whole chunk
...@@ -780,9 +777,6 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -780,9 +777,6 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
job.testFinished() job.testFinished()
self.send_head()
# need some message content here or the slave doesn't like it
self.wfile.write(bytes("foo", encoding='utf8'))
else: # frame not found else: # frame not found
self.send_head(http.client.NO_CONTENT) self.send_head(http.client.NO_CONTENT)
else: # job not found else: # job not found
...@@ -808,18 +802,13 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -808,18 +802,13 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
frame = job[job_frame] frame = job[job_frame]
if frame: if frame:
self.send_head(content = None)
if job.hasRenderResult(): if job.hasRenderResult():
length = int(self.headers['content-length'])
buf = self.rfile.read(length)
f = open(os.path.join(job.save_path, "%06d.jpg" % job_frame), 'wb') f = open(os.path.join(job.save_path, "%06d.jpg" % job_frame), 'wb')
f.write(buf) shutil.copyfileobj(self.rfile, f)
f.close() f.close()
del buf
self.send_head()
# need some message content here or the slave doesn't like it
self.wfile.write(bytes("foo", encoding='utf8'))
else: # frame not found else: # frame not found
self.send_head(http.client.NO_CONTENT) self.send_head(http.client.NO_CONTENT)
else: # job not found else: # job not found
...@@ -843,17 +832,14 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): ...@@ -843,17 +832,14 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
frame = job[job_frame] frame = job[job_frame]
if frame and frame.log_path: if frame and frame.log_path:
length = int(self.headers['content-length']) self.send_head(content = None)
buf = self.rfile.read(length)
f = open(frame.log_path, 'ab') f = open(frame.log_path, 'ab')
f.write(buf) shutil.copyfileobj(self.rfile, f)
f.close() f.close()
del buf
self.server.getSeenSlave(self.headers['slave-id']) self.server.getSeenSlave(self.headers['slave-id'])
self.send_head()
else: # frame not found else: # frame not found
self.send_head(http.client.NO_CONTENT) self.send_head(http.client.NO_CONTENT)
else: # job not found else: # job not found
......
...@@ -86,6 +86,7 @@ def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path = None): ...@@ -86,6 +86,7 @@ def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path = None):
if not found: if not found:
print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path)) print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path))
os.remove(job_full_path)
job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force = True) job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force = True)
if not found: if not found:
...@@ -258,8 +259,7 @@ def render_slave(engine, netsettings, threads): ...@@ -258,8 +259,7 @@ def render_slave(engine, netsettings, threads):
if stdout: if stdout:
# (only need to update on one frame, they are linked # (only need to update on one frame, they are linked
conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers) conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)
response = conn.getresponse() responseStatus(conn)
response.read()
# Also output on console # Also output on console
if netsettings.use_slave_output_log: if netsettings.use_slave_output_log:
......
...@@ -92,9 +92,11 @@ class BreakableIncrementedSleep: ...@@ -92,9 +92,11 @@ class BreakableIncrementedSleep:
self.increase() self.increase()
def responseStatus(conn): def responseStatus(conn):
response = conn.getresponse() with conn.getresponse() as response:
response.read() length = int(response.getheader("content-length", "0"))
return response.status if length > 0:
response.read()
return response.status
def reporting(report, message, errorType = None): def reporting(report, message, errorType = None):
if errorType: if errorType:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment