Skip to content
Snippets Groups Projects
io.py 1.7 KiB
Newer Older
  • Learn to ignore specific revisions
  • Jakub Beránek's avatar
    Jakub Beránek committed
    import json
    import logging
    import signal
    import time
    from typing import List
    
    
    def dump_records(path: str, records: List):
        if records:
            with open(path, "a") as f:
                for record in records:
                    json.dump(record, f)
                    f.write("\n")
            records.clear()
    
    
    def measure_and_store(capture_interval: int, dump_interval: int, file: str, capture_fn, finish_fn):
        """
        Runs an endless loop that will repeatedly call `capture_fn` and store its outputs into the
        given `file`.
    
        The process will end upon receiving SIGINT.
    
        :param capture_interval: How often to call `capture_fn`. [seconds]
        :param dump_interval: How often to dump data to disk. [seconds]
        :param file: Path where data will be collected.
        :param capture_fn: Function that will be called to measure some data. It should return any
        JSON-serializable item.
        :param finish_fn: Function that will be called when the measurement process ends.
        """
        record_count = 0
    
        records = []
        last_update = time.time()
    
        def capture():
            now = time.time()
            record = capture_fn(now)
            if record is not None:
                records.append(record)
            return now
    
        def finish(sig, frame):
            nonlocal record_count
    
            capture()
            record_count += len(records)
            dump_records(file, records)
            logging.info(f"Interrupting monitoring, wrote {record_count} records")
            finish_fn()
    
        signal.signal(signal.SIGINT, finish)
    
        while True:
            now = capture()
    
            if now - last_update > dump_interval:
                last_update = now
                record_count += len(records)
                dump_records(file, records)
            time.sleep(capture_interval)