Event Processing
Filter, group, and aggregate streams of host data.
Full source
This is a self-contained C++ program. Copy it, compile against the mino library, and run it.
/*
* event_processing.cpp - filter, group, and aggregate event streams.
*
* C++ owns the event source (parsing, I/O, buffering). The mino
* script defines processing rules using persistent data structures.
* Self-recursion with automatic tail-call optimization drains the
* stream in constant stack space. Sets act as membership predicates.
* Keywords act as data accessors. The threading macro keeps the
* pipeline flat and readable.
*
* Build:
* make
* c++ -std=c++17 -Isrc -o examples/use-cases/event_processing \
* examples/use-cases/event_processing.cpp src/[a-z]*.o -lm
*/
#include "mino.h"
#include <cstdio>
#include <vector>
/* ── Expose ────────────────────────────────────────────────────────── */
/* The `EventSource` wraps a C++ vector of sensor readings. Each reading
* is a mino map built at construction time. The script consumes events
* one at a time through the `.next` method. The handle's finalizer
* cleans up the C++ object when the GC collects it. */
/* Host-side event stream exposed to scripts as an "EventSource" handle.
 *   events - the readings, in the order they will be handed out.
 *   cursor - index of the next reading to deliver; equivalently, how
 *            many readings have been consumed so far.
 * NOTE(review): the vector stores raw mino values; confirm that the mino
 * GC keeps them alive while they are referenced only from this object. */
struct EventSource {
    std::vector<mino_val_t *> events{};
    size_t cursor{0};
};
/* Build one sensor reading as a mino map with four entries:
 *   :type   -> keyword made from `type`
 *   :device -> string made from `device`
 *   :value  -> float made from `value`
 *   :ts     -> int made from `ts`
 * Returns the map produced by mino_map. */
static mino_val_t *make_event(mino_state_t *S, const char *type,
                              const char *device, double value, int ts)
{
    enum { kType, kDevice, kValue, kTs, kFieldCount };

    mino_val_t *keys[kFieldCount];
    mino_val_t *vals[kFieldCount];

    /* Each key is created immediately before its value, one field at a time. */
    keys[kType] = mino_keyword(S, "type");
    vals[kType] = mino_keyword(S, type);

    keys[kDevice] = mino_keyword(S, "device");
    vals[kDevice] = mino_string(S, device);

    keys[kValue] = mino_keyword(S, "value");
    vals[kValue] = mino_float(S, value);

    keys[kTs] = mino_keyword(S, "ts");
    vals[kTs] = mino_int(S, ts);

    return mino_map(S, keys, vals, kFieldCount);
}
/* Constructor callback for the script-visible "EventSource" type.
 * Allocates an EventSource, fills it with twelve fixed sample readings
 * (timestamps 1000..1011, in that order), and wraps it in a handle
 * tagged "EventSource". The finalizer handed to mino_handle_ex deletes
 * the C++ object. The unnamed callback parameters are unused. */
static mino_val_t *source_new(mino_state_t *S, mino_val_t *,
                              mino_val_t *, void *)
{
    struct Reading {
        const char *type;
        const char *device;
        double value;
        int ts;
    };
    static const Reading kSamples[] = {
        {"temp",     "sensor-01",   21.3, 1000},
        {"humidity", "sensor-01",   45.0, 1001},
        {"temp",     "sensor-02",   19.8, 1002},
        {"pressure", "sensor-01", 1013.2, 1003},
        {"temp",     "sensor-01",   22.1, 1004},
        {"temp",     "sensor-02",   20.4, 1005},
        {"humidity", "sensor-02",   52.0, 1006},
        {"temp",     "sensor-01",   21.7, 1007},
        {"temp",     "sensor-02",   19.5, 1008},
        {"pressure", "sensor-02", 1012.8, 1009},
        {"temp",     "sensor-01",   22.9, 1010},
        {"humidity", "sensor-01",   43.0, 1011},
    };

    auto *source = new EventSource;
    for (const Reading &sample : kSamples) {
        source->events.push_back(
            make_event(S, sample.type, sample.device, sample.value, sample.ts));
    }

    /* Ownership of `source` passes to the handle; this is its deleter. */
    auto finalize = [](void *p, const char *) {
        delete static_cast<EventSource *>(p);
    };
    return mino_handle_ex(S, source, "EventSource", finalize);
}
/* `.next` method callback: return the reading at the cursor and advance
 * the cursor by one; once every reading has been delivered, return nil.
 * `target` is the handle whose pointer is the EventSource. */
static mino_val_t *source_next(mino_state_t *S, mino_val_t *target,
                               mino_val_t *, void *)
{
    auto *stream = static_cast<EventSource *>(mino_handle_ptr(target));

    const bool exhausted = stream->cursor >= stream->events.size();
    if (exhausted)
        return mino_nil(S);

    mino_val_t *event = stream->events[stream->cursor];
    ++stream->cursor;
    return event;
}
/* `count` getter callback: return the cursor position as a mino int.
 * This is the number of readings consumed so far, not the total number
 * of readings held by the source. */
static mino_val_t *source_count(mino_state_t *S, mino_val_t *target,
                                mino_val_t *, void *)
{
    const auto *stream =
        static_cast<const EventSource *>(mino_handle_ptr(target));
    const auto consumed = static_cast<long long>(stream->cursor);
    return mino_int(S, consumed);
}
/* ── Script ────────────────────────────────────────────────────────── */
/* `drain` uses self-recursion with an accumulator. Automatic tail-call
* optimization means this runs in constant stack space regardless
* of stream length. The set `#{:temp}` is used directly as a filter
* predicate because collections are callable. */
/* Mino program that main() passes to mino_eval_string. It relies on the
 * host bindings registered in main(): `(new EventSource)` for the
 * constructor and `(.next source)` for the method. The last form,
 * `(analyze events #{:temp})`, is presumably the value handed back to
 * main() and printed after "result: " - confirm against the mino docs.
 * NOTE(review): the registered `count` getter is not used by this script.
 * Each C string below is one script line; adjacent literals concatenate. */
