Phase 2: Clean package boundaries — declarative make-pipeline DSL

- New make-pipeline function: single declarative call creates server,
  mounts, encoders, and pipeline wiring from an output spec
- Pipeline owns encoder lifecycle (pipeline-encoders slot, auto-cleanup)
- Pipeline owns server when it creates one (pipeline-owns-server-p)
- Hook system wired: pipeline-add-hook fires on track-change and
  playlist-change via pipeline-fire-hook
- stream-harmony.lisp slimmed: start is 1 make-pipeline + 2 hooks,
  stop is 1 pipeline-stop call (cleanup automatic)
- Removed global encoder variables from Asteroid glue layer
- Backward-compatible: dj-session.lisp unchanged, cl-streamer:*server*
  still set for legacy callers

Runtime verified: audio streams, metadata displays, crossfades work.
This commit is contained in:
Glenn Thompson 2026-03-08 11:23:40 +03:00
parent c79cebcfa7
commit b7266e3ac2
2 changed files with 148 additions and 66 deletions

View File

@ -21,6 +21,8 @@
#:pipeline-fire-hook) #:pipeline-fire-hook)
(:export #:audio-pipeline (:export #:audio-pipeline
#:make-audio-pipeline #:make-audio-pipeline
#:make-pipeline
#:make-encoder-for-format
#:add-pipeline-output #:add-pipeline-output
;; Re-export protocol generics so callers can use cl-streamer/harmony:X ;; Re-export protocol generics so callers can use cl-streamer/harmony:X
#:pipeline-start #:pipeline-start
@ -38,6 +40,9 @@
#:pipeline-add-hook #:pipeline-add-hook
#:pipeline-remove-hook #:pipeline-remove-hook
#:pipeline-fire-hook #:pipeline-fire-hook
;; Pipeline state accessors
#:pipeline-encoders
#:pipeline-owns-server-p
;; Backward-compatible aliases (delegate to protocol generics) ;; Backward-compatible aliases (delegate to protocol generics)
#:start-pipeline #:start-pipeline
#:stop-pipeline #:stop-pipeline
@ -160,7 +165,12 @@
:documentation "Callback (lambda (pipeline playlist-path)) called when scheduler playlist starts") :documentation "Callback (lambda (pipeline playlist-path)) called when scheduler playlist starts")
;; Hook system ;; Hook system
(hooks :initform (make-hash-table :test 'eq) :reader pipeline-hooks (hooks :initform (make-hash-table :test 'eq) :reader pipeline-hooks
:documentation "Hash table mapping event keywords to lists of hook functions"))) :documentation "Hash table mapping event keywords to lists of hook functions")
;; Encoder & server ownership (Phase 2)
(encoders :initform nil :accessor pipeline-encoders
:documentation "List of (encoder . mount-path) pairs owned by the pipeline")
(owns-server :initform nil :accessor pipeline-owns-server-p
:documentation "T if pipeline created the server and should stop it on shutdown")))
(defun make-audio-pipeline (&key encoder stream-server (mount-path "/stream.mp3") (defun make-audio-pipeline (&key encoder stream-server (mount-path "/stream.mp3")
(sample-rate 44100) (channels 2)) (sample-rate 44100) (channels 2))
@ -178,6 +188,81 @@
(drain-add-output (pipeline-drain pipeline) encoder mount-path)) (drain-add-output (pipeline-drain pipeline) encoder mount-path))
pipeline)) pipeline))
(defun make-encoder-for-format (format &key (bitrate 128) (sample-rate 44100) (channels 2))
"Create an encoder for the given FORMAT keyword (:mp3 or :aac)."
(ecase format
(:mp3 (cl-streamer:make-mp3-encoder :bitrate bitrate
:sample-rate sample-rate
:channels channels))
(:aac (cl-streamer:make-aac-encoder :bitrate (* bitrate 1000)
:sample-rate sample-rate
:channels channels))))
(defun content-type-for-format (format)
"Return the MIME content type for FORMAT keyword."
(ecase format
(:mp3 "audio/mpeg")
(:aac "audio/aac")))
(defun make-pipeline (&key (port 8000) (sample-rate 44100) (channels 2) outputs server)
"Create a complete streaming pipeline from a declarative spec.
PORT: stream server port (ignored if SERVER is provided).
OUTPUTS: list of output specs, each a plist:
(:format :mp3 :mount \"/stream.mp3\" :bitrate 128 :name \"My Stream\")
SERVER: an existing stream-server instance (optional).
If NIL, a new server is created and owned by the pipeline.
Example:
(make-pipeline :port 8000
:outputs '((:format :mp3 :mount \"/radio.mp3\" :bitrate 128
:name \"Radio MP3\")
(:format :aac :mount \"/radio.aac\" :bitrate 128
:name \"Radio AAC\")))
Returns the pipeline (already wired, but not started call pipeline-start)."
(let* ((owns-server (null server))
(srv (or server
(let ((s (cl-streamer:make-stream-server :port port)))
(cl-streamer:start-server s)
;; Set global so write-audio-data/set-now-playing work
(setf cl-streamer:*server* s)
s)))
(pipeline (make-instance 'audio-pipeline
:stream-server srv
:sample-rate sample-rate
:channels channels))
(encoders nil))
(setf (pipeline-owns-server-p pipeline) owns-server)
;; Create drain
(setf (pipeline-drain pipeline)
(make-instance 'streaming-drain :channels channels))
;; Process each output spec
(dolist (spec outputs)
(let* ((format (getf spec :format))
(mount (getf spec :mount))
(bitrate (or (getf spec :bitrate) 128))
(name (or (getf spec :name) "CL-Streamer"))
(genre (or (getf spec :genre) "Various"))
(content-type (or (getf spec :content-type)
(content-type-for-format format)))
(encoder (make-encoder-for-format format
:bitrate bitrate
:sample-rate sample-rate
:channels channels)))
;; Add mount point to server
(cl-streamer:add-mount srv mount
:content-type content-type
:bitrate bitrate
:name name
:genre genre)
;; Wire encoder to drain
(drain-add-output (pipeline-drain pipeline) encoder mount)
(push (cons encoder mount) encoders)))
(setf (pipeline-encoders pipeline) (nreverse encoders))
(log:info "Pipeline configured: ~A outputs on port ~A"
(length outputs) (if owns-server port (cl-streamer::server-port srv)))
pipeline))
(defun add-pipeline-output (pipeline encoder mount-path) (defun add-pipeline-output (pipeline encoder mount-path)
"Add an additional encoder/mount output to the pipeline. "Add an additional encoder/mount output to the pipeline.
Can be called before or after start-pipeline." Can be called before or after start-pipeline."
@ -224,11 +309,23 @@
pipeline) pipeline)
(defmethod pipeline-stop ((pipeline audio-pipeline)) (defmethod pipeline-stop ((pipeline audio-pipeline))
"Stop the audio pipeline." "Stop the audio pipeline. Cleans up owned encoders and server."
(setf (%pipeline-running pipeline) nil) (setf (%pipeline-running pipeline) nil)
(when (pipeline-harmony-server pipeline) (when (pipeline-harmony-server pipeline)
(mixed:end (pipeline-harmony-server pipeline)) (mixed:end (pipeline-harmony-server pipeline))
(setf (pipeline-harmony-server pipeline) nil)) (setf (pipeline-harmony-server pipeline) nil))
;; Close owned encoders
(dolist (pair (pipeline-encoders pipeline))
(handler-case
(cl-streamer:encoder-close (car pair))
(error (e) (log:debug "Error closing encoder for ~A: ~A" (cdr pair) e))))
(setf (pipeline-encoders pipeline) nil)
;; Stop owned server
(when (pipeline-owns-server-p pipeline)
(handler-case
(cl-streamer:stop-server (pipeline-server pipeline))
(error (e) (log:debug "Error stopping server: ~A" e)))
(setf (pipeline-owns-server-p pipeline) nil))
(log:info "Audio pipeline stopped") (log:info "Audio pipeline stopped")
pipeline) pipeline)
@ -337,8 +434,11 @@
(update-all-mounts-metadata pipeline title)) (update-all-mounts-metadata pipeline title))
(defun notify-track-change (pipeline track-info) (defun notify-track-change (pipeline track-info)
"Update pipeline state and fire the on-track-change callback." "Update pipeline state and fire hooks + legacy callback."
(setf (%pipeline-current-track pipeline) track-info) (setf (%pipeline-current-track pipeline) track-info)
;; Fire hook system (Phase 2)
(pipeline-fire-hook pipeline :track-change track-info)
;; Legacy callback (backward compat)
(when (pipeline-on-track-change pipeline) (when (pipeline-on-track-change pipeline)
(handler-case (handler-case
(funcall (pipeline-on-track-change pipeline) pipeline track-info) (funcall (pipeline-on-track-change pipeline) pipeline track-info)
@ -418,16 +518,19 @@
;; Replace remaining list and update current for loop-queue ;; Replace remaining list and update current for loop-queue
(setf (car remaining-ref) all-queued) (setf (car remaining-ref) all-queued)
(setf (car current-list-ref) (copy-list all-queued)) (setf (car current-list-ref) (copy-list all-queued))
;; Fire playlist-change callback so app layer updates metadata ;; Fire hooks + legacy callback so app layer updates metadata
(when (pipeline-on-playlist-change pipeline) (let ((playlist-path (pipeline-pending-playlist-path pipeline)))
(let ((playlist-path (pipeline-pending-playlist-path pipeline))) (when playlist-path
(when playlist-path ;; Fire hook system (Phase 2)
(pipeline-fire-hook pipeline :playlist-change playlist-path)
;; Legacy callback (backward compat)
(when (pipeline-on-playlist-change pipeline)
(handler-case (handler-case
(funcall (pipeline-on-playlist-change pipeline) (funcall (pipeline-on-playlist-change pipeline)
pipeline playlist-path) pipeline playlist-path)
(error (e) (error (e)
(log:warn "Playlist change callback error: ~A" e))) (log:warn "Playlist change callback error: ~A" e))))
(setf (pipeline-pending-playlist-path pipeline) nil)))) (setf (pipeline-pending-playlist-path pipeline) nil)))
t)))) t))))
(defun next-entry (pipeline remaining-ref current-list-ref) (defun next-entry (pipeline remaining-ref current-list-ref)

View File

@ -12,11 +12,19 @@
(defvar *harmony-stream-port* 8000 (defvar *harmony-stream-port* 8000
"Port for the cl-streamer HTTP stream server.") "Port for the cl-streamer HTTP stream server.")
(defvar *harmony-mp3-encoder* nil ;; Encoder instances are now owned by the pipeline (Phase 2).
"MP3 encoder instance.") ;; Kept as aliases for backward compatibility with any external references.
(defun harmony-mp3-encoder ()
"Get the MP3 encoder from the pipeline (if running)."
(when *harmony-pipeline*
(car (find "/asteroid.mp3" (cl-streamer/harmony:pipeline-encoders *harmony-pipeline*)
:key #'cdr :test #'string=))))
(defvar *harmony-aac-encoder* nil (defun harmony-aac-encoder ()
"AAC encoder instance.") "Get the AAC encoder from the pipeline (if running)."
(when *harmony-pipeline*
(car (find "/asteroid.aac" (cl-streamer/harmony:pipeline-encoders *harmony-pipeline*)
:key #'cdr :test #'string=))))
(defvar *harmony-state-file* (defvar *harmony-state-file*
(merge-pathnames ".playback-state.lisp" (asdf:system-source-directory :asteroid)) (merge-pathnames ".playback-state.lisp" (asdf:system-source-directory :asteroid))
@ -197,75 +205,46 @@
;;; ---- Pipeline Lifecycle ---- ;;; ---- Pipeline Lifecycle ----
(defun start-harmony-streaming (&key (port *harmony-stream-port*) (defun start-harmony-streaming (&key (port *harmony-stream-port*)
(mp3-bitrate 128000) (mp3-bitrate 128)
(aac-bitrate 128000)) (aac-bitrate 128))
"Start the cl-streamer pipeline with MP3 and AAC outputs. "Start the cl-streamer pipeline with MP3 and AAC outputs.
Should be called once during application startup." Should be called once during application startup.
MP3-BITRATE and AAC-BITRATE are in kbps (e.g. 128)."
(when *harmony-pipeline* (when *harmony-pipeline*
(log:warn "Harmony streaming already running") (log:warn "Harmony streaming already running")
(return-from start-harmony-streaming *harmony-pipeline*)) (return-from start-harmony-streaming *harmony-pipeline*))
;; Start the stream server ;; Create pipeline from declarative spec — server, mounts, encoders all handled
(cl-streamer:start :port port)
;; Add mount points
(cl-streamer:add-mount cl-streamer:*server* "/asteroid.mp3"
:content-type "audio/mpeg"
:bitrate 128
:name "Asteroid Radio MP3")
(cl-streamer:add-mount cl-streamer:*server* "/asteroid.aac"
:content-type "audio/aac"
:bitrate 128
:name "Asteroid Radio AAC")
;; Create encoders
(setf *harmony-mp3-encoder*
(cl-streamer:make-mp3-encoder :bitrate (floor mp3-bitrate 1000)
:sample-rate 44100
:channels 2))
(setf *harmony-aac-encoder*
(cl-streamer:make-aac-encoder :bitrate aac-bitrate
:sample-rate 44100
:channels 2))
;; Create pipeline with track-change callback
(setf *harmony-pipeline* (setf *harmony-pipeline*
(cl-streamer/harmony:make-audio-pipeline (cl-streamer/harmony:make-pipeline
:encoder *harmony-mp3-encoder* :port port
:stream-server cl-streamer:*server* :outputs (list (list :format :mp3
:mount-path "/asteroid.mp3")) :mount "/asteroid.mp3"
:bitrate mp3-bitrate
:name "Asteroid Radio MP3")
(list :format :aac
:mount "/asteroid.aac"
:bitrate aac-bitrate
:name "Asteroid Radio AAC"))))
;; Add AAC output ;; Register hooks
(cl-streamer/harmony:add-pipeline-output *harmony-pipeline* (cl-streamer/harmony:pipeline-add-hook *harmony-pipeline*
*harmony-aac-encoder* :track-change #'on-harmony-track-change)
"/asteroid.aac") (cl-streamer/harmony:pipeline-add-hook *harmony-pipeline*
:playlist-change #'on-harmony-playlist-change)
;; Set the track-change callback
(setf (cl-streamer/harmony:pipeline-on-track-change *harmony-pipeline*)
#'on-harmony-track-change)
;; Set the playlist-change callback (fires when scheduler playlist actually starts)
(setf (cl-streamer/harmony:pipeline-on-playlist-change *harmony-pipeline*)
#'on-harmony-playlist-change)
;; Start the audio pipeline ;; Start the audio pipeline
(cl-streamer/harmony:start-pipeline *harmony-pipeline*) (cl-streamer/harmony:pipeline-start *harmony-pipeline*)
(log:info "Harmony streaming started on port ~A (MP3 + AAC)" port) (log:info "Harmony streaming started on port ~A (MP3 + AAC)" port)
*harmony-pipeline*) *harmony-pipeline*)
(defun stop-harmony-streaming () (defun stop-harmony-streaming ()
"Stop the cl-streamer pipeline and stream server." "Stop the cl-streamer pipeline and stream server.
Pipeline owns encoders and server cleanup is automatic."
(when *harmony-pipeline* (when *harmony-pipeline*
(cl-streamer/harmony:stop-pipeline *harmony-pipeline*) (cl-streamer/harmony:pipeline-stop *harmony-pipeline*)
(setf *harmony-pipeline* nil)) (setf *harmony-pipeline* nil))
(when *harmony-mp3-encoder*
(cl-streamer:close-encoder *harmony-mp3-encoder*)
(setf *harmony-mp3-encoder* nil))
(when *harmony-aac-encoder*
(cl-streamer:close-aac-encoder *harmony-aac-encoder*)
(setf *harmony-aac-encoder* nil))
(cl-streamer:stop)
(log:info "Harmony streaming stopped")) (log:info "Harmony streaming stopped"))
;;; ---- Playlist Control ---- ;;; ---- Playlist Control ----