Source code for pythonosc.udp_client
"""UDP Clients for sending OSC messages to an OSC server"""
import sys
if sys.version_info > (3, 5):
from collections.abc import Iterable
else:
from collections import Iterable
import socket
from typing import Generator, Union
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_bundle import OscBundle
from pythonosc.osc_message import OscMessage
from pythonosc.osc_message_builder import ArgValue, OscMessageBuilder
[docs]
class UDPClient(object):
"""OSC client to send :class:`OscMessage` or :class:`OscBundle` via UDP"""
[docs]
def __init__(
self,
address: str,
port: int,
allow_broadcast: bool = False,
family: socket.AddressFamily = socket.AF_UNSPEC,
timeout: float | None = None,
) -> None:
"""Initialize client
As this is UDP it will not actually make any attempt to connect to the
given server at ip:port until the send() method is called.
Args:
address: IP address of server
port: Port of server
allow_broadcast: Allow for broadcast transmissions
family: address family parameter (passed to socket.getaddrinfo)
timeout: Default timeout in seconds for socket operations
"""
for addr in socket.getaddrinfo(
address, port, type=socket.SOCK_DGRAM, family=family
):
af, socktype, protocol, canonname, sa = addr
try:
self._sock = socket.socket(af, socktype)
except OSError:
continue
break
self._sock.setblocking(False)
if timeout is not None:
self._sock.settimeout(timeout)
self._timeout = timeout
if allow_broadcast:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self._address = address
self._port = port
def __enter__(self) -> "UDPClient":
return self
def __exit__(
self, exc_type: type | None, exc_val: Exception | None, exc_tb: object | None
) -> None:
self.close()
[docs]
def close(self) -> None:
"""Close the socket"""
self._sock.close()
[docs]
def send(self, content: Union[OscMessage, OscBundle]) -> None:
"""Sends an :class:`OscMessage` or :class:`OscBundle` via UDP
Args:
content: Message or bundle to be sent
"""
self._sock.sendto(content.dgram, (self._address, self._port))
[docs]
def receive(self, timeout: float | None = None) -> bytes:
"""Wait :int:`timeout` seconds for a message an return the raw bytes
Args:
timeout: Number of seconds to wait for a message.
If None, uses the default timeout set in __init__.
"""
if timeout is not None:
self._sock.settimeout(timeout)
elif self._timeout is not None:
self._sock.settimeout(self._timeout)
try:
return self._sock.recv(4096)
except (TimeoutError, socket.timeout, BlockingIOError):
return b""
[docs]
class SimpleUDPClient(UDPClient):
"""Simple OSC client that automatically builds :class:`OscMessage` from arguments"""
[docs]
def send_message(
self, address: str, value: Union[ArgValue, Iterable[ArgValue]]
) -> None:
"""Build :class:`OscMessage` from arguments and send to server
Args:
address: OSC address the message shall go to
value: One or more arguments to be added to the message
"""
builder = OscMessageBuilder(address=address)
if value is None:
pass
elif not isinstance(value, Iterable) or isinstance(value, (str, bytes)):
builder.add_arg(value)
else:
for val in value:
builder.add_arg(val)
msg = builder.build()
self.send(msg)
[docs]
def get_messages(self, timeout: float | None = None) -> Generator:
"""Wait :int:`timeout` seconds for a message from the server and convert it to a :class:`OscMessage`
Args:
timeout: Time in seconds to wait for a message.
If None, uses the default timeout set in __init__.
"""
msg = self.receive(timeout)
while msg:
yield OscMessage(msg)
msg = self.receive(timeout)
[docs]
class DispatchClient(SimpleUDPClient):
"""OSC Client that includes a :class:`Dispatcher` for handling responses and other messages from the server"""
dispatcher = Dispatcher()
[docs]
def handle_messages(self, timeout: float | None = None) -> None:
"""Wait :int:`timeout` seconds for a message from the server and process each message with the registered
handlers. Continue until a timeout occurs.
Args:
timeout: Time in seconds to wait for a message.
If None, uses the default timeout set in __init__.
"""
msg = self.receive(timeout)
while msg:
self.dispatcher.call_handlers_for_packet(msg, (self._address, self._port))
msg = self.receive(timeout)