This repository contains two programs for working with the M100 dataset:
- Signal Processing — Reads raw sensor data, detects significant state changes, and outputs new parquet files.
- Graph Processing — Reads processed state files and prepares them for graph-based analysis.
- Navigate to the GraphProcessing directory.
- Place your `.parquet` file into the `StateFiles` directory.
  - If the directory does not exist, create it.
- You can use either:
  - Parquet files modified by the Signal Processing step (described below), or
  - The original dataset files.
- By default, the program reads the file named `state.parquet`.
  - To change this, modify the `state_file` variable at the top of the `main` function in run_pipeline.py (see the sketch below).
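For illustration only, the top of `main` might look roughly like this; the surrounding code and the alternative file name are hypothetical, only the `state_file` variable name comes from the repository:

```python
from pathlib import Path

def main():
    # File read from the StateFiles directory; change this to point at a
    # different processed state file (the commented name is a made-up example).
    state_file = "state.parquet"
    # state_file = "my_rack_states.parquet"

    input_path = Path("StateFiles") / state_file
    print(f"Reading states from {input_path}")

if __name__ == "__main__":
    main()
```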
```bash
# (Optional) Create a virtual environment
pip install -r requirements.txt
python -m run_pipeline
```
Running the pipeline pushes rows from the specified file into the buffer in the format:
```
{node: {node, timestamp, rack_id, metric1, metric2, ...}, ...}
```
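For example, one buffer entry for a single node might look like the following; the node ID and metric names are placeholders, not actual M100 column names:

```python
{
    "node_0042": {
        "node": "node_0042",
        "timestamp": "2020-03-09T12:00:00",
        "rack_id": "12",
        "ambient_temp": 24.5,
        "cpu_power": 113.0,
    },
    # ... one entry per node
}
```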
- Navigate to the SignalProcessing directory.
- Create a directory named `TarFiles`.
  - Place your `.tar` files inside.
  - Extract them so they become regular folders containing the `.parquet` files (a setup sketch follows this list).
  - Dataset link: Zenodo Record
- Create two more directories:
  - `outputs` — will store the processed parquet files.
  - `logs` — will store logs.
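If you prefer to script this setup, a minimal Python sketch is shown below; it assumes you run it from the SignalProcessing directory and that the archives are plain `.tar` files:

```python
import tarfile
from pathlib import Path

# Create the directories the pipeline expects.
for name in ("TarFiles", "outputs", "logs"):
    Path(name).mkdir(exist_ok=True)

# Extract every .tar archive in TarFiles into a folder of the same name,
# so the .parquet files end up inside regular directories.
for archive in Path("TarFiles").glob("*.tar"):
    target = archive.with_suffix("")  # TarFiles/<rack>.tar -> TarFiles/<rack>/
    with tarfile.open(archive) as tar:
        tar.extractall(path=target)
```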
At the top of the `run` function in run_pipeline.py, you can adjust:
- `limit_racks` (int | None) — Process only the specified rack by ID; `None` means process all racks as one.
- `limit_nodes` (int | None) — Limit the number of nodes for faster testing.
- `delta` (float, 0–1) — Sensitivity of change detection (ADWIN parameter; see the illustration below).
- `clock` (int) — Frequency of checks (ADWIN parameter).
- `rows_in_mem` (int) — Number of rows loaded into memory at once.
- `bq_max_size` — Number of items each queue can hold; defaults to `2 * rows_in_mem`.
At the bottom of the file, there is also an option to change which rack is processed (`limit_racks`) and an option to process all racks in parallel.
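The `delta` and `clock` values above are the standard ADWIN knobs. The sketch below uses the `river` library purely to illustrate their effect; it is an assumption that this matches the detector implementation used in the pipeline:

```python
from river.drift import ADWIN

# Smaller delta -> detections need stronger evidence (fewer false alarms);
# clock controls how often ADWIN checks whether its window should be cut.
detector = ADWIN(delta=0.002, clock=32)

stream = [1.0] * 500 + [5.0] * 500  # synthetic signal with one abrupt change
for i, value in enumerate(stream):
    detector.update(value)
    if detector.drift_detected:
        print(f"Change detected around index {i}")
```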
```bash
# (Optional) Create a virtual environment
pip install -r requirements.txt
python -m run_pipeline
```
- `node_manager` reads parquet files from the `TarFiles` directory for the specified rack.
- It processes data in batches and pushes each batch into a queue.
- Each queue element contains one row of synchronized sensor readings for all nodes at a given timestamp.
- If a node has no data for a timestamp:
  - Readings are set to `None`, or
  - If available, replaced with the last known reading (see the sketch below).
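A minimal sketch of that fallback rule (illustrative only, not the pipeline's actual code; names are made up):

```python
last_known = {}  # node_id -> most recent reading seen for that node

def reading_for(node_id, new_value):
    """Return the value to store for a node at the current timestamp."""
    if new_value is not None:
        last_known[node_id] = new_value
    # Falls back to the last known reading, or None if none exists yet.
    return last_known.get(node_id)
```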
Format of a queue element:
```
{
    node_id: {
        'timestamp': to_json_serializable_timestamp(self.current_time),
        'rack_id': str,
        'sensor_data': {'<metric name>': float}
    }
}
```
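As a concrete illustration, a populated element for one node might look like this (the node ID and metric names are hypothetical):

```python
{
    "node_0042": {
        "timestamp": "2020-03-09T12:00:00",
        "rack_id": "12",
        "sensor_data": {"ambient_temp": 24.5, "cpu_power": 113.0}
    }
}
```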
- The program outputs one parquet file for the rack into the `outputs` directory.
- Row format: `node, timestamp, rack_id, metric1, metric2, ..., metric_k`
- Each row contains a significant state for one of the nodes; the rows should be sorted by timestamp.
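To sanity-check an output file, it can be loaded with pandas; the file name here is hypothetical:

```python
import pandas as pd

df = pd.read_parquet("outputs/rack_12.parquet")  # hypothetical file name
print(df.columns.tolist())                       # node, timestamp, rack_id, metric1, ...
assert df["timestamp"].is_monotonic_increasing   # rows sorted by timestamp
```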