static const char *script =
";; Consume all events from a source into a vector.\n"
"(defn drain [source acc]\n"
" (let [evt (.next source)]\n"
" (if (nil? evt)\n"
" acc\n"
" (drain source (conj acc evt)))))\n"
"\n"
";; Summarize a [device readings] group.\n"
"(defn summarize [[device readings]]\n"
" [device {:count (count readings)\n"
" :avg (/ (reduce + (map :value readings))\n"
" (count readings))\n"
" :min (apply min (map :value readings))\n"
" :max (apply max (map :value readings))}])\n"
"\n"
";; Filter, group, summarize.\n"
"(defn analyze [events type-filter]\n"
" (->> events\n"
" (filter #(type-filter (:type %)))\n"
" (group-by :device)\n"
" (map summarize)\n"
" (into (sorted-map))))\n"
"\n"
";; Drain the source, analyze temperature readings.\n"
"(let [events (drain (new EventSource) [])]\n"
" (println \"total events:\" (count events))\n"
" (println \"devices:\" (set (map :device events)))\n"
" (analyze events #{:temp}))\n";
/* ── Embed ─────────────────────────────────────────────────────────── */
/*
 * Entry point: create a mino runtime, expose EventSource to scripts,
 * evaluate the embedded script, and print its result.
 *
 * Returns 0 when the script evaluates successfully and 1 when evaluation
 * fails (the error text goes to stderr), so shells and build scripts can
 * detect a failed run.
 */
int main()
{
    mino_state_t *S = mino_state_new();
    mino_env_t *env = mino_env_new(S);

    /* The sandbox preset excludes host interop; this demo relies on
     * host/new and friends, so opt in with the HOST cap on top. */
    mino_install(S, env, MINO_CAP_DEFAULT | MINO_CAP_HOST);

    /* Register the EventSource type with constructor, method, getter. */
    mino_host_enable(S);
    mino_host_register_ctor(S, "EventSource", 0, source_new, nullptr);
    mino_host_register_method(S, "EventSource", "next", 0, source_next, nullptr);
    mino_host_register_getter(S, "EventSource", "count", source_count, nullptr);

    int status = 0;
    mino_val_t *result = mino_eval_string(S, script, env);
    if (result) {
        printf("result: ");
        mino_println(S, result);
    } else {
        /* Passing a null pointer to %s is undefined behavior, so fall
         * back to a fixed message if no error text is available. */
        const char *message = mino_last_error(S);
        fprintf(stderr, "error: %s\n", message ? message : "unknown error");
        status = 1;
    }

    /* Release the environment before the state that owns it. */
    mino_env_free(S, env);
    mino_state_free(S);
    return status;
}
Build and run:
c++ -std=c++17 -O2 \
-Imino/src -Imino/src/public -Imino/src/runtime -Imino/src/gc -Imino/src/eval \
-Imino/src/collections -Imino/src/prim -Imino/src/async -Imino/src/interop \
-Imino/src/diag -Imino/src/vendor/imath \
-o use-cases/event_processing \
use-cases/event_processing.cpp \
mino/src/public/*.c mino/src/runtime/*.c mino/src/gc/*.c mino/src/eval/*.c \
mino/src/collections/*.c mino/src/prim/*.c mino/src/async/*.c mino/src/interop/*.c \
mino/src/regex/*.c mino/src/diag/*.c mino/src/vendor/imath/*.c \
-lm
./use-cases/event_processing

The mino script
drain uses self-recursion with an accumulator. Automatic tail-call optimization means this runs in constant stack space regardless of stream length. The set #{:temp} is used directly as a filter predicate because collections are callable.
;; Consume all events from a source into a vector.
;; Calls (.next source) repeatedly, conj-ing each event onto `acc`, and
;; returns `acc` as soon as `.next` yields nil. The recursive call is the
;; last expression evaluated on its branch, i.e. it is in tail position.
(defn drain [source acc]
(let [evt (.next source)]
(if (nil? evt)
acc
(drain source (conj acc evt)))))
;; Summarize a [device readings] group.
;; Takes one [device readings] pair (destructured in the parameter list)
;; and returns [device stats], where stats holds the :count, :avg, :min
;; and :max of the readings' :value fields.
;; NOTE(review): :avg divides by (count readings); presumably a group is
;; never empty because groups come from group-by - confirm for other callers.
(defn summarize [[device readings]]
[device {:count (count readings)
:avg (/ (reduce + (map :value readings))
(count readings))
:min (apply min (map :value readings))
:max (apply max (map :value readings))}])
;; Filter, group, summarize.
;; Keeps the events whose :type satisfies `type-filter` (called as a
;; function), groups them by :device, summarizes each group, and collects
;; the resulting [device stats] pairs into a sorted map.
(defn analyze [events type-filter]
(->> events
(filter #(type-filter (:type %)))
(group-by :device)
(map summarize)
(into (sorted-map))))
;; Drain the source, analyze temperature readings.
;; Prints the total number of events and the set of device names, then
;; evaluates to the per-device summary of the :temp readings.
(let [events (drain (new EventSource) [])]
  (println "total events:" (count events))
  (println "devices:" (set (map :device events)))
  (analyze events #{:temp}))