Event Processing

Filter, group, and aggregate streams of host data.

← Back to use cases

Full source

This is a self-contained C++ program. Copy it, compile against the mino library, and run it.

/*
 * event_processing.cpp - filter, group, and aggregate event streams.
 *
 * C++ owns the event source (parsing, I/O, buffering). The mino
 * script defines processing rules using persistent data structures.
 * Self-recursion with automatic tail-call optimization drains the
 * stream in constant stack space. Sets act as membership predicates.
 * Keywords act as data accessors. The threading macro keeps the
 * pipeline flat and readable.
 *
 * Build:
 *   make
 *   c++ -std=c++17 -Isrc -o examples/use-cases/event_processing \
 *       examples/use-cases/event_processing.cpp src/[a-z]*.o -lm
 */

#include "mino.h"
#include <cstdio>
#include <vector>

/* ── Expose ────────────────────────────────────────────────────────── */

/* The `EventSource` wraps a C++ vector of sensor readings. Each reading
 * is a mino map built at construction time. The script consumes events
 * one at a time through the `.next` method. The handle's finalizer
 * cleans up the C++ object when the GC collects it. */

/* Host-side state behind an `EventSource` handle: the queued event
 * values plus the position of the next one to hand out.
 * NOTE(review): the values are referenced only from this C++ vector;
 * confirm the mino GC treats them as reachable (or pin them). */
struct EventSource {
    std::vector<mino_val_t *> events{};   /* events, in delivery order */
    std::size_t cursor{0};                /* index of the next event to return */
};

/* Build one event as a four-entry mino map:
 *   :type   -> keyword named by `type`
 *   :device -> string `device`
 *   :value  -> float `value`
 *   :ts     -> int `ts`
 * Returns the map produced by mino_map. */
static mino_val_t *make_event(mino_state_t *S, const char *type,
                              const char *device, double value, int ts)
{
    constexpr int kFieldCount = 4;
    mino_val_t *keys[kFieldCount];
    mino_val_t *vals[kFieldCount];

    keys[0] = mino_keyword(S, "type");
    vals[0] = mino_keyword(S, type);

    keys[1] = mino_keyword(S, "device");
    vals[1] = mino_string(S, device);

    keys[2] = mino_keyword(S, "value");
    vals[2] = mino_float(S, value);

    keys[3] = mino_keyword(S, "ts");
    vals[3] = mino_int(S, ts);

    return mino_map(S, keys, vals, kFieldCount);
}

/* Constructor callback for the `EventSource` host type. Allocates an
 * EventSource, fills it with a fixed set of twelve sample readings, and
 * wraps it in a handle tagged "EventSource". The finalizer passed to
 * mino_handle_ex deletes the C++ object. The unnamed parameters are
 * unused. */
static mino_val_t *source_new(mino_state_t *S, mino_val_t *,
                              mino_val_t *, void *)
{
    struct Reading {
        const char *type;
        const char *device;
        double      value;
        int         ts;
    };

    /* Sample data, in the order it will be delivered to the script. */
    static constexpr Reading kReadings[] = {
        {"temp",     "sensor-01",   21.3, 1000},
        {"humidity", "sensor-01",   45.0, 1001},
        {"temp",     "sensor-02",   19.8, 1002},
        {"pressure", "sensor-01", 1013.2, 1003},
        {"temp",     "sensor-01",   22.1, 1004},
        {"temp",     "sensor-02",   20.4, 1005},
        {"humidity", "sensor-02",   52.0, 1006},
        {"temp",     "sensor-01",   21.7, 1007},
        {"temp",     "sensor-02",   19.5, 1008},
        {"pressure", "sensor-02", 1012.8, 1009},
        {"temp",     "sensor-01",   22.9, 1010},
        {"humidity", "sensor-01",   43.0, 1011},
    };

    auto *source = new EventSource;
    for (const Reading &r : kReadings)
        source->events.push_back(
            make_event(S, r.type, r.device, r.value, r.ts));

    return mino_handle_ex(S, source, "EventSource",
        [](void *p, const char *) { delete static_cast<EventSource *>(p); });
}

/* `.next` method callback: returns the event at the cursor and advances
 * the cursor by one, or returns nil once every event has been handed
 * out. `target` is the handle whose pointer is the EventSource. */
static mino_val_t *source_next(mino_state_t *S, mino_val_t *target,
                               mino_val_t *, void *)
{
    auto *source = static_cast<EventSource *>(mino_handle_ptr(target));

    const bool exhausted = source->cursor >= source->events.size();
    if (exhausted)
        return mino_nil(S);

    mino_val_t *event = source->events[source->cursor];
    ++source->cursor;
    return event;
}

/* `count` getter callback: returns the cursor position as a mino int,
 * i.e. how many events have been handed out so far (not the total
 * number of events stored). */
