(in-package :passepartout)

(defvar *loop-interrupt* nil
  "Set to T by the perceive gate when an :interrupt event is received.")

(defvar *scope-resolver* nil
  "If set, function returning current scope keyword. Used by perceive gate.")

(defvar *loop-async-sensors* '(:chat-message :delegation :user-command)
  "Sensors that are processed in dedicated threads.")

(defvar *loop-focus-id* nil
  "The Org ID of the node the user is currently interacting with.")

(defvar *pre-reason-handlers* (make-hash-table :test 'eq)
  "Pre-reason handler registry: sensor keyword → handler function.")

(defun register-pre-reason-handler (sensor fn)
  "Registers FN to handle signals with SENSOR in the perceive gate.
FN receives (signal) and returns T if consumed, nil to continue.
Registering a second handler for the same SENSOR replaces the first."
  (setf (gethash sensor *pre-reason-handlers*) fn))

(defun inject-stimulus (raw-message &key stream (depth 0))
  "Alias for STIMULUS-INJECT: forwards RAW-MESSAGE, STREAM and DEPTH unchanged."
  (stimulus-inject raw-message :stream stream :depth depth))

(defun %process-signal-guarded (raw-message)
  "Run PROCESS-SIGNAL on RAW-MESSAGE inside a SKIP-EVENT restart.
Any ERROR that reaches this frame is logged and the stimulus is dropped by
invoking SKIP-EVENT. Returns PROCESS-SIGNAL's value on success, or the value
of the recovery LOG-MESSAGE call when the stimulus was dropped."
  (restart-case
      ;; HANDLER-BIND (not HANDLER-CASE) so the error is logged before the
      ;; stack unwinds to the restart.
      (handler-bind ((error (lambda (c)
                              (log-message "SYSTEM ERROR: ~a" c)
                              (invoke-restart 'skip-event))))
        (process-signal raw-message))
    (skip-event ()
      (log-message "SYSTEM RECOVERY: Stimulus dropped."))))

(defun stimulus-inject (raw-message &key stream (depth 0))
  "Inject a raw message into the signal processing pipeline.

RAW-MESSAGE is a plist; its :payload plist supplies :sensor and optionally
:async-p. If RAW-MESSAGE has no :meta, a default one is created
(:SOURCE :SYSTEM :SESSION-ID \"internal\"). When STREAM is non-nil it is
stored under :reply-stream in the meta plist. DEPTH is stored under :depth.

The message is processed in a new thread when the payload sets :async-p or
its sensor is listed in *LOOP-ASYNC-SENSORS*; in that case the thread object
is returned. Otherwise it is processed synchronously and the result of
processing is returned. In both cases errors are logged and the stimulus is
dropped rather than propagating.

NOTE: (SETF GETF) may modify the caller's plist in place when the key is
already present."
  (let* ((payload (getf raw-message :payload))
         (sensor (getf payload :sensor))
         (meta (getf raw-message :meta))
         (async-p (or (getf payload :async-p)
                      (member sensor *loop-async-sensors*))))
    (unless meta
      (setf meta (list :SOURCE :SYSTEM :SESSION-ID "internal")))
    (when stream
      (setf (getf meta :reply-stream) stream))
    (setf (getf raw-message :meta) meta)
    (setf (getf raw-message :depth) depth)
    (if async-p
        ;; The worker thread uses the same guarded path as the synchronous
        ;; branch; previously it established SKIP-EVENT but had no handler,
        ;; so errors in the thread were never caught.
        (bt:make-thread (lambda () (%process-signal-guarded raw-message))
                        :name "passepartout-async-task")
        (%process-signal-guarded raw-message))))

(defun %perceive-scope ()
  "Return the scope used for ingestion: the value returned by
*SCOPE-RESOLVER* when it is set, otherwise :MEMEX."
  (if *scope-resolver*
      (funcall *scope-resolver*)
      :memex))

(defun loop-gate-perceive (signal)
  "Stage 1 of the metabolic pipeline: Normalize sensory input.

SIGNAL is a plist with :type, :payload and :meta. Returns SIGNAL.

Two early exits return SIGNAL without setting :status or :foveal-focus:
  - a :user-input signal whose text HITL-HANDLE-MESSAGE accepts;
  - a sensor with a registered pre-reason handler that returns non-nil.

Otherwise :EVENT signals are dispatched on their sensor, :RESPONSE signals
are logged, and the returned signal carries :status :perceived and
:foveal-focus set to *LOOP-FOCUS-ID*."
  (let* ((payload (getf signal :payload))
         (type (getf signal :type))
         (meta (getf signal :meta))
         (sensor (getf payload :sensor)))
    ;; HITL: intercept approval/denial commands before LLM processing
    (when (and (eq sensor :user-input)
               (stringp (getf payload :text)))
      (let ((text (getf payload :text)))
        ;; Errors from the HITL handler are treated as "not a HITL command".
        (when (ignore-errors (hitl-handle-message text (getf meta :source)))
          (log-message "GATE [Perceive]: HITL command processed — ~a" text)
          (return-from loop-gate-perceive signal))))
    ;; Pre-reason handlers: dispatch custom sensors to registered skill handlers
    (let ((handler (gethash sensor *pre-reason-handlers*)))
      (when handler
        (when (funcall handler signal)
          (return-from loop-gate-perceive signal))))
    (log-message "GATE [Perceive]: ~a (~a) [Source: ~s]"
                 type (or sensor "no-sensor") (getf meta :source))
    (cond
      ((eq type :EVENT)
       (case sensor
         (:buffer-update
          (let ((ast (getf payload :ast)))
            (when ast
              (snapshot-memory)
              (ingest-ast ast :scope (%perceive-scope)))))
         (:point-update
          (let ((element (getf payload :element)))
            (when element
              (snapshot-memory)
              (setf *loop-focus-id* (getf element :id))
              (ingest-ast element :scope (%perceive-scope)))))
         (:interrupt
          (setf *loop-interrupt* t))
         ;; HITL: re-injected approved action from dispatcher-approvals-process
         (:approval-required
          (when (getf payload :approved)
            (log-message "GATE [Perceive]: Approved Flight Plan re-injected")
            (setf (getf signal :approved) t)
            (setf (getf signal :approved-action) (getf payload :action))))
         ;; Default sensor: pass through without requiring user-input processing
         (otherwise
          (log-message "GATE [Perceive]: Unknown sensor ~a, passing through"
                       sensor))))
      ((eq type :RESPONSE)
       (log-message "GATE [Perceive]: Act Result -> ~a"
                    (getf payload :status))))
    (setf (getf signal :status) :perceived)
    (setf (getf signal :foveal-focus) *loop-focus-id*)
    signal))

(defun perceive-gate (signal)
  "Alias for LOOP-GATE-PERCEIVE."
  (loop-gate-perceive signal))

;; Tests: FiveAM is loaded at compile, load and execute time so the test
;; package below can :use it.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (ql:quickload :fiveam :silent t))

(defpackage :passepartout-pipeline-perceive-tests
  (:use :cl :fiveam :passepartout)
  (:export #:pipeline-perceive-suite))

(in-package :passepartout-pipeline-perceive-tests)

(def-suite pipeline-perceive-suite
  :description "Test suite for Perceive pipeline")

(in-suite pipeline-perceive-suite)

;; A :buffer-update event carrying an AST must be marked :perceived and its
;; node must appear in the memory store under its :ID.
(test test-loop-gate-perceive
  (clrhash passepartout::*memory-store*)
  (let* ((signal (list :type :EVENT
                       :payload (list :sensor :buffer-update
                                      :ast (list :type :HEADLINE
                                                 :properties (list :ID "test-node"
                                                                   :TITLE "Test")
                                                 :contents nil))))
         (result (loop-gate-perceive signal)))
    (is (eq :perceived (getf result :status)))
    (is (not (null (gethash "test-node" passepartout::*memory-store*))))))

;; A signal whose :depth is 11 is expected to be rejected by PROCESS-SIGNAL.
(test test-depth-limiting
  (let ((runaway-signal (list :type :EVENT
                              :depth 11
                              :payload (list :sensor :heartbeat))))
    (is (null (process-signal runaway-signal)))))