Phase 3: Refactor stream-server.lisp to iolib
- Replace usocket with iolib for socket I/O - iolib:make-socket with :connect :passive for listener - iolib:accept-connection for client connections - Add SO_KEEPALIVE for stale connection detection - Add TCP_NODELAY for low-latency streaming - Add SO_SNDTIMEO (30s) write timeout for stale client detection - Handle iolib:socket-connection-reset-error and isys:ewouldblock - Update cl-streamer.asd: usocket→iolib, drop chunga/trivial-gray-streams - Fix now-playing poll interval 5s→15s to eliminate 429 rate limit errors Runtime verified: audio streams, metadata displays, clients connect.
This commit is contained in:
parent
b7266e3ac2
commit
2e86dc4a88
|
|
@ -6,10 +6,8 @@
|
||||||
:serial t
|
:serial t
|
||||||
:depends-on (#:alexandria
|
:depends-on (#:alexandria
|
||||||
#:bordeaux-threads
|
#:bordeaux-threads
|
||||||
#:usocket
|
#:iolib
|
||||||
#:flexi-streams
|
#:flexi-streams
|
||||||
#:chunga
|
|
||||||
#:trivial-gray-streams
|
|
||||||
#:split-sequence
|
#:split-sequence
|
||||||
#:log4cl)
|
#:log4cl)
|
||||||
:components ((:file "package")
|
:components ((:file "package")
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,9 @@
|
||||||
(defparameter *default-port* 8000
|
(defparameter *default-port* 8000
|
||||||
"Default port for the streaming server.")
|
"Default port for the streaming server.")
|
||||||
|
|
||||||
|
(defparameter *client-write-timeout* 30
|
||||||
|
"Seconds before a stale client write is abandoned.")
|
||||||
|
|
||||||
(defclass stream-server ()
|
(defclass stream-server ()
|
||||||
((port :initarg :port :accessor server-port :initform *default-port*)
|
((port :initarg :port :accessor server-port :initform *default-port*)
|
||||||
(socket :initform nil :accessor server-socket)
|
(socket :initform nil :accessor server-socket)
|
||||||
|
|
@ -77,14 +80,27 @@
|
||||||
(server-clients server))
|
(server-clients server))
|
||||||
(count-if #'client-active-p (server-clients server)))))
|
(count-if #'client-active-p (server-clients server)))))
|
||||||
|
|
||||||
|
(defun configure-client-socket (socket)
|
||||||
|
"Set socket options on an accepted client connection.
|
||||||
|
Enables SO_KEEPALIVE for stale connection detection and
|
||||||
|
TCP_NODELAY for low-latency streaming."
|
||||||
|
(setf (iolib:socket-option socket :keep-alive) t)
|
||||||
|
(setf (iolib:socket-option socket :tcp-nodelay) t)
|
||||||
|
;; SO_SNDTIMEO for write timeout — detect stale clients
|
||||||
|
(setf (iolib:socket-option socket :send-timeout) *client-write-timeout*))
|
||||||
|
|
||||||
(defun start-server (server)
|
(defun start-server (server)
|
||||||
"Start the streaming server."
|
"Start the streaming server."
|
||||||
(when (server-running-p server)
|
(when (server-running-p server)
|
||||||
(error 'streamer-error :message "Server already running"))
|
(error 'streamer-error :message "Server already running"))
|
||||||
(setf (server-socket server)
|
(setf (server-socket server)
|
||||||
(usocket:socket-listen "0.0.0.0" (server-port server)
|
(iolib:make-socket :connect :passive
|
||||||
|
:address-family :internet
|
||||||
|
:type :stream
|
||||||
|
:local-host "0.0.0.0"
|
||||||
|
:local-port (server-port server)
|
||||||
:reuse-address t
|
:reuse-address t
|
||||||
:element-type '(unsigned-byte 8)))
|
:backlog 128))
|
||||||
(setf (server-running-p server) t)
|
(setf (server-running-p server) t)
|
||||||
(setf (server-accept-thread server)
|
(setf (server-accept-thread server)
|
||||||
(bt:make-thread (lambda () (accept-loop server))
|
(bt:make-thread (lambda () (accept-loop server))
|
||||||
|
|
@ -98,8 +114,8 @@
|
||||||
(bt:with-lock-held ((server-clients-lock server))
|
(bt:with-lock-held ((server-clients-lock server))
|
||||||
(dolist (client (server-clients server))
|
(dolist (client (server-clients server))
|
||||||
(setf (client-active-p client) nil)
|
(setf (client-active-p client) nil)
|
||||||
(ignore-errors (usocket:socket-close (client-socket client)))))
|
(ignore-errors (close (client-socket client)))))
|
||||||
(ignore-errors (usocket:socket-close (server-socket server)))
|
(ignore-errors (close (server-socket server)))
|
||||||
(log:info "CL-Streamer stopped")
|
(log:info "CL-Streamer stopped")
|
||||||
server)
|
server)
|
||||||
|
|
||||||
|
|
@ -107,18 +123,27 @@
|
||||||
"Main accept loop for incoming connections."
|
"Main accept loop for incoming connections."
|
||||||
(loop while (server-running-p server)
|
(loop while (server-running-p server)
|
||||||
do (handler-case
|
do (handler-case
|
||||||
(let ((client-socket (usocket:socket-accept (server-socket server))))
|
(let ((client-socket (iolib:accept-connection (server-socket server))))
|
||||||
|
(handler-case
|
||||||
|
(configure-client-socket client-socket)
|
||||||
|
(error (e)
|
||||||
|
(log:debug "Socket option error: ~A" e)))
|
||||||
(bt:make-thread (lambda () (handle-client server client-socket))
|
(bt:make-thread (lambda () (handle-client server client-socket))
|
||||||
:name "cl-streamer-client"))
|
:name "cl-streamer-client"))
|
||||||
(usocket:socket-error (e)
|
(iolib:socket-connection-aborted-error ()
|
||||||
|
;; Client disconnected before we accepted — skip
|
||||||
|
nil)
|
||||||
|
(error (e)
|
||||||
(unless (server-running-p server)
|
(unless (server-running-p server)
|
||||||
(return))
|
(return))
|
||||||
(log:warn "Accept error: ~A" e)))))
|
(log:warn "Accept error: ~A" e)))))
|
||||||
|
|
||||||
(defun handle-client (server client-socket)
|
(defun handle-client (server client-socket)
|
||||||
"Handle a single client connection."
|
"Handle a single client connection.
|
||||||
|
The iolib socket is a dual-channel gray stream supporting both
|
||||||
|
binary and character I/O."
|
||||||
(let ((stream (flexi-streams:make-flexi-stream
|
(let ((stream (flexi-streams:make-flexi-stream
|
||||||
(usocket:socket-stream client-socket)
|
client-socket
|
||||||
:external-format :latin-1)))
|
:external-format :latin-1)))
|
||||||
(handler-case
|
(handler-case
|
||||||
(let* ((request-line (read-line stream))
|
(let* ((request-line (read-line stream))
|
||||||
|
|
@ -127,7 +152,7 @@
|
||||||
;; Handle CORS preflight
|
;; Handle CORS preflight
|
||||||
(when (string-equal method "OPTIONS")
|
(when (string-equal method "OPTIONS")
|
||||||
(send-cors-preflight stream)
|
(send-cors-preflight stream)
|
||||||
(ignore-errors (usocket:socket-close client-socket))
|
(ignore-errors (close client-socket))
|
||||||
(return-from handle-client))
|
(return-from handle-client))
|
||||||
(multiple-value-bind (path wants-meta)
|
(multiple-value-bind (path wants-meta)
|
||||||
(parse-icy-request request-line headers)
|
(parse-icy-request request-line headers)
|
||||||
|
|
@ -139,7 +164,7 @@
|
||||||
(send-404 stream path))))))
|
(send-404 stream path))))))
|
||||||
(error (e)
|
(error (e)
|
||||||
(log:debug "Client error: ~A" e)
|
(log:debug "Client error: ~A" e)
|
||||||
(ignore-errors (usocket:socket-close client-socket))))))
|
(ignore-errors (close client-socket))))))
|
||||||
|
|
||||||
(defun read-http-headers (stream)
|
(defun read-http-headers (stream)
|
||||||
"Read HTTP headers from STREAM. Returns alist of (name . value)."
|
"Read HTTP headers from STREAM. Returns alist of (name . value)."
|
||||||
|
|
@ -170,7 +195,7 @@
|
||||||
(unwind-protect
|
(unwind-protect
|
||||||
(stream-to-client client)
|
(stream-to-client client)
|
||||||
(setf (client-active-p client) nil)
|
(setf (client-active-p client) nil)
|
||||||
(ignore-errors (usocket:socket-close client-socket))
|
(ignore-errors (close client-socket))
|
||||||
(bt:with-lock-held ((server-clients-lock server))
|
(bt:with-lock-held ((server-clients-lock server))
|
||||||
(setf (server-clients server)
|
(setf (server-clients server)
|
||||||
(remove client (server-clients server))))
|
(remove client (server-clients server))))
|
||||||
|
|
@ -205,6 +230,15 @@
|
||||||
(write-with-metadata client chunk bytes-read)
|
(write-with-metadata client chunk bytes-read)
|
||||||
(write-sequence chunk stream :end bytes-read))
|
(write-sequence chunk stream :end bytes-read))
|
||||||
(force-output stream))
|
(force-output stream))
|
||||||
|
(iolib:socket-connection-reset-error ()
|
||||||
|
(setf (client-active-p client) nil)
|
||||||
|
(return))
|
||||||
|
(isys:ewouldblock ()
|
||||||
|
;; Write timeout — stale client
|
||||||
|
(log:warn "Write timeout on ~A, disconnecting stale client"
|
||||||
|
(mount-path mount))
|
||||||
|
(setf (client-active-p client) nil)
|
||||||
|
(return))
|
||||||
(error (e)
|
(error (e)
|
||||||
(log:warn "Client stream error on ~A: ~A"
|
(log:warn "Client stream error on ~A: ~A"
|
||||||
(mount-path mount) e)
|
(mount-path mount) e)
|
||||||
|
|
|
||||||
|
|
@ -960,7 +960,7 @@
|
||||||
|
|
||||||
;; Start now playing updates
|
;; Start now playing updates
|
||||||
(set-timeout update-mini-now-playing 1000)
|
(set-timeout update-mini-now-playing 1000)
|
||||||
(set-interval update-mini-now-playing 5000))))
|
(set-interval update-mini-now-playing 15000))))
|
||||||
|
|
||||||
;; Initialize popout player
|
;; Initialize popout player
|
||||||
(defun init-popout-player ()
|
(defun init-popout-player ()
|
||||||
|
|
@ -996,7 +996,7 @@
|
||||||
|
|
||||||
;; Start now playing updates
|
;; Start now playing updates
|
||||||
(update-popout-now-playing)
|
(update-popout-now-playing)
|
||||||
(set-interval update-popout-now-playing 5000)
|
(set-interval update-popout-now-playing 15000)
|
||||||
|
|
||||||
;; Notify parent window
|
;; Notify parent window
|
||||||
(notify-popout-opened)
|
(notify-popout-opened)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue