(in-package :passepartout) (defvar *loop-interrupt* nil) (defvar *loop-async-sensors* '(:chat-message :delegation :user-command) "Sensors that are processed in dedicated threads.") (defvar *loop-focus-id* nil "The Org ID of the node the user is currently interacting with.") (defun stimulus-inject (raw-message &key stream (depth 0)) "Inject a raw message into the signal processing pipeline." (let* ((payload (getf raw-message :payload)) (sensor (getf payload :sensor)) (meta (getf raw-message :meta)) (async-p (or (getf payload :async-p) (member sensor *loop-async-sensors*)))) (unless meta (setf meta (list :SOURCE :SYSTEM :SESSION-ID "internal"))) (when stream (setf (getf meta :reply-stream) stream)) (setf (getf raw-message :meta) meta) (setf (getf raw-message :depth) depth) (if async-p (bt:make-thread (lambda () (restart-case (process-signal raw-message) (skip-event () nil))) :name "passepartout-async-task") (restart-case (handler-bind ((error (lambda (c) (log-message "SYSTEM ERROR: ~a" c) (invoke-restart 'skip-event)))) (process-signal raw-message)) (skip-event () (log-message "SYSTEM RECOVERY: Stimulus dropped.")))))) (defun loop-gate-perceive (signal) "Stage 1 of the metabolic pipeline: Normalize sensory input." (let* ((payload (getf signal :payload)) (type (getf signal :type)) (meta (getf signal :meta)) (sensor (getf payload :sensor))) (log-message "GATE [Perceive]: ~a (~a) [Source: ~s]" type (or sensor "no-sensor") (getf meta :source)) (cond ((eq type :EVENT) (case sensor (:buffer-update (let ((ast (getf payload :ast))) (when ast (snapshot-memory) (ingest-ast ast)))) (:point-update (let ((element (getf payload :element))) (when element (snapshot-memory) (setf *loop-focus-id* (getf element :id)) (ingest-ast element)))) (:interrupt (setf *loop-interrupt* t)))) ((eq type :RESPONSE) (log-message "GATE [Perceive]: Act Result -> ~a" (getf payload :status)))) (setf (getf signal :status) :perceived) (setf (getf signal :foveal-focus) *loop-focus-id*) signal)) (eval-when (:compile-toplevel :load-toplevel :execute) (ql:quickload :fiveam :silent t)) (defpackage :passepartout-pipeline-perceive-tests (:use :cl :fiveam :passepartout) (:export #:pipeline-perceive-suite)) (in-package :passepartout-pipeline-perceive-tests) (def-suite pipeline-perceive-suite :description "Test suite for Perceive pipeline") (in-suite pipeline-perceive-suite) (test test-loop-gate-perceive (clrhash passepartout::*memory-store*) (let* ((signal (list :type :EVENT :payload (list :sensor :buffer-update :ast (list :type :HEADLINE :properties (list :ID "test-node" :TITLE "Test") :contents nil)))) (result (loop-gate-perceive signal))) (is (eq :perceived (getf result :status))) (is (not (null (gethash "test-node" passepartout::*memory-store*)))))) (test test-depth-limiting (let ((runaway-signal (list :type :EVENT :depth 11 :payload (list :sensor :heartbeat)))) (is (null (process-signal runaway-signal)))))