Add shuffle stream as second pipeline

stream-harmony.lisp:
- scan-music-library-files: recursive directory scanner for supported formats
- Cached shuffle library pool (2797 tracks, 1hr TTL refresh)
- shuffle-random-batch: picks N random tracks from pool
- refill-shuffle-queue: track-change hook keeps queue topped up
- on-shuffle-track-change: updates recently-played, refills queue
- shuffle-now-playing: now-playing stats for shuffle mounts
- start-shuffle-streaming: creates pipeline sharing curated server,
  registers hooks, seeds queue, starts play-list with crossfade
- stop-shuffle-streaming: stops pipeline without stopping shared server

asteroid.lisp:
- Start shuffle pipeline after curated pipeline on startup
- Stop/restart shuffle in stream restart endpoint
- Fix stale cl-streamer:get-listener-count call (needs server arg)

frontend-partials.lisp:
- Route shuffle mounts to shuffle-now-playing
- Fix stale cl-streamer:get-listener-count (use pipeline-listener-count)

listener-stats.lisp:
- Poll shuffle mounts (/shuffle.mp3, /shuffle.aac) for listener stats

parenscript/stream-player.lisp:
- get-stream-config returns /shuffle.* mounts when channel is shuffle

parenscript/front-page.lisp, parenscript/player.lisp:
- Normalize now-playing polling to 15s (was 5s/10s, caused 429s)

Build verified, runtime tested: both pipelines play simultaneously,
channel selector switches streams correctly.
This commit is contained in:
Glenn Thompson 2026-03-08 13:32:51 +03:00
parent 3e6b496340
commit 5efa49321e
7 changed files with 204 additions and 25 deletions

View File

