Files
passepartout/lisp/core-pipeline.lisp
Amr Gharbeia 8fd56dece3 v0.8.2: cleanup + prose + structure + decomposition + budget + errors
Phase 1 — dedup + hardening (~9 items):
- Remove duplicate *skill-registry* defvar from core-skills
- Merge *backend-registry* into *probabilistic-backends*, delete backend-register
- Remove inject-stimulus alias, standardize on stimulus-inject
- Add pre-eval sandbox (skill-source-scan) blocks restricted symbols before eval
- Remove dead plist-get function; remove duplicate json-alist-to-plist export
- Fix read-framed-message whitespace DoS (4096-iteration max)
- Add *read-eval* nil to dispatcher-approvals-process read-from-string (RCE)
- Add test-op to ASDF; update .asd version 0.4.3→0.7.2

Phase 2 — prose + contracts + reorder:
- Split ROADMAP: 2623→1089 lines (TODO only), CHANGELOG: 260→1528 lines (full DONE history, 14 versions reverse chron)
- Add Contracts + Overview to 6 channel files + embedding-native + programming-standards + symbolic-scope
- Reorder 28 .org files: Contract → Test Suite → Implementation (TDD order)
- Add 7-phase inline prose to think() in core-reason
- Expand USER_MANUAL: 183→461 lines (10 new sections)

Phase 3 — decomposition + export organization:
- Decompose think() into think-assemble-prompt, think-call-llm, think-parse-response orchestrator
- Organize 188 exports into 16 grouped sections by module

Phase 4 — budget enforcement + error protocol:
- Per-session budget enforcement (SESSION_BUDGET_USD env var, budget-exhausted-p, guard in think-call-llm)
- Error condition hierarchy (6 conditions: pipeline-error, llm-error, gate-error, budget-error, protocol-error)
- Restarts in loop-process: skip-signal, use-fallback, abort-pipeline
2026-05-13 09:17:48 -04:00

233 lines
10 KiB
Common Lisp

