"""OSC Servers that receive UDP packets and invoke handlers accordingly."""
import asyncio
import os
import socket
import socketserver
from socket import socket as _socket
from typing import Any, Coroutine, Tuple, Union, cast
from pythonosc import osc_bundle, osc_message
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_message_builder import build_msg
_RequestType = Union[_socket, Tuple[bytes, _socket]]
_AddressType = Union[Tuple[str, int], str]
class _UDPHandler(socketserver.BaseRequestHandler):
"""Handles correct UDP messages for all types of server."""
def __init__(self, request, client_address, server):
self.socket = request[1]
super().__init__(request, client_address, server)
def handle(self) -> None:
"""Calls the handlers via dispatcher
This method is called after a basic sanity check was done on the datagram,
whether this datagram looks like an osc message or bundle.
If not the server won't call it and so no new
threads/processes will be spawned.
"""
server = cast(OSCUDPServer, self.server)
resp = server.dispatcher.call_handlers_for_packet(
self.request[0], self.client_address
)
for r in resp:
if not isinstance(r, tuple):
r = [r]
msg = build_msg(r[0], r[1:])
self.socket.sendto(msg.dgram, self.client_address)
def _is_valid_request(request: _RequestType) -> bool:
"""Returns true if the request's data looks like an osc bundle or message.
Returns:
True if request is OSC bundle or OSC message
"""
assert isinstance(
request, tuple
) # TODO: handle requests which are passed just as a socket?
data = request[0]
return osc_bundle.OscBundle.dgram_is_bundle(
data
) or osc_message.OscMessage.dgram_is_message(data)
[docs]
class OSCUDPServer(socketserver.UDPServer):
"""Superclass for different flavors of OSC UDP servers"""
[docs]
def __init__(
self,
server_address: Tuple[str, int],
dispatcher: Dispatcher,
bind_and_activate: bool = True,
timeout: float | None = None,
family: socket.AddressFamily | None = None,
) -> None:
"""Initialize
Args:
server_address: IP and port of server
dispatcher: Dispatcher this server will use
(optional) bind_and_activate: default=True defines if the server has to start on call of constructor
(optional) timeout: Default timeout in seconds for socket operations
(optional) family: socket.AF_INET or socket.AF_INET6. If None, it will be inferred from server_address.
"""
if family is not None:
self.address_family = family
else:
# Try to infer address family from server_address
try:
infos = socket.getaddrinfo(
server_address[0],
server_address[1],
type=socket.SOCK_DGRAM,
family=socket.AF_UNSPEC,
)
if infos:
self.address_family = infos[0][0]
except (socket.gaierror, IndexError):
# Fallback to default if resolution fails
pass
super().__init__(server_address, _UDPHandler, bind_and_activate)
self._dispatcher = dispatcher
self.timeout = timeout
[docs]
def verify_request(
self, request: _RequestType, client_address: _AddressType
) -> bool:
"""Returns true if the data looks like a valid OSC UDP datagram
Args:
request: Incoming data
client_address: IP and port of client this message came from
Returns:
True if request is OSC bundle or OSC message
"""
return _is_valid_request(request)
@property
def dispatcher(self) -> Dispatcher:
return self._dispatcher
[docs]
class BlockingOSCUDPServer(OSCUDPServer):
"""Blocking version of the UDP server.
Each message will be handled sequentially on the same thread.
Use this is you don't care about latency in your message handling or don't
have a multiprocess/multithread environment.
"""
[docs]
class ThreadingOSCUDPServer(socketserver.ThreadingMixIn, OSCUDPServer):
"""Threading version of the OSC UDP server.
Each message will be handled in its own new thread.
Use this when lightweight operations are done by each message handlers.
"""
if hasattr(os, "fork"):
[docs]
class ForkingOSCUDPServer(socketserver.ForkingMixIn, OSCUDPServer):
"""Forking version of the OSC UDP server.
Each message will be handled in its own new process.
Use this when heavyweight operations are done by each message handlers
and forking a whole new process for each of them is worth it.
"""
[docs]
class AsyncIOOSCUDPServer:
"""Asynchronous OSC Server
An asynchronous OSC Server using UDP. It creates a datagram endpoint that runs in an event loop.
"""
[docs]
def __init__(
self,
server_address: Tuple[str, int],
dispatcher: Dispatcher,
loop: asyncio.BaseEventLoop,
) -> None:
"""Initialize
Args:
server_address: IP and port of server
dispatcher: Dispatcher this server shall use
loop: Event loop to add the server task to. Use ``asyncio.get_event_loop()`` unless you know what you're
doing.
"""
self._server_address = server_address
self._dispatcher = dispatcher
self._loop = loop
class _OSCProtocolFactory(asyncio.DatagramProtocol):
"""OSC protocol factory which passes datagrams to dispatcher"""
def __init__(self, dispatcher: Dispatcher) -> None:
self.dispatcher = dispatcher
def connection_made(self, transport):
self.transport = transport
def datagram_received(
self, data: bytes, client_address: Tuple[str, int]
) -> None:
resp = self.dispatcher.call_handlers_for_packet(data, client_address)
for r in resp:
if not isinstance(r, tuple):
r = [r]
msg = build_msg(r[0], r[1:])
self.transport.sendto(msg.dgram, client_address)
[docs]
def serve(self) -> None:
"""Creates a datagram endpoint and registers it with event loop.
Use this only in synchronous code (i.e. not from within a coroutine). This will start the server and run it
forever or until a ``stop()`` is called on the event loop.
"""
self._loop.run_until_complete(self.create_serve_endpoint())
[docs]
def create_serve_endpoint(
self,
) -> Coroutine[
Any, Any, Tuple[asyncio.transports.BaseTransport, asyncio.DatagramProtocol]
]:
"""Creates a datagram endpoint and registers it with event loop as coroutine.
Returns:
Awaitable coroutine that returns transport and protocol objects
"""
return self._loop.create_datagram_endpoint(
lambda: self._OSCProtocolFactory(self.dispatcher),
local_addr=self._server_address,
)
@property
def dispatcher(self) -> Dispatcher:
return self._dispatcher