Skip to content
Snippets Groups Projects
io.py 1.70 KiB
import json
import logging
import signal
import time
from typing import List


def dump_records(path: str, records: List):
    if records:
        with open(path, "a") as f:
            for record in records:
                json.dump(record, f)
                f.write("\n")
        records.clear()


def measure_and_store(capture_interval: int, dump_interval: int, file: str, capture_fn, finish_fn):
    """
    Runs an endless loop that will repeatedly call `capture_fn` and store its outputs into the
    given `file`.

    The process will end upon receiving SIGINT.

    :param capture_interval: How often to call `capture_fn`. [seconds]
    :param dump_interval: How often to dump data to disk. [seconds]
    :param file: Path where data will be collected.
    :param capture_fn: Function that will be called to measure some data. It should return any
    JSON-serializable item.
    :param finish_fn: Function that will be called when the measurement process ends.
    """
    record_count = 0

    records = []
    last_update = time.time()

    def capture():
        now = time.time()
        record = capture_fn(now)
        if record is not None:
            records.append(record)
        return now

    def finish(sig, frame):
        nonlocal record_count

        capture()
        record_count += len(records)
        dump_records(file, records)
        logging.info(f"Interrupting monitoring, wrote {record_count} records")
        finish_fn()

    signal.signal(signal.SIGINT, finish)

    while True:
        now = capture()

        if now - last_update > dump_interval:
            last_update = now
            record_count += len(records)
            dump_records(file, records)
        time.sleep(capture_interval)