Add shuffle stream as second pipeline
stream-harmony.lisp: - scan-music-library-files: recursive directory scanner for supported formats - Cached shuffle library pool (2797 tracks, 1hr TTL refresh) - shuffle-random-batch: picks N random tracks from pool - refill-shuffle-queue: track-change hook keeps queue topped up - on-shuffle-track-change: updates recently-played, refills queue - shuffle-now-playing: now-playing stats for shuffle mounts - start-shuffle-streaming: creates pipeline sharing curated server, registers hooks, seeds queue, starts play-list with crossfade - stop-shuffle-streaming: stops pipeline without stopping shared server asteroid.lisp: - Start shuffle pipeline after curated pipeline on startup - Stop/restart shuffle in stream restart endpoint - Fix stale cl-streamer:get-listener-count call (needs server arg) frontend-partials.lisp: - Route shuffle mounts to shuffle-now-playing - Fix stale cl-streamer:get-listener-count (use pipeline-listener-count) listener-stats.lisp: - Poll shuffle mounts (/shuffle.mp3, /shuffle.aac) for listener stats parenscript/stream-player.lisp: - get-stream-config returns /shuffle.* mounts when channel is shuffle parenscript/front-page.lisp, parenscript/player.lisp: - Normalize now-playing polling to 15s (was 5s/10s, caused 429s) Build verified, runtime tested: both pipelines play simultaneously, channel selector switches streams correctly.
This commit is contained in:
parent
3e6b496340
commit
5efa49321e
|
|
@ -553,8 +553,10 @@
|
|||
"Restart the streaming pipeline."
|
||||
(require-role :admin)
|
||||
(with-error-handling
|
||||
(stop-shuffle-streaming)
|
||||
(stop-harmony-streaming)
|
||||
(start-harmony-streaming)
|
||||
(start-shuffle-streaming)
|
||||
(api-output `(("status" . "success")
|
||||
("message" . "Streaming pipeline restarted")))))
|
||||
|
||||
|
|
@ -1430,7 +1432,9 @@
|
|||
(with-error-handling
|
||||
(let* ((now-playing (get-now-playing-stats "asteroid.mp3"))
|
||||
(title (if now-playing (cdr (assoc :title now-playing)) "Unknown"))
|
||||
(listeners (or (cl-streamer:get-listener-count) 0)))
|
||||
(listeners (if *harmony-pipeline*
|
||||
(or (cl-streamer:pipeline-listener-count *harmony-pipeline*) 0)
|
||||
0)))
|
||||
(api-output
|
||||
`(("icestats" . (("source" . (("listenurl" . ,(format nil "~a/asteroid.mp3" *stream-base-url*))
|
||||
("title" . ,title)
|
||||
|
|
@ -1601,7 +1605,15 @@
|
|||
(length resumed-list)
|
||||
(if playlist-path (file-namestring playlist-path) "stream-queue.m3u"))))
|
||||
(format t "📡 Stream: ~a/asteroid.mp3~%" *stream-base-url*)
|
||||
(format t "📡 Stream: ~a/asteroid.aac~%" *stream-base-url*))
|
||||
(format t "📡 Stream: ~a/asteroid.aac~%" *stream-base-url*)
|
||||
;; Start shuffle stream (shares the same HTTP server)
|
||||
(handler-case
|
||||
(progn
|
||||
(start-shuffle-streaming)
|
||||
(format t "📡 Shuffle: ~a/shuffle.mp3~%" *stream-base-url*)
|
||||
(format t "📡 Shuffle: ~a/shuffle.aac~%" *stream-base-url*))
|
||||
(error (e)
|
||||
(format t "⚠️ Could not start shuffle stream: ~a~%" e))))
|
||||
(error (e)
|
||||
(format t "⚠️ Could not start streaming: ~a~%" e)
|
||||
(format t " (Web server will run without streaming)~%")))
|
||||
|
|
|
|||
|
|
@ -55,14 +55,18 @@
|
|||
"DJ Live"))
|
||||
(owner (or (cdr (assoc "owner" status :test #'string=)) "DJ"))
|
||||
(title (format nil "~A [DJ: ~A]" display-title owner))
|
||||
(listeners (or (cl-streamer:get-listener-count) 0)))
|
||||
(listeners (if *harmony-pipeline*
|
||||
(or (cl-streamer:pipeline-listener-count *harmony-pipeline*) 0)
|
||||
0)))
|
||||
`((:listenurl . ,(format nil "~A/~A" *stream-base-url* mount))
|
||||
(:title . ,title)
|
||||
(:listeners . ,listeners)
|
||||
(:track-id . nil)
|
||||
(:favorite-count . 0)))
|
||||
;; Normal auto-playlist mode
|
||||
(harmony-now-playing mount)))
|
||||
;; Normal mode — route to curated or shuffle based on mount name
|
||||
(if (search "shuffle" mount :test #'char-equal)
|
||||
(shuffle-now-playing mount)
|
||||
(harmony-now-playing mount))))
|
||||
|
||||
(define-api-with-limit asteroid/partial/now-playing (&optional mount) (:limit 30 :timeout 60)
|
||||
"Get Partial HTML with live now-playing status.
|
||||
|
|
|
|||
|
|
@ -384,12 +384,20 @@
|
|||
|
||||
(defun poll-and-store-stats ()
|
||||
"Single poll iteration: fetch listener counts from cl-streamer and store."
|
||||
;; Curated stream mounts
|
||||
(dolist (mount '("/asteroid.mp3" "/asteroid.aac"))
|
||||
(let ((listeners (when *harmony-pipeline*
|
||||
(cl-streamer:pipeline-listener-count *harmony-pipeline* mount))))
|
||||
(when (and listeners (> listeners 0))
|
||||
(store-listener-snapshot mount listeners)
|
||||
(log:debug "Stored snapshot: ~a = ~a listeners" mount listeners))))
|
||||
;; Shuffle stream mounts
|
||||
(dolist (mount '("/shuffle.mp3" "/shuffle.aac"))
|
||||
(let ((listeners (when *shuffle-pipeline*
|
||||
(cl-streamer:pipeline-listener-count *shuffle-pipeline* mount))))
|
||||
(when (and listeners (> listeners 0))
|
||||
(store-listener-snapshot mount listeners)
|
||||
(log:debug "Stored snapshot: ~a = ~a listeners" mount listeners))))
|
||||
;; Collect geo stats from web listeners (uses real IPs from X-Forwarded-For)
|
||||
(collect-geo-stats-from-web-listeners))
|
||||
|
||||
|
|
|
|||
|
|
@ -746,8 +746,8 @@
|
|||
(catch (lambda (error)
|
||||
(ps:chain console (error "Error adding favorite:" error)))))))))))
|
||||
|
||||
;; Update now playing every 5 seconds
|
||||
(set-interval update-now-playing 5000)
|
||||
;; Update now playing every 15 seconds
|
||||
(set-interval update-now-playing 15000)
|
||||
|
||||
;; Poll server for channel name changes (works across all listeners)
|
||||
(let ((last-channel-name nil))
|
||||
|
|
|
|||
|
|
@ -810,8 +810,8 @@
|
|||
|
||||
;; Initial update after 1 second
|
||||
(set-timeout update-now-playing 1000)
|
||||
;; Update live stream info every 10 seconds
|
||||
(set-interval update-now-playing 10000)
|
||||
;; Update live stream info every 15 seconds
|
||||
(set-interval update-now-playing 15000)
|
||||
|
||||
;; Make functions globally accessible for onclick handlers
|
||||
(defvar window (ps:@ window))
|
||||
|
|
|
|||
|
|
@ -149,23 +149,23 @@
|
|||
;; ========================================
|
||||
|
||||
;; Get stream configuration for a given channel and quality
|
||||
;; With cl-streamer, both channels use the same stream mounts -
|
||||
;; channel switching loads a different playlist server-side
|
||||
;; Curated channel uses /asteroid.* mounts, shuffle uses /shuffle.* mounts
|
||||
(defun get-stream-config (stream-base-url channel quality)
|
||||
(let ((prefix (if (= channel "shuffle") "/shuffle" "/asteroid")))
|
||||
(let ((config (ps:create
|
||||
:aac (ps:create :url (+ stream-base-url "/asteroid.aac")
|
||||
:aac (ps:create :url (+ stream-base-url prefix ".aac")
|
||||
:type "audio/aac"
|
||||
:format "AAC 96kbps Stereo"
|
||||
:mount "asteroid.aac")
|
||||
:mp3 (ps:create :url (+ stream-base-url "/asteroid.mp3")
|
||||
:mount (+ (ps:chain prefix (substring 1)) ".aac"))
|
||||
:mp3 (ps:create :url (+ stream-base-url prefix ".mp3")
|
||||
:type "audio/mpeg"
|
||||
:format "MP3 128kbps Stereo"
|
||||
:mount "asteroid.mp3")
|
||||
:low (ps:create :url (+ stream-base-url "/asteroid.mp3")
|
||||
:mount (+ (ps:chain prefix (substring 1)) ".mp3"))
|
||||
:low (ps:create :url (+ stream-base-url prefix ".mp3")
|
||||
:type "audio/mpeg"
|
||||
:format "MP3 128kbps Stereo"
|
||||
:mount "asteroid.mp3"))))
|
||||
(ps:getprop config quality)))
|
||||
:mount (+ (ps:chain prefix (substring 1)) ".mp3")))))
|
||||
(ps:getprop config quality))))
|
||||
|
||||
;; Get current channel from selector or localStorage
|
||||
(defun get-current-channel ()
|
||||
|
|
|
|||
|
|
@ -12,6 +12,9 @@
|
|||
(defvar *harmony-stream-port* 8000
|
||||
"Port for the cl-streamer HTTP stream server.")
|
||||
|
||||
(defvar *shuffle-pipeline* nil
|
||||
"The shuffle stream pipeline — plays random tracks from the music library.")
|
||||
|
||||
;; Encoder instances are now owned by the pipeline (Phase 2).
|
||||
;; Kept as aliases for backward compatibility with any external references.
|
||||
(defun harmony-mp3-encoder ()
|
||||
|
|
@ -294,3 +297,155 @@
|
|||
:queue-length (length (cl-streamer/harmony:pipeline-get-queue
|
||||
*harmony-pipeline*))))
|
||||
(list :running nil)))
|
||||
|
||||
;;; ============================================================
|
||||
;;; Shuffle Stream — random tracks from the music library
|
||||
;;; ============================================================
|
||||
|
||||
(defvar *shuffle-batch-size* 20
|
||||
"Number of tracks to queue at a time on the shuffle pipeline.")
|
||||
|
||||
(defun scan-music-library-files (&optional (directory *music-library-path*))
|
||||
"Recursively scan DIRECTORY for supported audio files.
|
||||
Returns a list of namestrings."
|
||||
(let ((files nil)
|
||||
(extensions *supported-formats*))
|
||||
(labels ((scan (dir)
|
||||
(handler-case
|
||||
(dolist (entry (uiop:directory-files dir))
|
||||
(let ((ext (pathname-type entry)))
|
||||
(when (and ext (member ext extensions :test #'string-equal))
|
||||
(push (namestring entry) files))))
|
||||
(error (e)
|
||||
(log:debug "Error scanning ~A: ~A" dir e)))
|
||||
(handler-case
|
||||
(dolist (sub (uiop:subdirectories dir))
|
||||
(scan sub))
|
||||
(error (e)
|
||||
(log:debug "Error listing subdirs of ~A: ~A" dir e)))))
|
||||
(scan (pathname directory)))
|
||||
(nreverse files)))
|
||||
|
||||
(defvar *shuffle-library-cache* nil
|
||||
"Cached list of audio files from the music library for shuffle.")
|
||||
|
||||
(defvar *shuffle-library-cache-time* 0
|
||||
"Universal time when *shuffle-library-cache* was last refreshed.")
|
||||
|
||||
(defvar *shuffle-cache-ttl* 3600
|
||||
"Seconds before the shuffle library cache expires (default 1 hour).")
|
||||
|
||||
(defun get-shuffle-library ()
|
||||
"Return the cached list of music library files, refreshing if stale."
|
||||
(when (or (null *shuffle-library-cache*)
|
||||
(> (- (get-universal-time) *shuffle-library-cache-time*)
|
||||
*shuffle-cache-ttl*))
|
||||
(log:info "Scanning music library for shuffle pool...")
|
||||
(let ((files (scan-music-library-files)))
|
||||
(setf *shuffle-library-cache* files
|
||||
*shuffle-library-cache-time* (get-universal-time))
|
||||
(log:info "Shuffle pool: ~A tracks" (length files))))
|
||||
*shuffle-library-cache*)
|
||||
|
||||
(defun shuffle-random-batch (&optional (n *shuffle-batch-size*))
|
||||
"Pick N random tracks from the music library (with replacement for small libs)."
|
||||
(let ((library (get-shuffle-library)))
|
||||
(when library
|
||||
(let ((len (length library)))
|
||||
(loop repeat (min n len)
|
||||
collect (list :file (nth (random len) library)))))))
|
||||
|
||||
(defun refill-shuffle-queue ()
|
||||
"Queue another batch of random tracks on the shuffle pipeline.
|
||||
Called by the track-change hook when the queue is running low."
|
||||
(when *shuffle-pipeline*
|
||||
(let ((queue-len (length (cl-streamer/harmony:pipeline-get-queue *shuffle-pipeline*))))
|
||||
(when (< queue-len (floor *shuffle-batch-size* 2))
|
||||
(let ((batch (shuffle-random-batch)))
|
||||
(when batch
|
||||
(cl-streamer/harmony:pipeline-queue-files *shuffle-pipeline* batch)
|
||||
(log:debug "Shuffle: queued ~A tracks (~A in queue)"
|
||||
(length batch) (+ queue-len (length batch)))))))))
|
||||
|
||||
(defun on-shuffle-track-change (pipeline track-info)
|
||||
"Called by cl-streamer when the shuffle stream changes tracks.
|
||||
Updates the shuffle recently-played list and refills the queue."
|
||||
(declare (ignore pipeline))
|
||||
(let* ((display-title (getf track-info :display-title))
|
||||
(artist (getf track-info :artist))
|
||||
(title (getf track-info :title))
|
||||
(file-path (getf track-info :file))
|
||||
(track-id (or (find-track-by-title display-title)
|
||||
(find-track-by-file-path file-path))))
|
||||
(when (and display-title (not (string= display-title "Unknown")))
|
||||
(add-recently-played (list :title display-title
|
||||
:artist artist
|
||||
:song title
|
||||
:timestamp (get-universal-time)
|
||||
:track-id track-id)
|
||||
:shuffle)
|
||||
(setf *last-known-track-shuffle* display-title))
|
||||
(log:info "Shuffle track change: ~A" display-title))
|
||||
(refill-shuffle-queue))
|
||||
|
||||
(defun shuffle-now-playing (&optional (mount "shuffle.mp3"))
|
||||
"Get now-playing information from the shuffle pipeline."
|
||||
(when (and *shuffle-pipeline*
|
||||
(cl-streamer/harmony:pipeline-current-track *shuffle-pipeline*))
|
||||
(let* ((track-info (cl-streamer/harmony:pipeline-current-track *shuffle-pipeline*))
|
||||
(display-title (or (getf track-info :display-title) "Unknown"))
|
||||
(listeners (cl-streamer:pipeline-listener-count *shuffle-pipeline*)))
|
||||
`((:listenurl . ,(format nil "~A/~A" *stream-base-url* mount))
|
||||
(:title . ,display-title)
|
||||
(:listeners . ,(or listeners 0))
|
||||
(:track-id . nil)
|
||||
(:favorite-count . 0)))))
|
||||
|
||||
;;; ---- Shuffle Pipeline Lifecycle ----
|
||||
|
||||
(defun start-shuffle-streaming (&key (mp3-bitrate 128) (aac-bitrate 128))
|
||||
"Start the shuffle pipeline, sharing the curated pipeline's stream server.
|
||||
Must be called after start-harmony-streaming."
|
||||
(when *shuffle-pipeline*
|
||||
(log:warn "Shuffle streaming already running")
|
||||
(return-from start-shuffle-streaming *shuffle-pipeline*))
|
||||
(unless *harmony-pipeline*
|
||||
(error "Cannot start shuffle pipeline: curated pipeline not running"))
|
||||
(let ((shared-server (cl-streamer/harmony:pipeline-server *harmony-pipeline*)))
|
||||
(setf *shuffle-pipeline*
|
||||
(cl-streamer/harmony:make-pipeline
|
||||
:server shared-server
|
||||
:outputs (list (list :format :mp3
|
||||
:mount "/shuffle.mp3"
|
||||
:bitrate mp3-bitrate
|
||||
:name "Asteroid Radio Shuffle MP3")
|
||||
(list :format :aac
|
||||
:mount "/shuffle.aac"
|
||||
:bitrate aac-bitrate
|
||||
:name "Asteroid Radio Shuffle AAC"))))
|
||||
;; Register hooks
|
||||
(cl-streamer/harmony:pipeline-add-hook *shuffle-pipeline*
|
||||
:track-change #'on-shuffle-track-change)
|
||||
;; Seed the queue before starting
|
||||
(let ((batch (shuffle-random-batch)))
|
||||
(when batch
|
||||
(cl-streamer/harmony:pipeline-queue-files *shuffle-pipeline* batch)))
|
||||
;; Start the pipeline and begin playback
|
||||
(cl-streamer/harmony:pipeline-start *shuffle-pipeline*)
|
||||
;; Start the play-list loop (plays queued tracks, refill hook keeps it going)
|
||||
(let ((initial-files (mapcar (lambda (entry) (getf entry :file))
|
||||
(cl-streamer/harmony:pipeline-get-queue *shuffle-pipeline*))))
|
||||
(when initial-files
|
||||
(cl-streamer/harmony:play-list *shuffle-pipeline* initial-files
|
||||
:crossfade-duration 3.0
|
||||
:loop-queue t)))
|
||||
(log:info "Shuffle streaming started (MP3 + AAC, ~A tracks in pool)"
|
||||
(length (get-shuffle-library)))
|
||||
*shuffle-pipeline*))
|
||||
|
||||
(defun stop-shuffle-streaming ()
|
||||
"Stop the shuffle pipeline. Does not stop the shared server."
|
||||
(when *shuffle-pipeline*
|
||||
(cl-streamer/harmony:pipeline-stop *shuffle-pipeline*)
|
||||
(setf *shuffle-pipeline* nil))
|
||||
(log:info "Shuffle streaming stopped"))
|
||||
|
|
|
|||
Loading…
Reference in New Issue