@ -553,8 +553,10 @@
"Restart the streaming pipeline." "Restart the streaming pipeline."
(require-role :admin) (require-role :admin)
(with-error-handling (with-error-handling
(stop-shuffle-streaming)
(stop-harmony-streaming) (stop-harmony-streaming)
(start-harmony-streaming) (start-harmony-streaming)
(start-shuffle-streaming)
(api-output `(("status" . "success") (api-output `(("status" . "success")
("message" . "Streaming pipeline restarted"))))) ("message" . "Streaming pipeline restarted")))))
@ -1430,7 +1432,9 @@
(with-error-handling (with-error-handling
(let* ((now-playing (get-now-playing-stats "asteroid.mp3")) (let* ((now-playing (get-now-playing-stats "asteroid.mp3"))
(title (if now-playing (cdr (assoc :title now-playing)) "Unknown")) (title (if now-playing (cdr (assoc :title now-playing)) "Unknown"))
(listeners (or (cl-streamer:get-listener-count) 0))) (listeners (if *harmony-pipeline*
(or (cl-streamer:pipeline-listener-count *harmony-pipeline*) 0)
0)))
(api-output (api-output
`(("icestats" . (("source" . (("listenurl" . ,(format nil "~a/asteroid.mp3" *stream-base-url*)) `(("icestats" . (("source" . (("listenurl" . ,(format nil "~a/asteroid.mp3" *stream-base-url*))
("title" . ,title) ("title" . ,title)
@ -1601,7 +1605,15 @@
(length resumed-list) (length resumed-list)
(if playlist-path (file-namestring playlist-path) "stream-queue.m3u")))) (if playlist-path (file-namestring playlist-path) "stream-queue.m3u"))))
(format t "📡 Stream: ~a/asteroid.mp3~%" *stream-base-url*) (format t "📡 Stream: ~a/asteroid.mp3~%" *stream-base-url*)
(format t "📡 Stream: ~a/asteroid.aac~%" *stream-base-url*)) (format t "📡 Stream: ~a/asteroid.aac~%" *stream-base-url*)
;; Start shuffle stream (shares the same HTTP server)
(handler-case
(progn
(start-shuffle-streaming)
(format t "📡 Shuffle: ~a/shuffle.mp3~%" *stream-base-url*)
(format t "📡 Shuffle: ~a/shuffle.aac~%" *stream-base-url*))
(error (e)
(format t "⚠️ Could not start shuffle stream: ~a~%" e))))
(error (e) (error (e)
(format t "⚠️ Could not start streaming: ~a~%" e) (format t "⚠️ Could not start streaming: ~a~%" e)
(format t " (Web server will run without streaming)~%"))) (format t " (Web server will run without streaming)~%")))

View File

@ -55,14 +55,18 @@
"DJ Live")) "DJ Live"))
(owner (or (cdr (assoc "owner" status :test #'string=)) "DJ")) (owner (or (cdr (assoc "owner" status :test #'string=)) "DJ"))
(title (format nil "~A [DJ: ~A]" display-title owner)) (title (format nil "~A [DJ: ~A]" display-title owner))
(listeners (or (cl-streamer:get-listener-count) 0))) (listeners (if *harmony-pipeline*
(or (cl-streamer:pipeline-listener-count *harmony-pipeline*) 0)
0)))
`((:listenurl . ,(format nil "~A/~A" *stream-base-url* mount)) `((:listenurl . ,(format nil "~A/~A" *stream-base-url* mount))
(:title . ,title) (:title . ,title)
(:listeners . ,listeners) (:listeners . ,listeners)
(:track-id . nil) (:track-id . nil)
(:favorite-count . 0))) (:favorite-count . 0)))
;; Normal auto-playlist mode ;; Normal mode — route to curated or shuffle based on mount name
(harmony-now-playing mount))) (if (search "shuffle" mount :test #'char-equal)
(shuffle-now-playing mount)
(harmony-now-playing mount))))
(define-api-with-limit asteroid/partial/now-playing (&optional mount) (:limit 30 :timeout 60) (define-api-with-limit asteroid/partial/now-playing (&optional mount) (:limit 30 :timeout 60)
"Get Partial HTML with live now-playing status. "Get Partial HTML with live now-playing status.

View File

@ -384,12 +384,20 @@
(defun poll-and-store-stats () (defun poll-and-store-stats ()
"Single poll iteration: fetch listener counts from cl-streamer and store." "Single poll iteration: fetch listener counts from cl-streamer and store."
;; Curated stream mounts
(dolist (mount '("/asteroid.mp3" "/asteroid.aac")) (dolist (mount '("/asteroid.mp3" "/asteroid.aac"))
(let ((listeners (when *harmony-pipeline* (let ((listeners (when *harmony-pipeline*
(cl-streamer:pipeline-listener-count *harmony-pipeline* mount)))) (cl-streamer:pipeline-listener-count *harmony-pipeline* mount))))
(when (and listeners (> listeners 0)) (when (and listeners (> listeners 0))
(store-listener-snapshot mount listeners) (store-listener-snapshot mount listeners)
(log:debug "Stored snapshot: ~a = ~a listeners" mount listeners)))) (log:debug "Stored snapshot: ~a = ~a listeners" mount listeners))))
;; Shuffle stream mounts
(dolist (mount '("/shuffle.mp3" "/shuffle.aac"))
(let ((listeners (when *shuffle-pipeline*
(cl-streamer:pipeline-listener-count *shuffle-pipeline* mount))))
(when (and listeners (> listeners 0))
(store-listener-snapshot mount listeners)
(log:debug "Stored snapshot: ~a = ~a listeners" mount listeners))))
;; Collect geo stats from web listeners (uses real IPs from X-Forwarded-For) ;; Collect geo stats from web listeners (uses real IPs from X-Forwarded-For)
(collect-geo-stats-from-web-listeners)) (collect-geo-stats-from-web-listeners))

View File

@ -746,8 +746,8 @@
(catch (lambda (error) (catch (lambda (error)
(ps:chain console (error "Error adding favorite:" error))))))))))) (ps:chain console (error "Error adding favorite:" error)))))))))))
;; Update now playing every 5 seconds ;; Update now playing every 15 seconds
(set-interval update-now-playing 5000) (set-interval update-now-playing 15000)
;; Poll server for channel name changes (works across all listeners) ;; Poll server for channel name changes (works across all listeners)
(let ((last-channel-name nil)) (let ((last-channel-name nil))

View File

@ -810,8 +810,8 @@
;; Initial update after 1 second ;; Initial update after 1 second
(set-timeout update-now-playing 1000) (set-timeout update-now-playing 1000)
;; Update live stream info every 10 seconds ;; Update live stream info every 15 seconds
(set-interval update-now-playing 10000) (set-interval update-now-playing 15000)
;; Make functions globally accessible for onclick handlers ;; Make functions globally accessible for onclick handlers
(defvar window (ps:@ window)) (defvar window (ps:@ window))

View File

@ -149,23 +149,23 @@
;; ======================================== ;; ========================================
;; Get stream configuration for a given channel and quality ;; Get stream configuration for a given channel and quality
;; With cl-streamer, both channels use the same stream mounts - ;; Curated channel uses /asteroid.* mounts, shuffle uses /shuffle.* mounts
;; channel switching loads a different playlist server-side
(defun get-stream-config (stream-base-url channel quality) (defun get-stream-config (stream-base-url channel quality)
(let ((config (ps:create (let ((prefix (if (= channel "shuffle") "/shuffle" "/asteroid")))
:aac (ps:create :url (+ stream-base-url "/asteroid.aac") (let ((config (ps:create
:type "audio/aac" :aac (ps:create :url (+ stream-base-url prefix ".aac")
:format "AAC 96kbps Stereo" :type "audio/aac"
:mount "asteroid.aac") :format "AAC 96kbps Stereo"
:mp3 (ps:create :url (+ stream-base-url "/asteroid.mp3") :mount (+ (ps:chain prefix (substring 1)) ".aac"))
:type "audio/mpeg" :mp3 (ps:create :url (+ stream-base-url prefix ".mp3")
:format "MP3 128kbps Stereo" :type "audio/mpeg"
:mount "asteroid.mp3") :format "MP3 128kbps Stereo"
:low (ps:create :url (+ stream-base-url "/asteroid.mp3") :mount (+ (ps:chain prefix (substring 1)) ".mp3"))
:type "audio/mpeg" :low (ps:create :url (+ stream-base-url prefix ".mp3")
:format "MP3 128kbps Stereo" :type "audio/mpeg"
:mount "asteroid.mp3")))) :format "MP3 128kbps Stereo"
(ps:getprop config quality))) :mount (+ (ps:chain prefix (substring 1)) ".mp3")))))
(ps:getprop config quality))))
;; Get current channel from selector or localStorage ;; Get current channel from selector or localStorage
(defun get-current-channel () (defun get-current-channel ()

View File

@ -12,6 +12,9 @@
(defvar *harmony-stream-port* 8000 (defvar *harmony-stream-port* 8000
"Port for the cl-streamer HTTP stream server.") "Port for the cl-streamer HTTP stream server.")
(defvar *shuffle-pipeline* nil
"The shuffle stream pipeline — plays random tracks from the music library.")
;; Encoder instances are now owned by the pipeline (Phase 2). ;; Encoder instances are now owned by the pipeline (Phase 2).
;; Kept as aliases for backward compatibility with any external references. ;; Kept as aliases for backward compatibility with any external references.
(defun harmony-mp3-encoder () (defun harmony-mp3-encoder ()
@ -294,3 +297,155 @@
:queue-length (length (cl-streamer/harmony:pipeline-get-queue :queue-length (length (cl-streamer/harmony:pipeline-get-queue
*harmony-pipeline*)))) *harmony-pipeline*))))
(list :running nil))) (list :running nil)))
;;; ============================================================
;;; Shuffle Stream — random tracks from the music library
;;; ============================================================
(defvar *shuffle-batch-size* 20
"Number of tracks to queue at a time on the shuffle pipeline.")
(defun scan-music-library-files (&optional (directory *music-library-path*))
"Recursively scan DIRECTORY for supported audio files.
Returns a list of namestrings."
(let ((files nil)
(extensions *supported-formats*))
(labels ((scan (dir)
(handler-case
(dolist (entry (uiop:directory-files dir))
(let ((ext (pathname-type entry)))
(when (and ext (member ext extensions :test #'string-equal))
(push (namestring entry) files))))
(error (e)
(log:debug "Error scanning ~A: ~A" dir e)))
(handler-case
(dolist (sub (uiop:subdirectories dir))
(scan sub))
(error (e)
(log:debug "Error listing subdirs of ~A: ~A" dir e)))))
(scan (pathname directory)))
(nreverse files)))
(defvar *shuffle-library-cache* nil
"Cached list of audio files from the music library for shuffle.")
(defvar *shuffle-library-cache-time* 0
"Universal time when *shuffle-library-cache* was last refreshed.")
(defvar *shuffle-cache-ttl* 3600
"Seconds before the shuffle library cache expires (default 1 hour).")
(defun get-shuffle-library ()
"Return the cached list of music library files, refreshing if stale."
(when (or (null *shuffle-library-cache*)
(> (- (get-universal-time) *shuffle-library-cache-time*)
*shuffle-cache-ttl*))
(log:info "Scanning music library for shuffle pool...")
(let ((files (scan-music-library-files)))
(setf *shuffle-library-cache* files
*shuffle-library-cache-time* (get-universal-time))
(log:info "Shuffle pool: ~A tracks" (length files))))
*shuffle-library-cache*)
(defun shuffle-random-batch (&optional (n *shuffle-batch-size*))
"Pick N random tracks from the music library (with replacement for small libs)."
(let ((library (get-shuffle-library)))
(when library
(let ((len (length library)))
(loop repeat (min n len)
collect (list :file (nth (random len) library)))))))
(defun refill-shuffle-queue ()
"Queue another batch of random tracks on the shuffle pipeline.
Called by the track-change hook when the queue is running low."
(when *shuffle-pipeline*
(let ((queue-len (length (cl-streamer/harmony:pipeline-get-queue *shuffle-pipeline*))))
(when (< queue-len (floor *shuffle-batch-size* 2))
(let ((batch (shuffle-random-batch)))
(when batch
(cl-streamer/harmony:pipeline-queue-files *shuffle-pipeline* batch)
(log:debug "Shuffle: queued ~A tracks (~A in queue)"
(length batch) (+ queue-len (length batch)))))))))
(defun on-shuffle-track-change (pipeline track-info)
"Called by cl-streamer when the shuffle stream changes tracks.
Updates the shuffle recently-played list and refills the queue."
(declare (ignore pipeline))
(let* ((display-title (getf track-info :display-title))
(artist (getf track-info :artist))
(title (getf track-info :title))
(file-path (getf track-info :file))
(track-id (or (find-track-by-title display-title)
(find-track-by-file-path file-path))))
(when (and display-title (not (string= display-title "Unknown")))
(add-recently-played (list :title display-title
:artist artist
:song title
:timestamp (get-universal-time)
:track-id track-id)
:shuffle)
(setf *last-known-track-shuffle* display-title))
(log:info "Shuffle track change: ~A" display-title))
(refill-shuffle-queue))
(defun shuffle-now-playing (&optional (mount "shuffle.mp3"))
"Get now-playing information from the shuffle pipeline."
(when (and *shuffle-pipeline*
(cl-streamer/harmony:pipeline-current-track *shuffle-pipeline*))
(let* ((track-info (cl-streamer/harmony:pipeline-current-track *shuffle-pipeline*))
(display-title (or (getf track-info :display-title) "Unknown"))
(listeners (cl-streamer:pipeline-listener-count *shuffle-pipeline*)))
`((:listenurl . ,(format nil "~A/~A" *stream-base-url* mount))
(:title . ,display-title)
(:listeners . ,(or listeners 0))
(:track-id . nil)
(:favorite-count . 0)))))
;;; ---- Shuffle Pipeline Lifecycle ----
(defun start-shuffle-streaming (&key (mp3-bitrate 128) (aac-bitrate 128))
"Start the shuffle pipeline, sharing the curated pipeline's stream server.
Must be called after start-harmony-streaming."
(when *shuffle-pipeline*
(log:warn "Shuffle streaming already running")
(return-from start-shuffle-streaming *shuffle-pipeline*))
(unless *harmony-pipeline*
(error "Cannot start shuffle pipeline: curated pipeline not running"))
(let ((shared-server (cl-streamer/harmony:pipeline-server *harmony-pipeline*)))
(setf *shuffle-pipeline*
(cl-streamer/harmony:make-pipeline
:server shared-server
:outputs (list (list :format :mp3
:mount "/shuffle.mp3"
:bitrate mp3-bitrate
:name "Asteroid Radio Shuffle MP3")
(list :format :aac
:mount "/shuffle.aac"
:bitrate aac-bitrate
:name "Asteroid Radio Shuffle AAC"))))
;; Register hooks
(cl-streamer/harmony:pipeline-add-hook *shuffle-pipeline*
:track-change #'on-shuffle-track-change)
;; Seed the queue before starting
(let ((batch (shuffle-random-batch)))
(when batch
(cl-streamer/harmony:pipeline-queue-files *shuffle-pipeline* batch)))
;; Start the pipeline and begin playback
(cl-streamer/harmony:pipeline-start *shuffle-pipeline*)
;; Start the play-list loop (plays queued tracks, refill hook keeps it going)
(let ((initial-files (mapcar (lambda (entry) (getf entry :file))
(cl-streamer/harmony:pipeline-get-queue *shuffle-pipeline*))))
(when initial-files
(cl-streamer/harmony:play-list *shuffle-pipeline* initial-files
:crossfade-duration 3.0
:loop-queue t)))
(log:info "Shuffle streaming started (MP3 + AAC, ~A tracks in pool)"
(length (get-shuffle-library)))
*shuffle-pipeline*))
(defun stop-shuffle-streaming ()
"Stop the shuffle pipeline. Does not stop the shared server."
(when *shuffle-pipeline*
(cl-streamer/harmony:pipeline-stop *shuffle-pipeline*)
(setf *shuffle-pipeline* nil))
(log:info "Shuffle streaming stopped"))