Client-side stream sync with in-flight guards, fix connection exhaustion

Stream sync improvements:
- Server now sends changed_at timestamp + raw remaining seconds
- Client schedules UI updates based on changed_at + measured buffer lag
- Removed server-side delay logic entirely
- Poll interval set to 10s (was 15s, briefly 5s which caused issues)

Connection exhaustion fix:
- Added in-flight guards to update-mini-now-playing and poll-now-playing
- Prevents fetch pileup when server is slow or stalled
- Each poller skips if previous request hasn't completed

Other:
- Include changed_at in now-playing JSON API response
- Replace em dashes with hyphens throughout
- Update cl-streamer submodule (get-metadata-changed-at export)
This commit is contained in:
Glenn Thompson 2026-04-13 09:30:40 +01:00
parent 20ed7ecb02
commit da28c70254
6 changed files with 160 additions and 110 deletions

@ -1 +1 @@
Subproject commit cc4215d1c663c5aed4e9758c755a944016fa6aaa Subproject commit b38f4d1f8cb0df919761281162f4debaad123e72

View File

@ -120,14 +120,16 @@
(artist (getf parsed :artist)) (artist (getf parsed :artist))
(song (getf parsed :song)) (song (getf parsed :song))
(search-url (generate-music-search-url artist song))) (search-url (generate-music-search-url artist song)))
(let ((remaining (cdr (assoc :remaining now-playing-stats)))) (let ((remaining (cdr (assoc :remaining now-playing-stats)))
(changed-at (cdr (assoc :changed-at now-playing-stats))))
(api-output `(("status" . "success") (api-output `(("status" . "success")
("title" . ,title) ("title" . ,title)
("listeners" . ,(cdr (assoc :listeners now-playing-stats))) ("listeners" . ,(cdr (assoc :listeners now-playing-stats)))
("track_id" . ,(cdr (assoc :track-id now-playing-stats))) ("track_id" . ,(cdr (assoc :track-id now-playing-stats)))
("favorite_count" . ,favorite-count) ("favorite_count" . ,favorite-count)
("search_url" . ,search-url) ("search_url" . ,search-url)
,@(when remaining `(("remaining" . ,remaining))))))) ,@(when remaining `(("remaining" . ,remaining)))
,@(when changed-at `(("changed_at" . ,changed-at)))))))
(api-output `(("status" . "offline") (api-output `(("status" . "offline")
("title" . "Stream Offline") ("title" . "Stream Offline")
("track_id" . nil))))))) ("track_id" . nil)))))))

View File

@ -870,6 +870,7 @@
;; Main page countdown timer ;; Main page countdown timer
(defvar *main-remaining* nil) (defvar *main-remaining* nil)
(defvar *poll-now-playing-in-flight* false)
(defun format-countdown (seconds) (defun format-countdown (seconds)
(let ((m (ps:chain -math (floor (/ seconds 60)))) (let ((m (ps:chain -math (floor (/ seconds 60))))
@ -877,6 +878,8 @@
(+ (if (< m 10) (+ "0" m) m) ":" (if (< s 10) (+ "0" s) s)))) (+ (if (< m 10) (+ "0" m) m) ":" (if (< s 10) (+ "0" s) s))))
(defun poll-now-playing () (defun poll-now-playing ()
(unless *poll-now-playing-in-flight*
(setf *poll-now-playing-in-flight* true)
(let ((mount (or (ps:chain local-storage (get-item "stream-mount")) "asteroid.mp3"))) (let ((mount (or (ps:chain local-storage (get-item "stream-mount")) "asteroid.mp3")))
(ps:chain (ps:chain
(fetch (+ "/api/asteroid/partial/now-playing-json?mount=" mount)) (fetch (+ "/api/asteroid/partial/now-playing-json?mount=" mount))
@ -897,11 +900,13 @@
(setf (ps:@ listener-el text-content) listeners)) (setf (ps:@ listener-el text-content) listeners))
(when remaining (when remaining
(setf *main-remaining* remaining)))))) (setf *main-remaining* remaining))))))
(catch (lambda (error) nil))))) (catch (lambda (error) nil))
(then (lambda () (setf *poll-now-playing-in-flight* false)))
(catch (lambda () (setf *poll-now-playing-in-flight* false)))))))
;; Start polling and countdown ticker on the main page ;; Start polling and countdown ticker on the main page
(set-timeout poll-now-playing 2000) (set-timeout poll-now-playing 2000)
(set-interval poll-now-playing 15000) (set-interval poll-now-playing 10000)
(set-interval (set-interval
(lambda () (lambda ()
(let ((el (ps:chain document (get-element-by-id "track-countdown-main")))) (let ((el (ps:chain document (get-element-by-id "track-countdown-main"))))

View File

@ -340,6 +340,12 @@
(defvar *track-remaining-seconds* nil) (defvar *track-remaining-seconds* nil)
(defvar *countdown-interval* nil) (defvar *countdown-interval* nil)
;; Client-side sync: schedule title/notification updates based on server timestamp
(defvar *pending-title-timer* nil)
(defvar *pending-title* nil)
(defvar *measured-buffer-lag-ms* 2300)
(defvar *mini-now-playing-in-flight* false)
(defun format-countdown (seconds) (defun format-countdown (seconds)
(let ((m (ps:chain -math (floor (/ seconds 60)))) (let ((m (ps:chain -math (floor (/ seconds 60))))
(s (ps:chain -math (floor (mod seconds 60))))) (s (ps:chain -math (floor (mod seconds 60)))))
@ -528,26 +534,14 @@
(ps:@ response ok))) (ps:@ response ok)))
(catch (lambda (error) nil))))) (catch (lambda (error) nil)))))
;; Update mini now playing display (for persistent player frame) ;; Apply a title update to the UI immediately
(defun update-mini-now-playing () (defun apply-title-update (title data)
(let ((mount (get-current-mount)))
(ps:chain
(fetch (+ "/api/asteroid/partial/now-playing-json?mount=" mount))
(then (lambda (response)
(if (ps:@ response ok)
(ps:chain response (json))
nil)))
(then (lambda (data)
(when data
(let ((el (ps:chain document (get-element-by-id "mini-now-playing"))) (let ((el (ps:chain document (get-element-by-id "mini-now-playing")))
(track-id-el (ps:chain document (get-element-by-id "current-track-id-mini"))) (track-id-el (ps:chain document (get-element-by-id "current-track-id-mini"))))
(title (or (ps:@ data data title) (ps:@ data title) "Loading...")))
(when el (when el
;; Check if track changed and record to history + notify (ps:chain console (log "[STREAM-SYNC] Applying title:" title))
(when (not (= (ps:@ el text-content) title))
(ps:chain console (log "[STREAM-SYNC] Title changed:" title))
(record-track-listen title) (record-track-listen title)
(notify-track-change title)) (notify-track-change title)
(setf (ps:@ el text-content) title) (setf (ps:@ el text-content) title)
(check-favorite-status-mini)) (check-favorite-status-mini))
(update-media-session title) (update-media-session title)
@ -561,7 +555,7 @@
((= fav-count 0) (setf (ps:@ count-el text-content) "")) ((= fav-count 0) (setf (ps:@ count-el text-content) ""))
((= fav-count 1) (setf (ps:@ count-el text-content) "1 ❤️")) ((= fav-count 1) (setf (ps:@ count-el text-content) "1 ❤️"))
(t (setf (ps:@ count-el text-content) (+ fav-count " ❤️")))))) (t (setf (ps:@ count-el text-content) (+ fav-count " ❤️"))))))
;; Sync countdown timer from server ;; Sync countdown timer from server remaining
(let ((remaining (or (ps:@ data data remaining) (ps:@ data remaining)))) (let ((remaining (or (ps:@ data data remaining) (ps:@ data remaining))))
(when remaining (when remaining
(setf *track-remaining-seconds* remaining))) (setf *track-remaining-seconds* remaining)))
@ -572,9 +566,69 @@
(progn (progn
(setf (ps:@ mb-link href) search-url) (setf (ps:@ mb-link href) search-url)
(setf (ps:@ mb-link style display) "inline")) (setf (ps:@ mb-link style display) "inline"))
(setf (ps:@ mb-link style display) "none")))))))) (setf (ps:@ mb-link style display) "none"))))))
;; Update mini now playing display (for persistent player frame)
(defun update-mini-now-playing ()
(unless *mini-now-playing-in-flight*
(setf *mini-now-playing-in-flight* true)
(let ((mount (get-current-mount)))
(ps:chain
(fetch (+ "/api/asteroid/partial/now-playing-json?mount=" mount))
(then (lambda (response)
(if (ps:@ response ok)
(ps:chain response (json))
nil)))
(then (lambda (data)
(when data
(let ((el (ps:chain document (get-element-by-id "mini-now-playing")))
(title (or (ps:@ data data title) (ps:@ data title) "Loading..."))
(changed-at (or (ps:@ data data changed_at) (ps:@ data changed_at))))
;; Update buffer lag measurement from audio element
(let ((audio (ps:chain document (get-element-by-id "persistent-audio"))))
(when audio
(let ((ahead (get-buffer-ahead audio)))
(when (and ahead (> ahead 0))
(setf *measured-buffer-lag-ms*
(ps:chain -math (round (* ahead 1000))))))))
;; If title hasn't changed from what's displayed, just update remaining
(when el
(if (= (ps:@ el text-content) title)
;; Same title - just sync countdown
(let ((remaining (or (ps:@ data data remaining) (ps:@ data remaining))))
(when remaining
(setf *track-remaining-seconds* remaining)))
;; New title detected - schedule update based on changed_at
(progn
;; Cancel any pending scheduled update
(when *pending-title-timer*
(clear-timeout *pending-title-timer*)
(setf *pending-title-timer* nil))
(if changed-at
;; Calculate when listener will hear this track
(let* ((now (ps:chain -date (now)))
(target-time (+ changed-at *measured-buffer-lag-ms*))
(delay (- target-time now)))
(ps:chain console (log "[STREAM-SYNC] New title:" title
"changed_at:" changed-at
"buffer_lag:" *measured-buffer-lag-ms*
"delay:" delay "ms"))
(if (> delay 0)
;; Schedule for when listener will hear it
(setf *pending-title-timer*
(set-timeout
(lambda ()
(setf *pending-title-timer* nil)
(apply-title-update title data))
delay))
;; Delay already passed - apply immediately
(apply-title-update title data)))
;; No changed_at (first track) - apply immediately
(apply-title-update title data)))))))))
(catch (lambda (error) (catch (lambda (error)
(ps:chain console (log "Could not fetch now playing:" error))))))) (ps:chain console (log "Could not fetch now playing:" error))))
(then (lambda () (setf *mini-now-playing-in-flight* false)))
(catch (lambda () (setf *mini-now-playing-in-flight* false)))))))
;; Toggle favorite for mini player ;; Toggle favorite for mini player
(defun toggle-favorite-mini () (defun toggle-favorite-mini ()
@ -739,14 +793,14 @@
(setf (ps:@ new-source type) (ps:@ config type)) (setf (ps:@ new-source type) (ps:@ config type))
(ps:chain audio (append-child new-source)))) (ps:chain audio (append-child new-source))))
;; Reload and play keep *is-reconnecting* true until 'playing' fires ;; Reload and play - keep *is-reconnecting* true until 'playing' fires
(ps:chain audio (load)) (ps:chain audio (load))
(set-timeout (set-timeout
(lambda () (lambda ()
(ps:chain audio (play) (ps:chain audio (play)
(catch (lambda (error) (catch (lambda (error)
(ps:chain console (log "Reconnect play failed:" error)) (ps:chain console (log "Reconnect play failed:" error))
;; play() rejected reset so next stall/error can retry ;; play() rejected - reset so next stall/error can retry
(setf *is-reconnecting* false))))) (setf *is-reconnecting* false)))))
500))) 500)))
@ -829,7 +883,7 @@
;; Exponential backoff: 5s, 10s, 20s, max 60s ;; Exponential backoff: 5s, 10s, 20s, max 60s
(let ((delay (ps:chain -math (min (* 5000 (ps:chain -math (pow 2 (- *stall-count* 1)))) 60000)))) (let ((delay (ps:chain -math (min (* 5000 (ps:chain -math (pow 2 (- *stall-count* 1)))) 60000))))
(if (> *stall-count* 10) (if (> *stall-count* 10)
;; Give up after 10 stall attempts show manual retry ;; Give up after 10 stall attempts - show manual retry
(progn (progn
(ps:chain console (log "Too many stall retries, giving up auto-reconnect")) (ps:chain console (log "Too many stall retries, giving up auto-reconnect"))
(show-status "⚠️ Stream unavailable - click play to retry" true)) (show-status "⚠️ Stream unavailable - click play to retry" true))
@ -1023,7 +1077,7 @@
;; Start now playing updates and countdown ticker ;; Start now playing updates and countdown ticker
(set-timeout update-mini-now-playing 1000) (set-timeout update-mini-now-playing 1000)
(set-interval update-mini-now-playing 15000) (set-interval update-mini-now-playing 10000)
(start-countdown-ticker)))) (start-countdown-ticker))))
;; Initialize popout player ;; Initialize popout player

View File

@ -7,7 +7,7 @@
#EXTINF:-1,Tycho - Glider #EXTINF:-1,Tycho - Glider
/app/music/Tycho - Epoch (Deluxe Version) (2019) [WEB FLAC16-44.1]/01 - Glider.flac /app/music/Tycho - Epoch (Deluxe Version) (2019) [WEB FLAC16-44.1]/01 - Glider.flac
#EXTINF:-1,Boards of Canada - Spectrum #EXTINF:-1,Boards of Canada - Spectrum The issue
/app/music/Boards of Canada/A Few Old Tunes/01 - Spectrum.mp3 /app/music/Boards of Canada/A Few Old Tunes/01 - Spectrum.mp3
#EXTINF:-1,Ulrich Schnauss - Melts into Air #EXTINF:-1,Ulrich Schnauss - Melts into Air
/app/music/Ulrich Schnauss - No Further Ahead Than Tomorrow (2020) - WEB FLAC/01. Melts into Air (2019 Version).flac /app/music/Ulrich Schnauss - No Further Ahead Than Tomorrow (2020) - WEB FLAC/01. Melts into Air (2019 Version).flac

View File

@ -13,7 +13,7 @@
"Port for the cl-streamer HTTP stream server.") "Port for the cl-streamer HTTP stream server.")
(defvar *shuffle-pipeline* nil (defvar *shuffle-pipeline* nil
"The shuffle stream pipeline plays random tracks from the music library.") "The shuffle stream pipeline - plays random tracks from the music library.")
;; Encoder instances are now owned by the pipeline (Phase 2). ;; Encoder instances are now owned by the pipeline (Phase 2).
;; Kept as aliases for backward compatibility with any external references. ;; Kept as aliases for backward compatibility with any external references.
@ -99,12 +99,12 @@
(setf *current-playlist-path* playlist-path) (setf *current-playlist-path* playlist-path)
(setf *resumed-from-saved-state* t) (setf *resumed-from-saved-state* t)
(if playlist-changed-p (if playlist-changed-p
;; Different playlist should be active start from beginning ;; Different playlist should be active - start from beginning
(progn (progn
(log:info "Scheduled playlist changed: ~A -> ~A, starting from beginning" (log:info "Scheduled playlist changed: ~A -> ~A, starting from beginning"
saved-playlist-name scheduled-name) saved-playlist-name scheduled-name)
(values file-list playlist-path)) (values file-list playlist-path))
;; Same playlist resume from saved position ;; Same playlist - resume from saved position
(let ((pos (when saved-file (let ((pos (when saved-file
(position saved-file file-list :test #'string=)))) (position saved-file file-list :test #'string=))))
(if pos (if pos
@ -197,41 +197,30 @@
(defun harmony-now-playing (&optional (mount "asteroid.mp3")) (defun harmony-now-playing (&optional (mount "asteroid.mp3"))
"Get now-playing information from cl-streamer pipeline. "Get now-playing information from cl-streamer pipeline.
Uses the metadata timeline to report what listeners are actually hearing, Returns the current pipeline title, remaining seconds, and a server
accounting for ring buffer and browser decode buffering." timestamp (epoch ms) of when the metadata last changed. The client
uses this timestamp plus its known buffer lag to schedule UI updates."
(when (and *harmony-pipeline* (when (and *harmony-pipeline*
(cl-streamer/harmony:pipeline-current-track *harmony-pipeline*)) (cl-streamer/harmony:pipeline-current-track *harmony-pipeline*))
(let* ((server (cl-streamer/harmony:pipeline-server *harmony-pipeline*)) (let* ((track-info (cl-streamer/harmony:pipeline-current-track *harmony-pipeline*))
(listener-title (when server (display-title (or (getf track-info :display-title) "Unknown"))
(cl-streamer:get-listener-now-playing
server (format nil "/~A" mount))))
(track-info (cl-streamer/harmony:pipeline-current-track *harmony-pipeline*))
(display-title (or listener-title
(getf track-info :display-title)
"Unknown"))
(listeners (cl-streamer:pipeline-listener-count *harmony-pipeline*)) (listeners (cl-streamer:pipeline-listener-count *harmony-pipeline*))
(track-id (or (find-track-by-title display-title) (track-id (or (find-track-by-title display-title)
(find-track-by-file-path (getf track-info :file)))) (find-track-by-file-path (getf track-info :file))))
(pipeline-title (getf track-info :display-title))
(raw-remaining (cl-streamer/harmony:pipeline-track-remaining *harmony-pipeline*)) (raw-remaining (cl-streamer/harmony:pipeline-track-remaining *harmony-pipeline*))
(titles-match (or (null listener-title) (remaining (when raw-remaining (max 0 (floor raw-remaining))))
(null pipeline-title) ;; Server epoch ms when metadata last changed
(string= listener-title pipeline-title))) (server (cl-streamer/harmony:pipeline-server *harmony-pipeline*))
;; Only show remaining when titles match (delay has passed). (changed-at (when server
;; During the transition window the countdown would be inaccurate. (cl-streamer:get-metadata-changed-at
(remaining (when (and raw-remaining titles-match) server (format nil "/~A" mount)))))
(max 0 (floor raw-remaining)))))
;; Diagnostic: log when listener-title differs from pipeline title
(when (and listener-title pipeline-title
(not (string= listener-title pipeline-title)))
(log:info "[SYNC-DIAG] API returning ~S (pipeline has ~S, delay=~As)"
listener-title pipeline-title cl-streamer::*browser-buffer-seconds*))
`((:listenurl . ,(format nil "~A/~A" *stream-base-url* mount)) `((:listenurl . ,(format nil "~A/~A" *stream-base-url* mount))
(:title . ,display-title) (:title . ,display-title)
(:listeners . ,(or listeners 0)) (:listeners . ,(or listeners 0))
(:track-id . ,track-id) (:track-id . ,track-id)
(:favorite-count . ,(or (get-track-favorite-count display-title) 0)) (:favorite-count . ,(or (get-track-favorite-count display-title) 0))
,@(when remaining `((:remaining . ,remaining))))))) ,@(when remaining `((:remaining . ,remaining)))
,@(when changed-at `((:changed-at . ,changed-at)))))))
;;; ---- Pipeline Lifecycle ---- ;;; ---- Pipeline Lifecycle ----
@ -245,7 +234,7 @@
(log:warn "Harmony streaming already running") (log:warn "Harmony streaming already running")
(return-from start-harmony-streaming *harmony-pipeline*)) (return-from start-harmony-streaming *harmony-pipeline*))
;; Create pipeline from declarative spec server, mounts, encoders all handled ;; Create pipeline from declarative spec - server, mounts, encoders all handled
(setf *harmony-pipeline* (setf *harmony-pipeline*
(cl-streamer/harmony:make-pipeline (cl-streamer/harmony:make-pipeline
:port port :port port
@ -272,7 +261,7 @@
(defun stop-harmony-streaming () (defun stop-harmony-streaming ()
"Stop the cl-streamer pipeline and stream server. "Stop the cl-streamer pipeline and stream server.
Pipeline owns encoders and server cleanup is automatic." Pipeline owns encoders and server - cleanup is automatic."
(when *harmony-pipeline* (when *harmony-pipeline*
(cl-streamer/harmony:pipeline-stop *harmony-pipeline*) (cl-streamer/harmony:pipeline-stop *harmony-pipeline*)
(setf *harmony-pipeline* nil)) (setf *harmony-pipeline* nil))
@ -288,7 +277,7 @@
(when *harmony-pipeline* (when *harmony-pipeline*
(let ((file-list (m3u-to-file-list m3u-path))) (let ((file-list (m3u-to-file-list m3u-path)))
(when file-list (when file-list
;; Store pending playlist path on pipeline it will be applied ;; Store pending playlist path on pipeline - it will be applied
;; when drain-queue-into-remaining fires and the new tracks ;; when drain-queue-into-remaining fires and the new tracks
;; actually start playing, not now at queue time. ;; actually start playing, not now at queue time.
(setf (cl-streamer/harmony:pipeline-pending-playlist-path *harmony-pipeline*) (setf (cl-streamer/harmony:pipeline-pending-playlist-path *harmony-pipeline*)
@ -327,7 +316,7 @@
(list :running nil))) (list :running nil)))
;;; ============================================================ ;;; ============================================================
;;; Shuffle Stream random tracks from the music library ;;; Shuffle Stream - random tracks from the music library
;;; ============================================================ ;;; ============================================================
(defvar *shuffle-batch-size* 20 (defvar *shuffle-batch-size* 20