(eval-when (:compile-toplevel :load-toplevel :execute)
(ql:quickload :fiveam :silent t))
(defpackage :passepartout-immune-system-tests
(:use :cl :fiveam :passepartout)
(:export #:immune-suite))
(in-package :passepartout-immune-system-tests)
(def-suite immune-suite :description "Verification of the Immune System (Core Error Hooks)")
(in-suite immune-suite)
(test loop-error-injection
"Contract 1: a crash in think/decide triggers :loop-error stimulus."
(clrhash passepartout::*skill-registry*)
(passepartout:defskill :evil-skill
:priority 100
:trigger (lambda (ctx) (eq (getf (getf ctx :payload) :sensor) :user-input))
:probabilistic (lambda (ctx) (declare (ignore ctx)) (error "CRITICAL BRAIN FAILURE"))
:deterministic nil)
(passepartout:loop-process '(:type :EVENT :payload (:sensor :user-input)))
(let ((logs (if (fboundp 'passepartout::context-get-system-logs)
(passepartout:context-get-system-logs 20)
nil)))
(is (or (null logs) ; no log service available — degraded but not broken
(not (null (find-if (lambda (line) (search "CRITICAL BRAIN FAILURE" line)) logs)))))))
(test test-process-signal-normal-path
"Contract 1: a valid signal passes through the pipeline without crash."
(clrhash passepartout::*skill-registry*)
(handler-case
(let ((signal (list :type :EVENT :depth 0 :payload (list :sensor :heartbeat))))
(process-signal signal)
(pass))
(error (c)
(fail "Pipeline crashed on normal signal: ~a" c))))
(test test-loop-process-returns-nil-on-deep
"Contract 1: depth > 10 returns nil from loop-process."
(let ((result (loop-process '(:type :EVENT :depth 11 :payload (:sensor :heartbeat)))))
(is (null result))))
(in-package :passepartout)
(define-condition passepartout-error (error)
((message :initarg :message :reader error-message))
(:report (lambda (c s) (format s "Passepartout error: ~a" (error-message c))))
(:documentation "Root of the pipeline error hierarchy."))
(define-condition pipeline-error (passepartout-error)
((signal :initarg :signal :reader pipeline-error-signal :initform nil))
(:report (lambda (c s) (format s "Pipeline error: ~a" (error-message c))))
(:documentation "Any error during the Perceive→Reason→Act cycle."))
(define-condition llm-error (pipeline-error)
((provider :initarg :provider :reader llm-error-provider)
(cascade :initarg :cascade :reader llm-error-cascade :initform nil)
(attempt-count :initarg :attempt-count :reader llm-error-attempt-count :initform 0))
(:report (lambda (c s) (format s "LLM error (~a): ~a" (llm-error-provider c) (error-message c))))
(:documentation "LLM provider failure: timeout, cascade exhaustion, or API error."))
(define-condition gate-error (pipeline-error)
((gate-name :initarg :gate-name :reader gate-error-gate-name)
(rejected-action :initarg :rejected-action :reader gate-error-rejected-action))
(:report (lambda (c s) (format s "Gate ~a blocked action: ~a" (gate-error-gate-name c) (error-message c))))
(:documentation "Deterministic gate blocked a proposed action."))
(define-condition budget-error (pipeline-error)
((remaining :initarg :remaining :reader budget-error-remaining :initform 0.0)
(requested :initarg :requested :reader budget-error-requested :initform 0.0))
(:report (lambda (c s) (format s "Budget exhausted: $~,4f remaining, $~,4f requested" (budget-error-remaining c) (budget-error-requested c))))
(:documentation "Session budget cap has been reached."))
(define-condition protocol-error (passepartout-error)
((raw-message :initarg :raw-message :reader protocol-error-raw-message :initform nil))
(:report (lambda (c s) (format s "Protocol error: ~a" (error-message c))))
(:documentation "Malformed message, framing failure, or schema violation."))
(defvar *interrupt-flag* nil
"Atomic flag set by signal handlers to trigger graceful shutdown.")
(defvar *loop-interrupt-lock* (bt:make-lock "harness-interrupt-lock")
"Mutex protecting *interrupt-flag* access.")
(defvar *heartbeat-thread* nil
"Handle to the heartbeat thread.")
(defun loop-process (signal)
"The entry point to the Metabolic Pipeline: Perceive -> Reason -> Act."
(let ((current-signal signal))
(loop while current-signal do
(let ((depth (getf current-signal :depth 0))
(meta (getf current-signal :meta)))
(when (> depth 10)
(log-message "METABOLISM ERROR: Max recursion depth reached.")
(return nil))
(when (bt:with-lock-held (*loop-interrupt-lock*) *interrupt-flag*)
(log-message "METABOLISM: Interrupted by shutdown signal.")
(return nil))
(restart-case
(handler-bind
((pipeline-error (lambda (c)
(log-message "PIPELINE ERROR: ~a" (error-message c)))))
(handler-case
(progn
(setf current-signal (perceive-gate current-signal))
(setf current-signal (reason-gate current-signal))
(let ((feedback (act-gate current-signal)))
(if feedback
(progn
(unless (getf feedback :meta) (setf (getf feedback :meta) meta))
(setf current-signal feedback))
(setf current-signal nil))))
(error (c)
(let ((sensor (ignore-errors (getf (getf current-signal :payload) :sensor))))
(log-message "METABOLISM CRASH [~a]: ~a" (or sensor :unknown) c)
(unless (member sensor '(:loop-error :tool-error :syntax-error))
(log-message "CRITICAL ERROR: Initiating Micro-Rollback.")
(rollback-memory 0))
(if (or (> depth 2) (member sensor '(:loop-error :tool-error)))
(setf current-signal nil)
(setf current-signal
(list :type :EVENT :depth (1+ depth) :meta meta
:payload (list :sensor :loop-error :message (format nil "~a" c) :depth depth))))))))
(skip-signal ()
:report "Drop the current signal and continue the loop."
(setf current-signal nil))
(use-fallback (text)
:report "Inject a canned response instead of the LLM result."
(setf current-signal
(list :type :EVENT :depth (1+ depth) :meta meta
:payload (list :sensor :loop-error :message text :depth depth))))
(abort-pipeline ()
:report "Terminate the cognitive cycle cleanly."
(return nil)))))))
(defun process-signal (signal)
(loop-process signal))
(defvar *memory-auto-save-interval* 300)
(defvar *heartbeat-save-counter* 0)
(defun heartbeat-start ()
"Starts the background heartbeat thread."
(let ((interval (or (ignore-errors (parse-integer (uiop:getenv "HEARTBEAT_INTERVAL"))) 60))
(auto-save (or (ignore-errors (parse-integer (uiop:getenv "MEMORY_AUTO_SAVE_INTERVAL"))) *memory-auto-save-interval*)))
(setf *memory-auto-save-interval* auto-save)
(setf *heartbeat-save-counter* 0)
(setf *heartbeat-thread*
(bt:make-thread
(lambda ()
(loop
(sleep interval)
(incf *heartbeat-save-counter*)
(when (>= *heartbeat-save-counter* (/ *memory-auto-save-interval* interval))
(setf *heartbeat-save-counter* 0)
(save-memory-to-disk))
(stimulus-inject
(list :type :EVENT :payload (list :sensor :heartbeat :unix-time (get-universal-time))))))
:name "passepartout-heartbeat"))))
(defvar *shutdown-save-enabled* t)
(defvar *system-health* :unknown
"Current system health status: :healthy, :degraded, :unhealthy, or :unknown.")
(defvar *health-check-ran* nil
"Flag indicating if initial health check has completed.")
(defun diagnostics-startup-run ()
"Runs the doctor diagnostics on startup. Returns health status."
(format t "~%")
(format t "==================================================~%")
(format t " DOCTOR: Running Startup Health Check~%")
(format t "==================================================~%")
(handler-case
(progn
(when (fboundp 'diagnostics-run-all)
(let ((result (diagnostics-run-all :auto-install nil)))
(setf *health-check-ran* t)
(if result
(progn
(setf *system-health* :healthy)
(format t "DAEMON: Health check passed. Starting services.~%"))
(progn
(setf *system-health* :degraded)
(format t "DAEMON: Health check found issues.~%")
(format t " Run 'passepartout diagnostics' to repair.~%")))))
(setf *health-check-ran* t))
(error (c)
(format t "DIAGNOSTICS ERROR: ~a~%" c)
(setf *system-health* :unhealthy)
(setf *health-check-ran* t)))
(format t "==================================================~%~%"))
(defun main ()
"Entry point for Passepartout. Initializes the system and enters idle loop."
(let* ((home (uiop:getenv "HOME"))
(env-file (uiop:merge-pathnames* ".config/passepartout/.env" (uiop:ensure-directory-pathname home))))
(when (uiop:file-exists-p env-file)
(cl-dotenv:load-env env-file)))
(load-memory-from-disk)
(actuator-initialize)
(skill-initialize-all)
;; Run proactive diagnostics before starting services
(diagnostics-startup-run)
(when (fboundp 'events-start-heartbeat)
(events-start-heartbeat))
(start-daemon)
#+sbcl
(sb-sys:enable-interrupt sb-unix:sigint
(lambda (sig code scp)
(declare (ignore sig code scp))
(log-message "SHUTDOWN: SIGINT received. Saving memory...")
(when *shutdown-save-enabled* (save-memory-to-disk))
(uiop:quit 0)))
(let ((sleep-interval (or (ignore-errors (parse-integer (uiop:getenv "DAEMON_SLEEP_INTERVAL"))) 3600)))
(loop
(when (bt:with-lock-held (*loop-interrupt-lock*) *interrupt-flag*)
(log-message "SHUTDOWN: Interrupt flag set. Saving memory...")
(when *shutdown-save-enabled* (save-memory-to-disk))
(return))
(sleep sleep-interval))))