<index> / <wazabiedr> / ring-and-ioctl
[ en | fr ]
┌───────────────────────┐
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
│                       │
└───────────────────────┘
Partie 4 — Le ring buffer et l'IOCTL en appel inversé
~ lululufr
SOMMAIRE
  0  le problème à cette frontière
  1  pourquoi pas un thread possédé par le kernel
  2  le ring buffer
  3  politique d'éviction et drop_count
  4  submit_event — le côté producteur
  5  le contrat ioctl
  6  dispatch_device_control — le côté consommateur
  7  status_pending et marquage d'irp
  8  cleanup — annuler l'irp parkée
  9  spinlocks et iofcompleterequest

──[ 0. Le Problème à Cette Frontière ]──

Cinq callbacks kernel (Partie 2) produisent des événements. L'agent user mode 
doit les recevoir. Les producteurs se déclenchent à n'importe quel IRQL jusqu'à
`DISPATCH_LEVEL` ; le consommateur tourne à `PASSIVE_LEVEL` (le plus bas IRQL,
où le blocage et le paging sont permis) en user mode. Entre les deux on a besoin
d'un buffer in-kernel, d'une primitive de synchronisation, et d'un transport. Ce post implémente les trois avec un ring buffer, un spinlock, un pointeur
atomique pour une IRP parkée, et un code IOCTL. Il n'y a pas de thread kernel,
pas de section partagée, pas d'objet event géré par le kernel.
──[ 1. Pourquoi Pas un Thread Possédé Par Le Kernel ]──

