meathook-rs

Capture ephemeral data.
Compose the sink.
Lose nothing.

A library-first base for long-running data collection jobs. Sinks compose like tower layers. There is no YAML plugin system.


Data loss: ≤ 1 torn record on SIGKILL · none on panic · none on SIGTERM

built on the satay-rs sans-IO action model

ephemeral → durable

APIs forget. meathook doesn't.

A weather reading at 14:02 is gone from the source by 14:03. The API only ever serves the latest reading. If you don't poll continuously and store what you see, that reading is lost forever.

now

{ "k":"air_temperature", "v":29.4, "st":"S24", "ts":"14:02:00Z" }

gone in 60s

meathook
data/air_temperature/ 6 files
  • 2024-06-18/13.parquet
  • 2024-06-18/14.parquet
  • 2024-06-19/14.parquet
  • 2024-06-20/14.parquet
  • 2024-07-18/14.parquet
  • 2024-08-18/14.parquet

A single reading is worthless.
A year of hourly readings is a dataset.

flow · tick cadence

Each pipeline keeps its own rhythm.

Collectors tick at their own intervals: every minute, every five minutes, every hour. Each pipeline's sink stack buffers its ticks and ships a parquet file when the flush window closes.

air_temperature · ticks every 1m
rainfall · ticks every 5m
pm25 · ticks every 1h
flush window · 1h, written to data/{pipeline}/{YYYY-MM-DD}/{HH}.parquet
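To make the cadence concrete, here is a std-only sketch that counts how many ticks each pipeline contributes to one 1h flush window. It is not meathook's API: window_start and ticks_in_window are hypothetical helpers, and ticks are assumed to fall on multiples of their interval since the Unix epoch.

```rust
/// Start of the hourly flush window that contains `unix_secs`.
fn window_start(unix_secs: u64) -> u64 {
    unix_secs - unix_secs % 3_600
}

/// Number of ticks a collector with period `interval_secs` fires inside the
/// half-open window [start, start + window_secs), assuming ticks fall on
/// multiples of the interval since the Unix epoch (hypothetical helper).
fn ticks_in_window(interval_secs: u64, start: u64, window_secs: u64) -> u64 {
    // Index of the first tick at or after `t` (ceiling division).
    let first_tick_at_or_after = |t: u64| (t + interval_secs - 1) / interval_secs;
    first_tick_at_or_after(start + window_secs) - first_tick_at_or_after(start)
}
```

With the cadences above, the 14:00 window on 2024-06-18 (window_start(1_718_719_320) is 1_718_719_200) receives 60 air_temperature ticks, 12 rainfall ticks, and 1 pm25 tick, and all of them are written out together when the window closes.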

durability

The disk is the buffer.

— disk.rs

DiskSpool is a write-ahead spool: ingest appends records as JSON lines to an fsynced segment file before returning, and a segment is deleted only after the downstream sink has accepted it. Replays are idempotent.
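A minimal, std-only sketch of the write-ahead pattern described above. Spool, ingest, and ship are hypothetical names, not DiskSpool's real API; records are assumed to arrive already serialized as JSON strings, and the downstream sink is modeled as a closure that reports whether it accepted the segment.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical write-ahead spool (not meathook's DiskSpool API).
struct Spool {
    dir: PathBuf,
    next_segment: u64,
}

impl Spool {
    /// Opens the spool directory and continues numbering after any leftover
    /// segments, which a restart would replay before taking new data.
    fn open(dir: &Path) -> io::Result<Self> {
        fs::create_dir_all(dir)?;
        let next_segment = fs::read_dir(dir)?
            .filter_map(|entry| entry.ok())
            .filter_map(|entry| entry.path().file_stem()?.to_str()?.parse::<u64>().ok())
            .max()
            .map_or(0, |n| n + 1);
        Ok(Spool { dir: dir.to_path_buf(), next_segment })
    }

    /// Writes pre-serialized JSON records to a new segment, one per line, and
    /// fsyncs it before returning, so Ok means the data is on disk.
    fn ingest(&mut self, json_lines: &[String]) -> io::Result<PathBuf> {
        let path = self.dir.join(format!("{:020}.jsonl", self.next_segment));
        self.next_segment += 1;
        let mut file = OpenOptions::new().write(true).create_new(true).open(&path)?;
        for line in json_lines {
            writeln!(file, "{line}")?;
        }
        file.sync_all()?;
        // A production spool would also fsync the directory so the new
        // directory entry itself survives a crash.
        Ok(path)
    }

    /// Offers a segment to the downstream sink. It is deleted only if the
    /// sink accepts it; otherwise it stays on disk for the next attempt.
    fn ship<F>(&self, segment: &Path, mut downstream: F) -> io::Result<bool>
    where
        F: FnMut(&str) -> bool,
    {
        let contents = fs::read_to_string(segment)?;
        let accepted = downstream(&contents);
        if accepted {
            fs::remove_file(segment)?;
        }
        Ok(accepted)
    }
}
```

The ordering is the point: fsync before acknowledging, delete only after acceptance. A crash between those two steps leaves the segment on disk to be shipped again, which is why replays have to be idempotent.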

failure | what happens | data lost
SIGKILL / OOM-kill | leftover segments are replayed on next start | ≤ 1 torn record
Task panic | the supervisor respawns the pipeline from its factory | none
Sink outage (HF 5xx) | segments accumulate on disk and are retried on each firing | none
Graceful SIGTERM | the runtime drains every sink stack before exit | none
Disk lost | the spool is gone | everything unflushed (use a PVC on k8s)
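The "≤ 1 torn record" bound follows from the one-line-per-record format: a SIGKILL can interrupt at most the line being written, which is then the only line without a trailing newline. Here is a hedged sketch of how a replay might detect and drop it. replay_complete_lines is a hypothetical helper, not part of meathook, and it assumes segments are newline-terminated JSON lines as described above.

```rust
/// Splits a spool segment into complete JSON lines. If the process was killed
/// mid-write, the final chunk has no trailing '\n': that is the single torn
/// record, and it is reported rather than replayed (hypothetical helper).
fn replay_complete_lines(segment: &[u8]) -> (Vec<String>, bool) {
    // Lossy decoding tolerates a multibyte character cut off by the kill.
    let text = String::from_utf8_lossy(segment);
    let mut records = Vec::new();
    let mut torn = false;
    for chunk in text.split_inclusive('\n') {
        match chunk.strip_suffix('\n') {
            Some("") => {} // blank line: nothing to replay
            Some(line) => records.push(line.to_string()),
            None => torn = true, // only the last chunk can lack '\n'
        }
    }
    (records, torn)
}
```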

footprint

A year-long collector that fits in 14.8 megabytes.

A release binary polling three NEA weather endpoints on a single tokio runtime. I/O-bound between ticks; memory stays flat.

chart: CPU % (0 to 50%) and RSS MB (0 to 50 MB) over a 10-minute run
RSS: 13.6 MB average · 14.8 MB peak
CPU: 0% average · 0.3% peak (~300/core)
10 minutes sampled · 303 samples

sampled live from a release binary running the nea example

example · singapore weather → huggingface

A real collector, not a toy.

The reference consumer polls Singapore's NEA realtime weather feeds (via data.gov.sg) using the satay-generated nea-rs client. It runs three pipelines, each as its own tokio task and each deduplicated by key.

air_temperature · 1m · per-station readings · dedupe key: (station_id, timestamp)
rainfall · 5m · per-station readings · dedupe key: (station_id, timestamp)
pm25 · 1h · regional readings · dedupe key: (region, timestamp)
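Polling a latest-only API every minute will often return the same reading twice, which is why each pipeline declares a dedupe key. What follows is a hedged, in-memory sketch of key-based deduplication. Dedupe and Reading are hypothetical types, and how meathook's with_key_fn actually stores seen keys is not described here.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Hypothetical deduplicator: keeps the first record seen for each key.
/// A year-long job would need to bound `seen`, for example by clearing it
/// when a flush window closes; this sketch keeps every key forever.
struct Dedupe<K> {
    seen: HashSet<K>,
}

impl<K: Eq + Hash> Dedupe<K> {
    fn new() -> Self {
        Dedupe { seen: HashSet::new() }
    }

    /// Returns only records whose key has not been seen before.
    fn filter<R, F>(&mut self, records: Vec<R>, key_fn: F) -> Vec<R>
    where
        F: Fn(&R) -> K,
    {
        // HashSet::insert returns false when the key was already present.
        records.into_iter().filter(|r| self.seen.insert(key_fn(r))).collect()
    }
}

/// Hypothetical per-station reading, keyed by (station_id, timestamp).
#[derive(Debug, Clone, PartialEq)]
struct Reading {
    station_id: String,
    timestamp: String,
    value: f64,
}
```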

Each flush ships one parquet file to a deterministic Hive-style path, so a replayed flush lands on the same file rather than a new one, and the HF dataset viewer stays happy.

repo: zeon256/nea-weather
path: data/{pipeline}/{YYYY-MM-DD}/{HH}.parquet
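The path is a pure function of the pipeline name and the window's timestamp, which is what makes a replayed flush land on the same file. Here is a std-only sketch that assumes UTC hours (the source does not say which timezone {HH} uses). hourly_path and civil_from_days are hypothetical helpers; the latter is a port of Howard Hinnant's days-to-civil-date algorithm.

```rust
/// Converts a day count since 1970-01-01 into a (year, month, day) in the
/// proleptic Gregorian calendar (Howard Hinnant's civil_from_days).
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = (if z >= 0 { z } else { z - 146_096 }) / 146_097;
    let doe = z - era * 146_097; // day of era, 0..=146_096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153; // month index with March = 0
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    let year = yoe + era * 400 + i64::from(month <= 2);
    (year, month, day)
}

/// Deterministic hourly path for a record timestamped `unix_secs` (UTC assumed).
fn hourly_path(pipeline: &str, unix_secs: i64) -> String {
    let (year, month, day) = civil_from_days(unix_secs.div_euclid(86_400));
    let hour = unix_secs.rem_euclid(86_400) / 3_600;
    format!("data/{pipeline}/{year:04}-{month:02}-{day:02}/{hour:02}.parquet")
}
```

Every timestamp from 14:00:00 to 14:59:59 on 2024-06-18 maps to data/air_temperature/2024-06-18/14.parquet, so shipping the same window twice targets one file.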

wire it in code

Two traits, composed like tower layers.

There is no YAML plugin system. You implement Collector and Sink, and sinks compose through combinators such as .spooled(), in the same spirit as tower's layers.
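To show what "compose like tower layers" means in practice, here is a deliberately simplified, synchronous sketch. Collector, Sink, MemorySink, Buffered, SinkExt::buffered, tick, and Counter are hypothetical stand-ins that echo the names above. meathook's real traits are async, and their exact signatures are not reproduced here.

```rust
/// Hypothetical, synchronous stand-in for a collector: one call per tick.
trait Collector {
    type Record;
    fn collect(&mut self) -> Vec<Self::Record>;
}

/// Hypothetical, synchronous stand-in for a sink layer.
trait Sink {
    type Record;
    fn ingest(&mut self, records: Vec<Self::Record>);
    fn flush(&mut self);
}

/// Terminal sink that keeps every batch it receives in memory.
struct MemorySink<R> {
    batches: Vec<Vec<R>>,
}

impl<R> Sink for MemorySink<R> {
    type Record = R;
    fn ingest(&mut self, records: Vec<R>) {
        self.batches.push(records);
    }
    fn flush(&mut self) {}
}

/// Layer that accumulates records and forwards them as one batch on flush.
struct Buffered<S: Sink> {
    inner: S,
    pending: Vec<S::Record>,
}

impl<S: Sink> Sink for Buffered<S> {
    type Record = S::Record;
    fn ingest(&mut self, records: Vec<S::Record>) {
        self.pending.extend(records);
    }
    fn flush(&mut self) {
        if !self.pending.is_empty() {
            self.inner.ingest(std::mem::take(&mut self.pending));
        }
        self.inner.flush();
    }
}

/// Extension trait so layers wrap each other with method calls.
trait SinkExt: Sink + Sized {
    fn buffered(self) -> Buffered<Self> {
        Buffered { inner: self, pending: Vec::new() }
    }
}

impl<S: Sink> SinkExt for S {}

/// One tick: collect, then hand the records to the sink stack.
fn tick<C, S>(collector: &mut C, sink: &mut S)
where
    C: Collector,
    S: Sink<Record = C::Record>,
{
    sink.ingest(collector.collect());
}

/// Example collector that yields an increasing counter each tick.
struct Counter(u32);

impl Collector for Counter {
    type Record = u32;
    fn collect(&mut self) -> Vec<u32> {
        self.0 += 1;
        vec![self.0]
    }
}
```

Each layer only knows that its inner value is a Sink. That is what lets a buffer, a spool, or a different terminal be swapped in without touching the collector.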

```rust
use std::{time::Duration, env};

use meathook::{FlushPolicy, HfSink, Meathook, Pipeline, SatayCollector, SinkExt as _};
use satay_reqwest::ReqwestActionExt as _;
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), meathook::runtime::RuntimeError> {
    let client = Client::new();
    let token = env::var("HF_TOKEN").expect("HF_TOKEN must be set");

    Meathook::builder()
        // Pipeline factory: the supervisor calls it again to respawn the
        // pipeline after a task panic.
        .pipeline(move || {
            let api = nea_rs::Api::new();
            let collector = SatayCollector::new(
                "air_temperature",
                client.clone(),
                move |client| {
                    let api = api.clone();
                    async move { api.air_temperature().send_with(&client).await }
                },
                // `flatten` and `MyRecord` are user-defined (not shown): flatten
                // turns one API response into per-station MyRecord values.
                |response| flatten(response),
            );

            // Durable stack: every tick is fsynced to disk before ingest
            // returns; hourly windows land on HF as parquet.
            let sink = HfSink::new(client.clone(), "you/your-dataset", token.clone())
                .spooled("/var/lib/meathook/spool/air_temperature", FlushPolicy::hourly());

            // Tick every 60 s; dedupe on (station_id, timestamp).
            Pipeline::new(collector, sink, Duration::from_secs(60))
                .with_key_fn(|r: &MyRecord| (r.station_id.clone(), r.timestamp.clone()))
        })
        .run()
        .await
}
```

records are plain structs — #[derive(Serialize, Deserialize)] is all the parquet encoder needs

compose

Swap any layer. Write your own terminal.

Fan-out is just another combinator (Tee). With --no-default-features you get the core traits, layers, spool, and runtime — bring your own terminal sink.
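Fan-out as a combinator fits in a few lines. This is a hypothetical, synchronous illustration, not meathook's Tee. The Sink trait here has a single ingest method, and each batch is cloned once so both branches receive it. CountingSink and LogSink are made-up terminals for the example.

```rust
/// Hypothetical minimal sink trait for this sketch (not meathook's).
trait Sink {
    type Record;
    fn ingest(&mut self, records: Vec<Self::Record>);
}

/// Fan-out combinator: every batch goes to both inner sinks.
struct Tee<A, B> {
    left: A,
    right: B,
}

impl<A, B> Sink for Tee<A, B>
where
    A: Sink,
    A::Record: Clone,
    B: Sink<Record = A::Record>,
{
    type Record = A::Record;
    fn ingest(&mut self, records: Vec<Self::Record>) {
        // Clone once for the left branch; move the original into the right.
        self.left.ingest(records.clone());
        self.right.ingest(records);
    }
}

/// Terminal that only counts records.
struct CountingSink {
    count: usize,
}

impl Sink for CountingSink {
    type Record = String;
    fn ingest(&mut self, records: Vec<String>) {
        self.count += records.len();
    }
}

/// Terminal that keeps every record.
struct LogSink {
    lines: Vec<String>,
}

impl Sink for LogSink {
    type Record = String;
    fn ingest(&mut self, records: Vec<String>) {
        self.lines.extend(records);
    }
}
```

Because Tee is itself a Sink, it can sit anywhere in a stack, including inside another Tee.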

tier 1 · Buffered: in-memory accumulation, flushed by interval or record count
↓ swappable
tier 2 · DiskSpool: write-ahead JSONL spool, fsynced before ingest returns
↓ swappable
terminal · HfSink ships parquet to HuggingFace, or write your own
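Tier 1's "flush by interval or record count" boils down to a small decision function. The sketch below is hypothetical: FlushRule and should_flush are illustrative names, not meathook's FlushPolicy API. It takes the elapsed time as an argument instead of reading a clock, so it can be tested without sleeping.

```rust
use std::time::Duration;

/// Hypothetical flush rule for an in-memory buffer tier.
#[derive(Debug, Clone, Copy)]
struct FlushRule {
    max_records: usize,
    max_age: Duration,
}

impl FlushRule {
    /// Flush when enough records have piled up or enough time has passed
    /// since the last flush. An empty buffer never flushes, so an idle
    /// interval does not ship an empty file.
    fn should_flush(&self, buffered: usize, since_last_flush: Duration) -> bool {
        buffered > 0 && (buffered >= self.max_records || since_last_flush >= self.max_age)
    }
}
```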

feature flags

feature | default | what it enables
parquet | yes | encode::to_parquet (arrow + parquet + serde_arrow)
huggingface | yes | HfSink + CommitAction (implies parquet)

Want a SQLite sink? Request one. Built one? Open a PR.

Start capturing before the API forgets.

docs.rs