Phase 3: Refactor stream-server.lisp to iolib

- Replace usocket with iolib for socket I/O
- iolib:make-socket with :connect :passive for listener
- iolib:accept-connection for client connections
- Add SO_KEEPALIVE for stale connection detection
- Add TCP_NODELAY for low-latency streaming
- Add SO_SNDTIMEO (30s) write timeout for stale client detection
- Handle iolib:socket-connection-reset-error and isys:ewouldblock
- Update cl-streamer.asd: usocket→iolib, drop chunga/trivial-gray-streams
- Fix now-playing poll interval 5s→15s to eliminate 429 rate limit errors

Runtime verified: audio streams, metadata displays, clients connect.
This commit is contained in:
Glenn Thompson 2026-03-08 11:33:55 +03:00
parent b7266e3ac2
commit 2e86dc4a88
3 changed files with 49 additions and 17 deletions

View File

@ -6,10 +6,8 @@
:serial t :serial t
:depends-on (#:alexandria :depends-on (#:alexandria
#:bordeaux-threads #:bordeaux-threads
#:usocket #:iolib
#:flexi-streams #:flexi-streams
#:chunga
#:trivial-gray-streams
#:split-sequence #:split-sequence
#:log4cl) #:log4cl)
:components ((:file "package") :components ((:file "package")

View File

@ -3,6 +3,9 @@
(defparameter *default-port* 8000 (defparameter *default-port* 8000
"Default port for the streaming server.") "Default port for the streaming server.")
(defparameter *client-write-timeout* 30
"Seconds before a stale client write is abandoned.")
(defclass stream-server () (defclass stream-server ()
((port :initarg :port :accessor server-port :initform *default-port*) ((port :initarg :port :accessor server-port :initform *default-port*)
(socket :initform nil :accessor server-socket) (socket :initform nil :accessor server-socket)
@ -77,14 +80,27 @@
(server-clients server)) (server-clients server))
(count-if #'client-active-p (server-clients server))))) (count-if #'client-active-p (server-clients server)))))
(defun configure-client-socket (socket)
"Set socket options on an accepted client connection.
Enables SO_KEEPALIVE for stale connection detection and
TCP_NODELAY for low-latency streaming."
(setf (iolib:socket-option socket :keep-alive) t)
(setf (iolib:socket-option socket :tcp-nodelay) t)
;; SO_SNDTIMEO for write timeout — detect stale clients
(setf (iolib:socket-option socket :send-timeout) *client-write-timeout*))
(defun start-server (server) (defun start-server (server)
"Start the streaming server." "Start the streaming server."
(when (server-running-p server) (when (server-running-p server)
(error 'streamer-error :message "Server already running")) (error 'streamer-error :message "Server already running"))
(setf (server-socket server) (setf (server-socket server)
(usocket:socket-listen "0.0.0.0" (server-port server) (iolib:make-socket :connect :passive
:reuse-address t :address-family :internet
:element-type '(unsigned-byte 8))) :type :stream
:local-host "0.0.0.0"
:local-port (server-port server)
:reuse-address t
:backlog 128))
(setf (server-running-p server) t) (setf (server-running-p server) t)
(setf (server-accept-thread server) (setf (server-accept-thread server)
(bt:make-thread (lambda () (accept-loop server)) (bt:make-thread (lambda () (accept-loop server))
@ -98,8 +114,8 @@
(bt:with-lock-held ((server-clients-lock server)) (bt:with-lock-held ((server-clients-lock server))
(dolist (client (server-clients server)) (dolist (client (server-clients server))
(setf (client-active-p client) nil) (setf (client-active-p client) nil)
(ignore-errors (usocket:socket-close (client-socket client))))) (ignore-errors (close (client-socket client)))))
(ignore-errors (usocket:socket-close (server-socket server))) (ignore-errors (close (server-socket server)))
(log:info "CL-Streamer stopped") (log:info "CL-Streamer stopped")
server) server)
@ -107,18 +123,27 @@
"Main accept loop for incoming connections." "Main accept loop for incoming connections."
(loop while (server-running-p server) (loop while (server-running-p server)
do (handler-case do (handler-case
(let ((client-socket (usocket:socket-accept (server-socket server)))) (let ((client-socket (iolib:accept-connection (server-socket server))))
(handler-case
(configure-client-socket client-socket)
(error (e)
(log:debug "Socket option error: ~A" e)))
(bt:make-thread (lambda () (handle-client server client-socket)) (bt:make-thread (lambda () (handle-client server client-socket))
:name "cl-streamer-client")) :name "cl-streamer-client"))
(usocket:socket-error (e) (iolib:socket-connection-aborted-error ()
;; Client disconnected before we accepted — skip
nil)
(error (e)
(unless (server-running-p server) (unless (server-running-p server)
(return)) (return))
(log:warn "Accept error: ~A" e))))) (log:warn "Accept error: ~A" e)))))
(defun handle-client (server client-socket) (defun handle-client (server client-socket)
"Handle a single client connection." "Handle a single client connection.
The iolib socket is a dual-channel gray stream supporting both
binary and character I/O."
(let ((stream (flexi-streams:make-flexi-stream (let ((stream (flexi-streams:make-flexi-stream
(usocket:socket-stream client-socket) client-socket
:external-format :latin-1))) :external-format :latin-1)))
(handler-case (handler-case
(let* ((request-line (read-line stream)) (let* ((request-line (read-line stream))
@ -127,7 +152,7 @@
;; Handle CORS preflight ;; Handle CORS preflight
(when (string-equal method "OPTIONS") (when (string-equal method "OPTIONS")
(send-cors-preflight stream) (send-cors-preflight stream)
(ignore-errors (usocket:socket-close client-socket)) (ignore-errors (close client-socket))
(return-from handle-client)) (return-from handle-client))
(multiple-value-bind (path wants-meta) (multiple-value-bind (path wants-meta)
(parse-icy-request request-line headers) (parse-icy-request request-line headers)
@ -139,7 +164,7 @@
(send-404 stream path)))))) (send-404 stream path))))))
(error (e) (error (e)
(log:debug "Client error: ~A" e) (log:debug "Client error: ~A" e)
(ignore-errors (usocket:socket-close client-socket)))))) (ignore-errors (close client-socket))))))
(defun read-http-headers (stream) (defun read-http-headers (stream)
"Read HTTP headers from STREAM. Returns alist of (name . value)." "Read HTTP headers from STREAM. Returns alist of (name . value)."
@ -170,7 +195,7 @@
(unwind-protect (unwind-protect
(stream-to-client client) (stream-to-client client)
(setf (client-active-p client) nil) (setf (client-active-p client) nil)
(ignore-errors (usocket:socket-close client-socket)) (ignore-errors (close client-socket))
(bt:with-lock-held ((server-clients-lock server)) (bt:with-lock-held ((server-clients-lock server))
(setf (server-clients server) (setf (server-clients server)
(remove client (server-clients server)))) (remove client (server-clients server))))
@ -205,6 +230,15 @@
(write-with-metadata client chunk bytes-read) (write-with-metadata client chunk bytes-read)
(write-sequence chunk stream :end bytes-read)) (write-sequence chunk stream :end bytes-read))
(force-output stream)) (force-output stream))
(iolib:socket-connection-reset-error ()
(setf (client-active-p client) nil)
(return))
(isys:ewouldblock ()
;; Write timeout — stale client
(log:warn "Write timeout on ~A, disconnecting stale client"
(mount-path mount))
(setf (client-active-p client) nil)
(return))
(error (e) (error (e)
(log:warn "Client stream error on ~A: ~A" (log:warn "Client stream error on ~A: ~A"
(mount-path mount) e) (mount-path mount) e)

View File

@ -960,7 +960,7 @@
;; Start now playing updates ;; Start now playing updates
(set-timeout update-mini-now-playing 1000) (set-timeout update-mini-now-playing 1000)
(set-interval update-mini-now-playing 5000)))) (set-interval update-mini-now-playing 15000))))
;; Initialize popout player ;; Initialize popout player
(defun init-popout-player () (defun init-popout-player ()
@ -996,7 +996,7 @@
;; Start now playing updates ;; Start now playing updates
(update-popout-now-playing) (update-popout-now-playing)
(set-interval update-popout-now-playing 5000) (set-interval update-popout-now-playing 15000)
;; Notify parent window ;; Notify parent window
(notify-popout-opened) (notify-popout-opened)