diff --git a/cl-streamer/harmony-backend.lisp b/cl-streamer/harmony-backend.lisp index d31311f..ce57ca3 100644 --- a/cl-streamer/harmony-backend.lisp +++ b/cl-streamer/harmony-backend.lisp @@ -21,6 +21,8 @@ #:pipeline-fire-hook) (:export #:audio-pipeline #:make-audio-pipeline + #:make-pipeline + #:make-encoder-for-format #:add-pipeline-output ;; Re-export protocol generics so callers can use cl-streamer/harmony:X #:pipeline-start @@ -38,6 +40,9 @@ #:pipeline-add-hook #:pipeline-remove-hook #:pipeline-fire-hook + ;; Pipeline state accessors + #:pipeline-encoders + #:pipeline-owns-server-p ;; Backward-compatible aliases (delegate to protocol generics) #:start-pipeline #:stop-pipeline @@ -160,7 +165,12 @@ :documentation "Callback (lambda (pipeline playlist-path)) called when scheduler playlist starts") ;; Hook system (hooks :initform (make-hash-table :test 'eq) :reader pipeline-hooks - :documentation "Hash table mapping event keywords to lists of hook functions"))) + :documentation "Hash table mapping event keywords to lists of hook functions") + ;; Encoder & server ownership (Phase 2) + (encoders :initform nil :accessor pipeline-encoders + :documentation "List of (encoder . mount-path) pairs owned by the pipeline") + (owns-server :initform nil :accessor pipeline-owns-server-p + :documentation "T if pipeline created the server and should stop it on shutdown"))) (defun make-audio-pipeline (&key encoder stream-server (mount-path "/stream.mp3") (sample-rate 44100) (channels 2)) @@ -178,6 +188,81 @@ (drain-add-output (pipeline-drain pipeline) encoder mount-path)) pipeline)) +(defun make-encoder-for-format (format &key (bitrate 128) (sample-rate 44100) (channels 2)) + "Create an encoder for the given FORMAT keyword (:mp3 or :aac)." + (ecase format + (:mp3 (cl-streamer:make-mp3-encoder :bitrate bitrate + :sample-rate sample-rate + :channels channels)) + (:aac (cl-streamer:make-aac-encoder :bitrate (* bitrate 1000) + :sample-rate sample-rate + :channels channels)))) + +(defun content-type-for-format (format) + "Return the MIME content type for FORMAT keyword." + (ecase format + (:mp3 "audio/mpeg") + (:aac "audio/aac"))) + +(defun make-pipeline (&key (port 8000) (sample-rate 44100) (channels 2) outputs server) + "Create a complete streaming pipeline from a declarative spec. + PORT: stream server port (ignored if SERVER is provided). + OUTPUTS: list of output specs, each a plist: + (:format :mp3 :mount \"/stream.mp3\" :bitrate 128 :name \"My Stream\") + SERVER: an existing stream-server instance (optional). + If NIL, a new server is created and owned by the pipeline. + + Example: + (make-pipeline :port 8000 + :outputs '((:format :mp3 :mount \"/radio.mp3\" :bitrate 128 + :name \"Radio MP3\") + (:format :aac :mount \"/radio.aac\" :bitrate 128 + :name \"Radio AAC\"))) + + Returns the pipeline (already wired, but not started — call pipeline-start)." + (let* ((owns-server (null server)) + (srv (or server + (let ((s (cl-streamer:make-stream-server :port port))) + (cl-streamer:start-server s) + ;; Set global so write-audio-data/set-now-playing work + (setf cl-streamer:*server* s) + s))) + (pipeline (make-instance 'audio-pipeline + :stream-server srv + :sample-rate sample-rate + :channels channels)) + (encoders nil)) + (setf (pipeline-owns-server-p pipeline) owns-server) + ;; Create drain + (setf (pipeline-drain pipeline) + (make-instance 'streaming-drain :channels channels)) + ;; Process each output spec + (dolist (spec outputs) + (let* ((format (getf spec :format)) + (mount (getf spec :mount)) + (bitrate (or (getf spec :bitrate) 128)) + (name (or (getf spec :name) "CL-Streamer")) + (genre (or (getf spec :genre) "Various")) + (content-type (or (getf spec :content-type) + (content-type-for-format format))) + (encoder (make-encoder-for-format format + :bitrate bitrate + :sample-rate sample-rate + :channels channels))) + ;; Add mount point to server + (cl-streamer:add-mount srv mount + :content-type content-type + :bitrate bitrate + :name name + :genre genre) + ;; Wire encoder to drain + (drain-add-output (pipeline-drain pipeline) encoder mount) + (push (cons encoder mount) encoders))) + (setf (pipeline-encoders pipeline) (nreverse encoders)) + (log:info "Pipeline configured: ~A outputs on port ~A" + (length outputs) (if owns-server port (cl-streamer::server-port srv))) + pipeline)) + (defun add-pipeline-output (pipeline encoder mount-path) "Add an additional encoder/mount output to the pipeline. Can be called before or after start-pipeline." @@ -224,11 +309,23 @@ pipeline) (defmethod pipeline-stop ((pipeline audio-pipeline)) - "Stop the audio pipeline." + "Stop the audio pipeline. Cleans up owned encoders and server." (setf (%pipeline-running pipeline) nil) (when (pipeline-harmony-server pipeline) (mixed:end (pipeline-harmony-server pipeline)) (setf (pipeline-harmony-server pipeline) nil)) + ;; Close owned encoders + (dolist (pair (pipeline-encoders pipeline)) + (handler-case + (cl-streamer:encoder-close (car pair)) + (error (e) (log:debug "Error closing encoder for ~A: ~A" (cdr pair) e)))) + (setf (pipeline-encoders pipeline) nil) + ;; Stop owned server + (when (pipeline-owns-server-p pipeline) + (handler-case + (cl-streamer:stop-server (pipeline-server pipeline)) + (error (e) (log:debug "Error stopping server: ~A" e))) + (setf (pipeline-owns-server-p pipeline) nil)) (log:info "Audio pipeline stopped") pipeline) @@ -337,8 +434,11 @@ (update-all-mounts-metadata pipeline title)) (defun notify-track-change (pipeline track-info) - "Update pipeline state and fire the on-track-change callback." + "Update pipeline state and fire hooks + legacy callback." (setf (%pipeline-current-track pipeline) track-info) + ;; Fire hook system (Phase 2) + (pipeline-fire-hook pipeline :track-change track-info) + ;; Legacy callback (backward compat) (when (pipeline-on-track-change pipeline) (handler-case (funcall (pipeline-on-track-change pipeline) pipeline track-info) @@ -418,16 +518,19 @@ ;; Replace remaining list and update current for loop-queue (setf (car remaining-ref) all-queued) (setf (car current-list-ref) (copy-list all-queued)) - ;; Fire playlist-change callback so app layer updates metadata - (when (pipeline-on-playlist-change pipeline) - (let ((playlist-path (pipeline-pending-playlist-path pipeline))) - (when playlist-path + ;; Fire hooks + legacy callback so app layer updates metadata + (let ((playlist-path (pipeline-pending-playlist-path pipeline))) + (when playlist-path + ;; Fire hook system (Phase 2) + (pipeline-fire-hook pipeline :playlist-change playlist-path) + ;; Legacy callback (backward compat) + (when (pipeline-on-playlist-change pipeline) (handler-case (funcall (pipeline-on-playlist-change pipeline) pipeline playlist-path) (error (e) - (log:warn "Playlist change callback error: ~A" e))) - (setf (pipeline-pending-playlist-path pipeline) nil)))) + (log:warn "Playlist change callback error: ~A" e)))) + (setf (pipeline-pending-playlist-path pipeline) nil))) t)))) (defun next-entry (pipeline remaining-ref current-list-ref) diff --git a/stream-harmony.lisp b/stream-harmony.lisp index 5488212..3253e1f 100644 --- a/stream-harmony.lisp +++ b/stream-harmony.lisp @@ -12,11 +12,19 @@ (defvar *harmony-stream-port* 8000 "Port for the cl-streamer HTTP stream server.") -(defvar *harmony-mp3-encoder* nil - "MP3 encoder instance.") +;; Encoder instances are now owned by the pipeline (Phase 2). +;; Kept as aliases for backward compatibility with any external references. +(defun harmony-mp3-encoder () + "Get the MP3 encoder from the pipeline (if running)." + (when *harmony-pipeline* + (car (find "/asteroid.mp3" (cl-streamer/harmony:pipeline-encoders *harmony-pipeline*) + :key #'cdr :test #'string=)))) -(defvar *harmony-aac-encoder* nil - "AAC encoder instance.") +(defun harmony-aac-encoder () + "Get the AAC encoder from the pipeline (if running)." + (when *harmony-pipeline* + (car (find "/asteroid.aac" (cl-streamer/harmony:pipeline-encoders *harmony-pipeline*) + :key #'cdr :test #'string=)))) (defvar *harmony-state-file* (merge-pathnames ".playback-state.lisp" (asdf:system-source-directory :asteroid)) @@ -197,75 +205,46 @@ ;;; ---- Pipeline Lifecycle ---- (defun start-harmony-streaming (&key (port *harmony-stream-port*) - (mp3-bitrate 128000) - (aac-bitrate 128000)) + (mp3-bitrate 128) + (aac-bitrate 128)) "Start the cl-streamer pipeline with MP3 and AAC outputs. - Should be called once during application startup." + Should be called once during application startup. + MP3-BITRATE and AAC-BITRATE are in kbps (e.g. 128)." (when *harmony-pipeline* (log:warn "Harmony streaming already running") (return-from start-harmony-streaming *harmony-pipeline*)) - ;; Start the stream server - (cl-streamer:start :port port) - - ;; Add mount points - (cl-streamer:add-mount cl-streamer:*server* "/asteroid.mp3" - :content-type "audio/mpeg" - :bitrate 128 - :name "Asteroid Radio MP3") - (cl-streamer:add-mount cl-streamer:*server* "/asteroid.aac" - :content-type "audio/aac" - :bitrate 128 - :name "Asteroid Radio AAC") - - ;; Create encoders - (setf *harmony-mp3-encoder* - (cl-streamer:make-mp3-encoder :bitrate (floor mp3-bitrate 1000) - :sample-rate 44100 - :channels 2)) - (setf *harmony-aac-encoder* - (cl-streamer:make-aac-encoder :bitrate aac-bitrate - :sample-rate 44100 - :channels 2)) - - ;; Create pipeline with track-change callback + ;; Create pipeline from declarative spec — server, mounts, encoders all handled (setf *harmony-pipeline* - (cl-streamer/harmony:make-audio-pipeline - :encoder *harmony-mp3-encoder* - :stream-server cl-streamer:*server* - :mount-path "/asteroid.mp3")) + (cl-streamer/harmony:make-pipeline + :port port + :outputs (list (list :format :mp3 + :mount "/asteroid.mp3" + :bitrate mp3-bitrate + :name "Asteroid Radio MP3") + (list :format :aac + :mount "/asteroid.aac" + :bitrate aac-bitrate + :name "Asteroid Radio AAC")))) - ;; Add AAC output - (cl-streamer/harmony:add-pipeline-output *harmony-pipeline* - *harmony-aac-encoder* - "/asteroid.aac") - - ;; Set the track-change callback - (setf (cl-streamer/harmony:pipeline-on-track-change *harmony-pipeline*) - #'on-harmony-track-change) - - ;; Set the playlist-change callback (fires when scheduler playlist actually starts) - (setf (cl-streamer/harmony:pipeline-on-playlist-change *harmony-pipeline*) - #'on-harmony-playlist-change) + ;; Register hooks + (cl-streamer/harmony:pipeline-add-hook *harmony-pipeline* + :track-change #'on-harmony-track-change) + (cl-streamer/harmony:pipeline-add-hook *harmony-pipeline* + :playlist-change #'on-harmony-playlist-change) ;; Start the audio pipeline - (cl-streamer/harmony:start-pipeline *harmony-pipeline*) + (cl-streamer/harmony:pipeline-start *harmony-pipeline*) (log:info "Harmony streaming started on port ~A (MP3 + AAC)" port) *harmony-pipeline*) (defun stop-harmony-streaming () - "Stop the cl-streamer pipeline and stream server." + "Stop the cl-streamer pipeline and stream server. + Pipeline owns encoders and server — cleanup is automatic." (when *harmony-pipeline* - (cl-streamer/harmony:stop-pipeline *harmony-pipeline*) + (cl-streamer/harmony:pipeline-stop *harmony-pipeline*) (setf *harmony-pipeline* nil)) - (when *harmony-mp3-encoder* - (cl-streamer:close-encoder *harmony-mp3-encoder*) - (setf *harmony-mp3-encoder* nil)) - (when *harmony-aac-encoder* - (cl-streamer:close-aac-encoder *harmony-aac-encoder*) - (setf *harmony-aac-encoder* nil)) - (cl-streamer:stop) (log:info "Harmony streaming stopped")) ;;; ---- Playlist Control ----