`PsCreateSystemThread` (la routine kernel qui crée un thread system-context dans 
le processus System) fonctionne. Il impose aussi des obligations :
    - Une primitive de signalisation producteur-consommateur (un
      KEVENT signalé au push de queue, attendu par le consommateur).
    - Un drapeau de shutdown que le thread consommateur polle, posé
      par DriverUnload.
    - Une manière de drainer les derniers événements au shutdown
      sans courser les callbacks producteurs (qui continuent à se
      déclencher jusqu'à ce que les callbacks soient déregistrés).
    - Un chemin de secours quand aucun agent n'est connecté — les
      événements n'ont nulle part où aller, donc le consommateur
      possédé par le kernel aurait quand même besoin d'une queue
      en dessous.

Le design en appel inversé replie ces obligations : le thread user-mode de 
l'agent est le consommateur, le mécanisme IRP-pending de l'I/O manager est la
primitive d'attente, et le même ring buffer qui satisfait le cas "pas d'agent"
est la queue. Chaque pièce a un seul rôle.
──[ 2. Le Ring Buffer ]──

Un slot fait deux pointeurs de large :
#[derive(Copy, Clone)]
pub struct Slot {
    pub data: *mut u8,
    pub size: u32,
}
`data` pointe sur un bloc alloué par `ExAllocatePool2` depuis le pool 
non-paginé. `size` est la longueur du bloc en octets. Répliquer la taille dans
le slot (elle est aussi présente dans `EventHeader::size`) évite de toucher le
buffer quand le ring a besoin de mesurer la pression ou de drop les plus
vieilles entrées. Le ring lui-même est un tableau à taille fixe avec trois indices :
pub const QUEUE_CAP: usize = 4096;

pub static QUEUE_BUF: SyncCell<MaybeUninit<[Slot; QUEUE_CAP]>>
    = SyncCell::new(MaybeUninit::uninit());

pub static QUEUE_HEAD: SyncCell<usize> = SyncCell::new(0);
pub static QUEUE_TAIL: SyncCell<usize> = SyncCell::new(0);
pub static QUEUE_LEN:  SyncCell<usize> = SyncCell::new(0);
Les entrées vivantes occupent `[HEAD .. HEAD + LEN] mod CAP`. Le `LEN` explicite 
(plutôt que reconstruit depuis `HEAD` et `TAIL` avec un slot réservé) est un
trade volontaire — un `usize` supplémentaire en stockage statique en échange
d'un cas en moins d'arithmétique modulaire et d'un check de plénitude plus clair
(`*len == QUEUE_CAP`). La capacité (4096 slots) est dimensionnée par rapport à la baseline mesurée du
projet : des dizaines d'événements process/thread/image par seconde au repos,
des pics de plusieurs centaines par seconde pendant une compilation ou le
démarrage d'un service. À 250 événements/s, 4096 slots font à peu près 16
secondes de buffering — assez pour absorber un redémarrage d'agent, pas assez
pour garder un backlog âgé quand un agent est offline depuis des heures. Push et pop attendent tous les deux que l'appelant détienne `QUEUE_LOCK` (un
`KSPIN_LOCK` — le spinlock de base du kernel, utilisable jusqu'à
`DISPATCH_LEVEL`) :
pub unsafe fn queue_push_locked(data: *mut u8, size: u32) -> bool {
    let buf  = QUEUE_BUF.as_mut_ptr() as *mut Slot;
    let head = &mut *QUEUE_HEAD.as_mut_ptr();
    let tail = &mut *QUEUE_TAIL.as_mut_ptr();
    let len  = &mut *QUEUE_LEN.as_mut_ptr();

    if *len == QUEUE_CAP {
        let old = *buf.add(*head);
        ExFreePool(old.data as PVOID);
        *head = (*head + 1) % QUEUE_CAP;
        *len -= 1;
        DROP_COUNT.fetch_add(1, Ordering::Relaxed);
    }

    *buf.add(*tail) = Slot { data, size };
    *tail = (*tail + 1) % QUEUE_CAP;
    *len += 1;
    true
}
L'`unsafe` est obligatoire parce que le borrow checker ne voit pas le contrat du 
spinlock. Le lock est acquis par l'appelant.
──[ 3. Politique D'Éviction et drop_count ]──

Quand le ring est plein et qu'un producteur push, la plus vieille entrée est 
évincée. Deux conséquences : Le buffer évincé est `ExFreePool`-é immédiatement. Sans ça le driver fuirait de
la mémoire pool non-paginée au taux d'éviction — observé en production par le
système qui tombe en panne de pool non-paginé plusieurs heures après le
démarrage. `DROP_COUNT` est incrémenté atomiquement. Ce compteur est lu et reset par
`make_header` (Partie 2) quand le prochain callback enqueue avec succès :
EventHeader {
    drop_count: DROP_COUNT.swap(0, Ordering::Relaxed),
    …
}
Le `swap(0, …)` capture le compte courant au moment où on stampe le header. Le 
prochain événement livré porte donc le nombre exact d'événements que le driver a
évincés entre l'événement précédemment livré et celui-ci. L'agent reconstruit le
signal de gap à partir de ce seul champ ; aucun type d'événement "perte" séparé
n'est requis. La politique elle-même — évincer le plus vieux, garder le plus récent — est
volontaire. Pour un feed EDR, l'activité la plus récente, c'est ce qui intéresse
un opérateur quand l'agent se reconnecte. Des événements vieillis de dizaines de
secondes sont de toute façon souvent trop tard pour informer une réponse.
──[ 4. submit_event — Le Côté Producteur ]──

`submit_event` est le seul point d'entrée utilisé par chaque callback kernel 
(Partie 2) :
pub unsafe fn submit_event(data: *mut u8, size: u32) {
    let guard = SpinLockGuard::acquire(QUEUE_LOCK.as_mut_ptr() as *mut KSPIN_LOCK);

    let pending = PENDING_IRP.swap(ptr::null_mut(), Ordering::AcqRel);

    if !pending.is_null() {
        let stack  = current_stack_location(pending);
        let outlen = (*stack).Parameters.DeviceIoControl.OutputBufferLength;

        if outlen >= size {
            let sysbuf = (*pending).AssociatedIrp.SystemBuffer as *mut u8;
            ptr::copy_nonoverlapping(data, sysbuf, size as usize);
            drop(guard);
            ExFreePool(data as PVOID);
            complete_irp(pending, STATUS_SUCCESS, size as usize);
            return;
        }

        drop(guard);
        ExFreePool(data as PVOID);
        DROP_COUNT.fetch_add(1, Ordering::Relaxed);
        complete_irp(pending, STATUS_BUFFER_TOO_SMALL, size as usize);
        return;
    }

    queue_push_locked(data, size);
}
Trois chemins.

**Chemin 1 — livraison directe.** Une IRP est parkée dans `PENDING_IRP` et son 
buffer de sortie est assez grand. Le producteur copie tout droit dans le
`SystemBuffer` de l'agent (la copie allouée côté kernel du buffer de sortie de
l'agent, créée par l'I/O manager parce que le device déclare `DO_BUFFERED_IO`),
libère le bloc de staging, et complète l'IRP. L'événement n'atteint jamais le
ring. **Chemin 2 — buffer trop petit.** Une IRP est parkée mais son buffer de sortie
est plus court que cet événement. L'IRP est complétée avec
`STATUS_BUFFER_TOO_SMALL` et `Information = size`. L'agent lit la taille, alloue
un buffer plus grand, et réémet l'IOCTL. L'événement courant est droppé parce
qu'il n'y a pas de manière propre de le remettre en tête du ring sans un second
type de slot. L'agent apprend le drop via le `drop_count` du prochain événement. **Chemin 3 — pas de waiter.** Aucune IRP n'est parkée (l'IOCTL précédente de
l'agent a été satisfaite, ou l'agent ne tourne pas). L'événement est poussé sur
le ring ; si le ring était plein, la plus vieille entrée est évincée par la
Section 3. Un détail d'ordering : on fait `swap` de `PENDING_IRP` vers `null` avant de
vérifier l'adéquation du buffer. L'IRP est prise quel que soit le sous-chemin
qui suit. La remettre quand le buffer est trop petit voudrait dire que de futurs
événements tenteraient une livraison dans un buffer déjà connu pour être trop
petit — ce qui défait le but du chemin sensible à la taille.
──[ 5. Le Contrat IOCTL ]──

