Part 6 — The Server
~ lululufr
CONTENTS
  0  why python on the server
  1  three stores, one job each
  2  lifespan — startup and shutdown order
  3  two audiences, two authentications
  4  the ingestion hot path
  5  why not kafka in front
  6  implemented vs. stub

──[ 0. Why Python on the Server ]──

The driver and the agent are in Rust because they run on potentially thousands 
of endpoints, in kernel mode and in pinky-promised-no-paging user mode, where
panic-equals-bug-check and a missed event is a missed event. The server runs in
a Linux container that the operator scales, restarts, and observes. Its
bottleneck is network I/O and database round-trips, not memory layout or panic
safety. The chosen stack is Python 3.12 + FastAPI (an async web framework built on
Starlette and Pydantic) + SQLAlchemy 2 async (the standard Python ORM in its
async-native incarnation) + Alembic (the SQLAlchemy-companion migrations tool) +
the official async OpenSearch client + the async `redis` client. Docker Compose
ties the runtime together for local development; production deployments would
substitute managed equivalents.

This post explains the choices and the leaks. The per-route detail lives in the
server repository's `ARCHITECTURE.md`.

──[ 1. Three Stores, One Job Each ]──

   ┌────────────────────────── Wazabi Server (FastAPI) ──────────────────────────┐
   │                                                                              │
   │   agents  ───►  /api/v1/agents/*   ──┐                                        │
   │   console ───►  /api/v1/{rules…}   ──┤                                        │
   │                                      │                                        │
   │                                      ├──► services/ingest.py ──bulk──►  OpenSearch
   │                                      │                                  (events,
   │                                      │                                   alerts)
   │                                      │                                        │
   │                                      └──► SQLAlchemy 2 async ───►  PostgreSQL │
   │                                                                    (endpoints,│
   │                                                                     profiles, │
   │                                                                     rules,    │
   │                                                                     commands) │
   │                                                                                │
   │                                       Redis (cache + command queue)            │
   └──────────────────────────────────────────────────────────────────────────────┘

**PostgreSQL** holds the relational, transactional state: the endpoint
inventory, the profiles assigned to them, the rules composing a profile, the
console users, the per-agent command queue, the licence accounting. Anything
that needs `JOIN`s or strict consistency lives here.

**OpenSearch** (a fork of Elasticsearch 7 maintained after the licensing split;
functionally equivalent for ingest, search, and aggregations) holds the
high-volume firehose: events from the kernel, alerts from rule engines, plugin
telemetry. Multi-terabyte indexes that an analyst searches by free text, by
source IP, by PID, by time window. The relational store would not survive being
asked to do this.

**Redis** holds ephemeral state: session caches, the per-agent command queue (an
agent polls the queue every 30 seconds; whatever it finds is the next set of
commands to execute), rate-limit counters. Nothing in Redis is load-bearing: if
the instance disappears the system degrades but does not lose persistent state.

Three stores cost more operationally than one. The justification is shape: each
one is the right fit for its workload, and forcing any of the three workloads
into the wrong store produces immediate pain (OLTP on OpenSearch, full-text
search on PostgreSQL, persistent state in Redis).

──[ 2. Lifespan — Startup and Shutdown Order ]──

FastAPI exposes a `lifespan` context manager that runs once at process start and 
once at process stop:
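
    from contextlib import asynccontextmanager

    from fastapi import FastAPI


    @asynccontextmanager
    async def lifespan(_app: FastAPI):
        # Startup: stores come up in dependency order.
        await init_postgres()
        await init_redis()
        await init_opensearch()
        await ensure_index_templates()
        try:
            yield  # the application serves requests here
        finally:
            # Shutdown: close in reverse order of startup.
            await close_opensearch()
            await close_redis()
            await close_postgres()
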

The order is deliberate. Postgres comes up first because every other subsystem
eventually reads endpoint / user state from it. Redis comes up second because
the auth middleware needs it. OpenSearch comes up last because index-template
creation on a cold cluster can be slow and we don't want it to block the other
two.

`ensure_index_templates` is idempotent: it issues `PUT _index_template/<name>`
calls that the cluster accepts as no-ops if the template already exists at the
same version. Redeploying the image against an existing cluster therefore costs
only the round-trip latency.

Shutdown unwinds the order. `try/finally` is mandatory because a `docker stop`
can interrupt at any point during the `yield`, and a partially-closed connection
pool surviving the process can wedge the next start.

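One detail of the snippet above: the `try` only begins once every `init_*` call has succeeded, so a failure in a later init would skip the `finally` and leave the earlier stores open. A minimal sketch of one way to cover that case uses `contextlib.AsyncExitStack` from the standard library. The `store` helper and the `events` list are hypothetical stand-ins for the real init/close pairs, not code from the server repository.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []  # records the order of operations for the demo


@asynccontextmanager
async def store(name: str):
    """Hypothetical stand-in for one init_*/close_* pair."""
    events.append(f"init {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


@asynccontextmanager
async def lifespan(_app=None):
    # Each store is registered with the stack as soon as it is up, so a
    # failure in a later init still closes the earlier ones, and a normal
    # shutdown unwinds them in reverse order of startup.
    async with AsyncExitStack() as stack:
        for name in ("postgres", "redis", "opensearch"):
            await stack.enter_async_context(store(name))
        yield


async def main() -> None:
    async with lifespan():
        events.append("serving")


asyncio.run(main())
print(events)
```

The startup order stays explicit in the tuple, and the reverse teardown no longer has to be kept in sync by hand.
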
──[ 3. Two Audiences, Two Authentications ]──

Agents and humans both talk to the same FastAPI process. They are not the same 
kind of client and they do not share an auth path:
    Console (humans)  ── JWT Bearer + refresh tokens
    Agents (machines) ── mTLS + per-agent enrolment token

The console flow is conventional. `POST /auth/login` exchanges username and
password (or SSO assertion) for a short-lived JWT (*JWT, JSON Web Token*: a
base64-encoded, signed token carrying claims; the signature lets the server
verify the token without per-request database lookups) and a longer-lived
refresh token. Each call to `/api/v1/{endpoints,rules,alerts,…}` carries the JWT
in `Authorization: Bearer …`, and RBAC is enforced at the route level.

The agent flow combines two layers. Connection-level: mTLS (*mTLS, mutual TLS*:
standard TLS extended so the server requires the client to present a
certificate; each side authenticates the other before any application
traffic flows), with the agent's enrolment certificate as the client cert and
the server's CA as the trust anchor. Application-level: a per-agent bearer token
issued at enrolment time, used to disambiguate which agent the certificate
belongs to within a fleet that may share a CA.

The agent endpoint is `POST /api/v1/agents/checkin`, called every ~30 seconds.
Each call carries:
    - heartbeat (last_seen, agent version, OS info)
    - any batched alerts the agent has accumulated
    - acks of previously-received commands

The response contains the next batch of commands for the agent (isolate, run
scan, upload artefact, restart). Short polling at 30-second cadence rather than
long-polling or WebSocket means every exchange is a routine HTTPS request that
load balancers, WAFs, and intermediate proxies handle without special
configuration. The cost is worst-case command latency of one polling interval,
about 30 seconds (and about a minute before the server sees the ack, which rides
on the following checkin). That is acceptable for the commands an agent
receives.

──[ 4. The Ingestion Hot Path ]──

`services/ingest.py` runs every time an agent's checkin carries events or 
alerts:
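
    import logging
    from typing import Any

    logger = logging.getLogger(__name__)


    async def bulk_index(docs: list[dict[str, Any]], index_name: str) -> int:
        """Index docs in one bulk request; return how many were accepted."""
        if not docs:
            return 0

        # The bulk body alternates an action entry with its document.
        actions: list[dict[str, Any]] = []
        for doc in docs:
            actions.append({"index": {"_index": index_name}})
            actions.append(doc)

        client = get_opensearch()
        # refresh=False: leave refreshes to the cluster's own schedule.
        response = await client.bulk(body=actions, refresh=False)

        if response.get("errors"):
            # Count the per-item results that came back with an error status.
            failed = sum(
                1
                for item in response["items"]
                if item.get("index", {}).get("status", 200) >= 400
            )
            logger.warning("OpenSearch bulk had %d failures on %d docs.", failed, len(docs))
            return len(docs) - failed
        return len(docs)
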
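The alternating action/document list built in the loop above maps onto the newline-delimited JSON body that the `_bulk` endpoint accepts. The sketch below shows that wire shape using only the standard library; `to_bulk_body` and the `events` index name are hypothetical, and in practice the client library takes care of this serialisation.

```python
import json
from typing import Any


def to_bulk_body(docs: list[dict[str, Any]], index_name: str) -> str:
    """Serialise docs as newline-delimited JSON: one action line per document."""
    if not docs:
        return ""
    lines: list[str] = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index_name}}))  # action line
        lines.append(json.dumps(doc))  # source line
    return "\n".join(lines) + "\n"  # the body must end with a newline


body = to_bulk_body([{"pid": 4242}, {"pid": 17}], "events")
print(body, end="")
```

Two documents therefore produce four lines, which is why the per-item statuses in the response have to be matched back to documents by position.
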
The `bulk` API of OpenSearch takes an array of action/document pairs and indexes
them in a single round-trip. The throughput multiplier over per-document `index`
calls is roughly 10×–20× depending on document size, well worth the extra
serialisation step on our side.

`refresh=False` is the key parameter. OpenSearch's index refresh is what makes
newly-indexed documents searchable; with `refresh=True` every bulk forces a
refresh, which is correct but expensive. With `refresh=False`, refreshes happen
on the cluster's own schedule (1 second by default), trading bounded eventual
visibility for an order-of-magnitude throughput improvement.

A `bulk` response can contain per-item statuses, including partial failures:
some documents indexed, others rejected (mapping conflict, malformed value). The
handler logs the failure count, returns the success count, and leaves any retry
decision to the caller. Today the caller doesn't retry: the agent's spool holds
the originals and a retry would risk duplicates.

Server-side enrichment runs before the bulk:
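
    from datetime import datetime
    from typing import Any


    def enrich_doc(doc: dict[str, Any], endpoint, received_at: datetime) -> dict[str, Any]:
        # Server-side fields come last, so they override same-named agent keys.
        return {
            **doc,
            "agent_id":    str(endpoint.id),
            "host":        host_dict_from_endpoint(endpoint),
            "received_at": received_at.isoformat(),
        }
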
The agent submits the kernel-produced event plus its own annotations. The server 
adds `agent_id`, `host` (hostname + IP + OS), and `received_at`. Without these
three fields, a search in a multi-agent index would return "a process started
on… one of your machines, at… some time".

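If the caller ever did want to retry, it would need the failed documents and not just a count. A minimal sketch of that step follows, relying on the bulk response listing its items in the same order as the submitted documents. `split_failures` is a hypothetical helper and the response dict is hand-written in the shape the bulk API returns.

```python
from typing import Any


def split_failures(
    docs: list[dict[str, Any]], response: dict[str, Any]
) -> tuple[int, list[dict[str, Any]]]:
    """Return (indexed_count, failed_docs) for a bulk response.

    Relies on `items` being in the same order as the submitted documents.
    """
    if not response.get("errors"):
        return len(docs), []
    failed = [
        doc
        for doc, item in zip(docs, response["items"])
        if item.get("index", {}).get("status", 200) >= 400
    ]
    return len(docs) - len(failed), failed


docs = [{"pid": 1}, {"pid": "not-a-number"}, {"pid": 3}]
# Hand-written response: the second document was rejected.
response = {
    "errors": True,
    "items": [
        {"index": {"status": 201}},
        {"index": {"status": 400, "error": {"type": "mapper_parsing_exception"}}},
        {"index": {"status": 201}},
    ],
}
ok, failed = split_failures(docs, response)
print(ok, failed)
```

Retrying the failed subset would still create duplicates unless each document carried a deterministic `_id`, which the action lines shown in this post do not set.
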
──[ 5. Why Not Kafka in Front ]──

The reflex for an event firehose at any scale is to put Kafka (or another
distributed log) between producers and the storage layer. We do not have one.
Two reasons.

The agent already plays that role. Its on-disk spool absorbs server downtime,
batches efficiently, and retries automatically (Part 5). A second buffer in
front of the server would buffer events that are already buffered and relieve
no bottleneck: the actual constraint is OpenSearch's ingest pipeline, which
Kafka in front does not relax.

Operationally, Kafka adds a fourth datastore class (after PostgreSQL,
OpenSearch, and Redis) with its own cluster topology, broker management,
consumer-group coordination, and replication concerns. The value-to-cost ratio
is unfavourable until traffic crosses a scale we are nowhere near. If we ever do
approach that scale, the agent-to-server contract does not change: the shipper
still POSTs `.zst` batches. An ingestion worker between FastAPI and OpenSearch
would be slotted in then, behind the existing API.

──[ 6. Implemented vs. Stub ]──

The server repository's `ARCHITECTURE.md` is explicit about what is done and 
what is a placeholder. At the time of writing:
    Implemented end-to-end:
      - lifespan + three-store wiring
      - agent enrolment + checkin + heartbeat
      - ingest path (events + alerts) into OpenSearch
      - JWT auth + refresh for the console

    Stub (route + response shape, no real behaviour):
      - endpoint isolation
      - rules CRUD + evaluation
      - deployment one-liners
      - licence accounting

This series covers the parts that work: the infrastructure that receives the
firehose. The detection language and the console UI that consumes the API are
separate projects on different timelines.

Next post: the plugin world. The protocol an external process speaks to push
telemetry into the agent's pipeline, and the named pipe it speaks it over.