-
Jakub Beránek authoredJakub Beránek authored
io.py 1.70 KiB
import json
import logging
import signal
import time
from typing import List
def dump_records(path: str, records: List):
if records:
with open(path, "a") as f:
for record in records:
json.dump(record, f)
f.write("\n")
records.clear()
def measure_and_store(capture_interval: int, dump_interval: int, file: str, capture_fn, finish_fn):
"""
Runs an endless loop that will repeatedly call `capture_fn` and store its outputs into the
given `file`.
The process will end upon receiving SIGINT.
:param capture_interval: How often to call `capture_fn`. [seconds]
:param dump_interval: How often to dump data to disk. [seconds]
:param file: Path where data will be collected.
:param capture_fn: Function that will be called to measure some data. It should return any
JSON-serializable item.
:param finish_fn: Function that will be called when the measurement process ends.
"""
record_count = 0
records = []
last_update = time.time()
def capture():
now = time.time()
record = capture_fn(now)
if record is not None:
records.append(record)
return now
def finish(sig, frame):
nonlocal record_count
capture()
record_count += len(records)
dump_records(file, records)
logging.info(f"Interrupting monitoring, wrote {record_count} records")
finish_fn()
signal.signal(signal.SIGINT, finish)
while True:
now = capture()
if now - last_update > dump_interval:
last_update = now
record_count += len(records)
dump_records(file, records)
time.sleep(capture_interval)