Un code IOCTL, défini symétriquement des deux côtés :
pub const IOCTL_WEDR_GET_EVENT: u32 =
    ctl_code(FILE_DEVICE_UNKNOWN, 0x800, METHOD_BUFFERED, FILE_READ_ACCESS);
`METHOD_BUFFERED` (la méthode I/O qu'on a couverte en Partie 1 — `SystemBuffer` 
alloué côté kernel, une copie entrante, une sortante) est choisie parce que le
volume d'événements est faible (un événement par appel) et les propriétés de
sécurité (pas de surprises de révocation de buffer user mode, pas de contraintes
IRQL sur l'accès mémoire user mode) valent la copie par appel. `FILE_READ_ACCESS` reflète la sémantique — l'appel est une lecture du point de
vue de l'agent — et matche l'accès demandé par `CreateFileW(GENERIC_READ)` quand
l'agent ouvre le device. `0x800` est le code de fonction ; les valeurs 0x800 et au-dessus sont réservées
par la convention Microsoft pour usage tiers, laissant 0x000–0x7FF pour les
codes IOCTL possédés par Microsoft.
──[ 6. dispatch_device_control — Le Côté Consommateur ]──

Le handler d'IOCTL est le dual de `submit_event` :
let guard = SpinLockGuard::acquire(QUEUE_LOCK.as_mut_ptr() as *mut KSPIN_LOCK);

if let Some(slot) = queue_pop_locked() {
    drop(guard);
    if outlen < slot.size {
        ExFreePool(slot.data as PVOID);
        DROP_COUNT.fetch_add(1, Ordering::Relaxed);
        return complete_irp(irp, STATUS_BUFFER_TOO_SMALL, slot.size as usize);
    }
    let sysbuf = (*irp).AssociatedIrp.SystemBuffer as *mut u8;
    ptr::copy_nonoverlapping(slot.data, sysbuf, slot.size as usize);
    ExFreePool(slot.data as PVOID);
    return complete_irp(irp, STATUS_SUCCESS, slot.size as usize);
}

let prev = PENDING_IRP.compare_exchange(
    ptr::null_mut(), irp, Ordering::AcqRel, Ordering::Acquire,
);
drop(guard);
if prev.is_err() {
    return complete_irp(irp, STATUS_UNSUCCESSFUL, 0);
}
mark_irp_pending(irp);
wdk_sys::STATUS_PENDING
Deux chemins.

**Fast path** — le ring a un événement. Pop, vérifier la taille du buffer agent, 
copier dans `SystemBuffer` et compléter avec `STATUS_SUCCESS` (ou échouer avec
`STATUS_BUFFER_TOO_SMALL` comme précédemment). **Slow path** — le ring est vide. Parker l'IRP dans `PENDING_IRP` via un
compare-and-exchange contre `null`, marquer l'IRP pending, et retourner
`STATUS_PENDING`. Le CAS protège contre une seconde IOCTL concurrente — un agent
mal configuré, un test harness, ou simplement un bug — en refusant d'écraser une
IRP déjà parkée. Le device est single-client by design et un second waiter
concurrent reçoit `STATUS_UNSUCCESSFUL` plutôt que de corrompre l'état.
──[ 7. STATUS_PENDING et Marquage D'IRP ]──

`STATUS_PENDING` (un `NTSTATUS` spécial qui signale "la requête n'est pas encore 
complétée, ne pas détruire l'IRP, l'appelant devrait attendre") ne fonctionne
que si la routine de dispatch pose aussi un flag dans la stack location courante
de l'IRP avant de retourner :
pub unsafe fn mark_irp_pending(irp: PIRP) {
    let stack = current_stack_location(irp);
    (*stack).Control |= SL_PENDING_RETURNED;   // 0x01
}
`SL_PENDING_RETURNED` est le bit que l'I/O manager vérifie pour confirmer que la 
routine de dispatch avait effectivement l'intention de différer la complétion.
Les bindings `wdk-sys` n'exposent pas la macro `IoMarkIrpPending` de `wdm.h`,
donc on pose le bit directement. L'omettre, c'est un bug kernel classique :
l'I/O manager traite l'IRP comme complétée malgré le retour `STATUS_PENDING`, le
thread user se débloque tôt, et le prochain accès mémoire dans `SystemBuffer`
écrit dans une allocation libérée. La complétion différée finit par arriver via un des trois chemins :
    - submit_event Chemin 1 — un producteur copie dans le buffer
      de l'agent et complète l'IRP avec SUCCESS.
    - dispatch_cleanup — le dernier handle de l'agent se ferme,
      on annule l'IRP parkée avec STATUS_CANCELLED.
    - driver_unload — le driver est en cours de déchargement, on
      annule l'IRP parkée avec STATUS_CANCELLED.

──[ 8. Cleanup — Annuler L'IRP Parkée ]──

`IRP_MJ_CLEANUP` est envoyé par l'I/O manager quand le dernier handle sur le 
file object se ferme — exit du processus agent, `CloseHandle`, etc. Le buffer
user mode derrière `SystemBuffer` est sur le point d'être libéré :
pub unsafe extern "C" fn dispatch_cleanup(_dev: _, irp: PIRP) -> NTSTATUS {
    let pending = PENDING_IRP.swap(ptr::null_mut(), Ordering::AcqRel);
    if !pending.is_null() {
        complete_irp(pending, STATUS_CANCELLED, 0);
    }
    complete_irp(irp, STATUS_SUCCESS, 0)
}
Le swap vers `null`, c'est ce qui empêche un producteur qui arrive en 
concurrence de livrer dans le buffer voué à mourir. Le `complete_irp(pending,
STATUS_CANCELLED, …)` suivant relâche l'IRP vers l'I/O manager avec le bon
statut, et le `DeviceIoControl` bloqué de l'agent retourne avec
`ERROR_OPERATION_ABORTED`. `DriverUnload` exécute la même danse pour la même raison — le driver est en
cours de démontage et toute IRP en attente doit être relâchée avant que le
device ne soit supprimé.
──[ 9. Spinlocks et IofCompleteRequest ]──

Un détail sur la portée du lock qui mérite d'être exposé explicitement.

`IofCompleteRequest` (la routine kernel qui exécute les routines de complétion 
de l'IRP et relâche l'IRP) tourne synchroniquement sur le thread appelant, à
`DISPATCH_LEVEL`. Les routines de complétion peuvent faire du vrai boulot :
signaler des threads user mode, logger, mettre à jour des structures de compta.
Tenir le spinlock de queue à travers cet appel risque à la fois l'inversion de
lock (une routine de complétion prend un lock qui est bloqué derrière quelque
chose qui attend `QUEUE_LOCK`) et des pics de contention qui scalent avec le
coût de la routine de complétion en aval. La règle du codebase, appliquée uniformément : drop le spinlock avant d'appeler
`complete_irp`. Le pattern est explicite dans chaque chemin :
drop(guard);
…
complete_irp(irp, STATUS_…, …);
Le `drop(guard)` est la release explicite parce que le drop par portée de Rust 
étendrait la section critique jusqu'à la fin du bloc englobant, complétion
incluse. Le `drop` manuel relâche le lock au point précis qu'on veut. Ça couvre le chemin de livraison d'événements de bout en bout. Prochain post :
l'agent user mode — comment il pilote vraiment cette boucle d'IOCTL, ce qu'il
fait avec les octets, et comment il les expédie en aval.