Data Pipeline
Compose map, filter, reduce over persistent collections.
Full source
This is a self-contained C++ program. Copy it, compile against the mino library, and run it.
/*
* data_pipeline.cpp - transform host data through mino pipelines.
*
* A metrics collector pushes timestamped measurements into mino.
* The script defines transformation rules using threading macros
* and persistent data structures. Data flows through the pipeline
* without copying: structural sharing means intermediate results
* reuse memory from earlier stages.
*
* Build:
* make
* c++ -std=c++17 -Isrc -o examples/use-cases/data_pipeline \
* examples/use-cases/data_pipeline.cpp src/[a-z]*.o -lm
*/
#include "mino.h"
#include <cstdio>
#include <vector>
/* ── Expose ────────────────────────────────────────────────────────── */
/* Each measurement is a map with `:metric`, `:host`, `:value`, `:ts` keys.
* Maps are immutable once created. The script receives read-only
* snapshots of the C++ data. */
struct Measurement {
const char *metric;
const char *host;
double value;
int ts;
};
static mino_val_t *make_measurement(mino_state_t *S, const Measurement &m)
{
mino_val_t *ks[4], *vs[4];
ks[0] = mino_keyword(S, "metric"); vs[0] = mino_keyword(S, m.metric);
ks[1] = mino_keyword(S, "host"); vs[1] = mino_string(S, m.host);
ks[2] = mino_keyword(S, "value"); vs[2] = mino_float(S, m.value);
ks[3] = mino_keyword(S, "ts"); vs[3] = mino_int(S, m.ts);
return mino_map(S, ks, vs, 4);
}
/* ── Script ────────────────────────────────────────────────────────── */
/* The pipeline uses `->>` to thread data through each stage.
* Keywords like `:metric` and `:value` act directly as accessor
* functions. Sets like `#{:cpu :mem}` act as membership predicates
* in `filter`. Named helpers keep the top-level pipeline flat. */
static const char *script =
";; Average of a numeric sequence.\n"
"(defn avg [xs]\n"
" (/ (reduce + xs) (count xs)))\n"
"\n"
";; Summarize a [host measurements] group.\n"
"(defn summarize [[host readings]]\n"
" (let [values (map :value readings)]\n"
" [host {:count (count readings)\n"
" :avg (avg values)\n"
" :min (apply min values)\n"
" :max (apply max values)}]))\n"
"\n"
";; Build a summary table for one metric.\n"
"(defn metric-summary [data metric-key]\n"
" (->> data\n"
" (filter #(= (:metric %) metric-key))\n"
" (group-by :host)\n"
" (map summarize)\n"
" (into (sorted-map))))\n"
"\n"
";; Top-level: summarize CPU and memory metrics.\n"
"(let [summaries (->> [:cpu :mem]\n"
" (map (fn [m] [m (metric-summary data m)]))\n"
" (into (sorted-map)))]\n"
" summaries)\n";
/* ── Embed ─────────────────────────────────────────────────────────── */
int main()
{
mino_state_t *S = mino_state_new();
mino_env_t *env = mino_env_new_default(S);
/* Simulated metrics batch from a monitoring agent. */
std::vector<Measurement> batch = {
{"cpu", "web-01", 72.5, 1000},
{"mem", "web-01", 61.2, 1000},
{"cpu", "web-02", 45.3, 1000},
{"mem", "web-02", 78.9, 1000},
{"cpu", "web-01", 68.1, 1001},
{"mem", "web-01", 62.0, 1001},
{"cpu", "web-02", 51.7, 1001},
{"mem", "web-02", 77.4, 1001},
{"cpu", "web-01", 75.0, 1002},
{"cpu", "web-02", 48.2, 1002},
};
/* Push data into mino as a vector of maps.
* Each record is rooted via mino_ref so the GC cannot collect
* earlier records while later ones are still being allocated. */
std::vector<mino_ref_t *> refs;
for (auto &m : batch)
refs.push_back(mino_ref(S, make_measurement(S, m)));
std::vector<mino_val_t *> records;
for (auto *r : refs)
records.push_back(mino_deref(r));
mino_env_set(S, env, "data",
mino_vector(S, records.data(), records.size()));
for (auto *r : refs)
mino_unref(S, r);
/* Run the pipeline. */
mino_val_t *result = mino_eval_string(S, script, env);
if (result) {
printf("summaries:\n");
mino_println(S, result);
} else {
fprintf(stderr, "error: %s\n", mino_last_error(S));
}
mino_env_free(S, env);
mino_state_free(S);
}
Build and run:
c++ -std=c++17 -O2 \
-Imino/src -Imino/src/public -Imino/src/runtime -Imino/src/gc -Imino/src/eval \
-Imino/src/collections -Imino/src/prim -Imino/src/async -Imino/src/interop \
-Imino/src/diag -Imino/src/vendor/imath \
-o use-cases/data_pipeline \
use-cases/data_pipeline.cpp \
mino/src/public/*.c mino/src/runtime/*.c mino/src/gc/*.c mino/src/eval/*.c \
mino/src/collections/*.c mino/src/prim/*.c mino/src/async/*.c mino/src/interop/*.c \
mino/src/regex/*.c mino/src/diag/*.c mino/src/vendor/imath/*.c \
-lm
./use-cases/data_pipelineThe mino script
The pipeline uses ->> to thread data through each stage. Keywords like :metric and :value act directly as accessor functions. Sets like #{:cpu :mem} act as membership predicates in filter. Named helpers keep the top-level pipeline flat.
;; Average of a numeric sequence.
(defn avg [xs]
(/ (reduce + xs) (count xs)))
;; Summarize a [host measurements] group.
(defn summarize [[host readings]]
(let [values (map :value readings)]
[host {:count (count readings)
:avg (avg values)
:min (apply min values)
:max (apply max values)}]))
;; Build a summary table for one metric.
(defn metric-summary [data metric-key]
(->> data
(filter #(= (:metric %) metric-key))
(group-by :host)
(map summarize)
(into (sorted-map))))
;; Top-level: summarize CPU and memory metrics.
(let [summaries (->> [:cpu :mem]
(map (fn [m] [m (metric-summary data m)]))
(into (sorted-map)))]
summaries)