static mino_val_t *source_count(mino_state_t *S, mino_val_t *target,
                                mino_val_t *, void *)
{
    const auto *source =
        static_cast<const EventSource *>(mino_handle_ptr(target));
    const auto consumed = static_cast<long long>(source->cursor);
    return mino_int(S, consumed);
}

/* ── Script ────────────────────────────────────────────────────────── */

/* `drain` uses self-recursion with an accumulator. Automatic tail-call
 * optimization means this runs in constant stack space regardless
 * of stream length. The set `#{:temp}` is used directly as a filter
 * predicate because collections are callable. */

/* The mino program evaluated by main(). Written as a raw string literal
 * so the script reads exactly as it is evaluated; every line, including
 * the last, ends with a newline. */
static const char *script = R"mino(;; Consume all events from a source into a vector.
(defn drain [source acc]
  (let [evt (.next source)]
    (if (nil? evt)
      acc
      (drain source (conj acc evt)))))

;; Summarize a [device readings] group.
(defn summarize [[device readings]]
  [device {:count (count readings)
           :avg   (/ (reduce + (map :value readings))
                     (count readings))
           :min   (apply min (map :value readings))
           :max   (apply max (map :value readings))}])

;; Filter, group, summarize.
(defn analyze [events type-filter]
  (->> events
       (filter #(type-filter (:type %)))
       (group-by :device)
       (map summarize)
       (into (sorted-map))))

;; Drain the source, analyze temperature readings.
(let [events (drain (new EventSource) [])]
  (println "total events:" (count events))
  (println "devices:" (set (map :device events)))
  (analyze events #{:temp}))
)mino";

/* ── Embed ─────────────────────────────────────────────────────────── */

/* Entry point: create a mino state and environment, register the
 * `EventSource` host type, evaluate the embedded script, and print its
 * result.
 *
 * Returns 0 when the script evaluates to a value, and 1 when
 * mino_eval_string reports failure (the error text goes to stderr), so
 * shells and CI can detect a failed run. */
int main()
{
    mino_state_t *S   = mino_state_new();
    mino_env_t   *env = mino_env_new(S);
    /* The sandbox preset excludes host interop; this demo relies on
     * host/new and friends, so opt in with the HOST cap on top. */
    mino_install(S, env, MINO_CAP_DEFAULT | MINO_CAP_HOST);

    /* Register the EventSource type with constructor, method, getter. */
    mino_host_enable(S);
    mino_host_register_ctor(S, "EventSource", 0, source_new, nullptr);
    mino_host_register_method(S, "EventSource", "next", 0, source_next, nullptr);
    mino_host_register_getter(S, "EventSource", "count", source_count, nullptr);

    mino_val_t *result = mino_eval_string(S, script, env);

    int status = 0;
    if (result) {
        printf("result: ");
        mino_println(S, result);
    } else {
        fprintf(stderr, "error: %s\n", mino_last_error(S));
        status = 1;   /* surface the failure through the exit code */
    }

    /* Tear down on both paths before reporting the status. */
    mino_env_free(S, env);
    mino_state_free(S);
    return status;
}
Build and run:
c++ -std=c++17 -O2 \
  -Imino/src -Imino/src/public -Imino/src/runtime -Imino/src/gc -Imino/src/eval \
  -Imino/src/collections -Imino/src/prim -Imino/src/async -Imino/src/interop \
  -Imino/src/diag -Imino/src/vendor/imath \
  -o use-cases/event_processing \
  use-cases/event_processing.cpp \
  mino/src/public/*.c mino/src/runtime/*.c mino/src/gc/*.c mino/src/eval/*.c \
  mino/src/collections/*.c mino/src/prim/*.c mino/src/async/*.c mino/src/interop/*.c \
  mino/src/regex/*.c mino/src/diag/*.c mino/src/vendor/imath/*.c \
  -lm
./use-cases/event_processing

The mino script

drain uses self-recursion with an accumulator. Automatic tail-call optimization means this runs in constant stack space regardless of stream length. The set #{:temp} is used directly as a filter predicate because collections are callable.

;; Consume all events from a source into a vector.
(defn drain [source acc]
  (let [evt (.next source)]
    (if (nil? evt)
      acc
      (drain source (conj acc evt)))))

;; Summarize a [device readings] group.
(defn summarize [[device readings]]
  [device {:count (count readings)
           :avg   (/ (reduce + (map :value readings))
                     (count readings))
           :min   (apply min (map :value readings))
           :max   (apply max (map :value readings))}])

;; Filter, group, summarize.
(defn analyze [events type-filter]
  (->> events
       (filter #(type-filter (:type %)))
       (group-by :device)
       (map summarize)
       (into (sorted-map))))

;; Drain the source, analyze temperature readings.
(let [events (drain (new EventSource) [])]
  (println "total events:" (count events))
  (println "devices:" (set (map :device events)))
  (analyze events #{:temp}))

← All use cases