Server

The server receives OSC messages from connected clients and invokes the appropriate callback functions via the dispatcher. Several server types are available, with implementations for both the UDP and TCP protocols.

Blocking Server

The blocking server type is the simplest of them all. Once it starts to serve, it blocks program execution forever and remains idle in between handling requests. This type is good enough if your application only has to react to incoming OSC messages and nothing else.

from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer


def print_handler(address, *args):
    print(f"{address}: {args}")


def default_handler(address, *args):
    print(f"DEFAULT {address}: {args}")


dispatcher = Dispatcher()
dispatcher.map("/something/*", print_handler)
dispatcher.set_default_handler(default_handler)

ip = "127.0.0.1"
port = 1337

server = BlockingOSCUDPServer((ip, port), dispatcher)
server.serve_forever()  # Blocks forever

Threading Server

Each incoming packet is handled in its own thread. This also blocks further program execution, but allows concurrent handling of multiple incoming messages. Otherwise usage is identical to the blocking type. Use it for lightweight message handlers.
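For example, a ThreadingOSCUDPServer can be run on a background thread so the main program keeps running; serve_forever() and shutdown() are inherited from the standard library's socketserver machinery the server is built on. The handler and address used here are just placeholders.

import threading

from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import ThreadingOSCUDPServer


def print_handler(address, *args):
    print(f"{address}: {args}")


dispatcher = Dispatcher()
dispatcher.map("/something/*", print_handler)

server = ThreadingOSCUDPServer(("127.0.0.1", 1337), dispatcher)

# Run the server on its own thread so the main program is not blocked
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()

# ... do other work here ...

server.shutdown()  # Stop serve_forever() and let the server thread finish
server_thread.join()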

Forking Server

A new process is forked for every incoming packet. This also blocks program execution forever. Use it for heavyweight message handlers.
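Construction and usage mirror the other server types; only the class changes. Note that forking relies on os.fork and is therefore only available on Unix-like systems. A minimal sketch with a placeholder address and handler:

from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import ForkingOSCUDPServer


def heavy_handler(address, *args):
    # Imagine an expensive computation here; each call runs in its own process
    print(f"{address}: {args}")


dispatcher = Dispatcher()
dispatcher.map("/heavy", heavy_handler)

server = ForkingOSCUDPServer(("127.0.0.1", 1337), dispatcher)
server.serve_forever()  # Blocks forever; handlers run in forked child processes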

Async Server

This server type takes advantage of Python’s asyncio functionality and allows truly non-blocking parallel execution of both your main loop and the server loop. You can use it in two ways: concurrently and exclusively. In concurrent mode other tasks (like a main loop) run alongside the server, meaning that the server doesn’t block further program execution. In exclusive mode the server task is the only task that is started.

Concurrent Mode

Use this mode if you have a main program loop that must run without being blocked by the server. The example below runs init_main() once, which creates the serve endpoint and adds it to the asyncio event loop. The returned transport object is needed later to clean up the endpoint and release the socket. Afterwards the main loop is started with await loop(). The example loop runs 10 times and sleeps for a second on every iteration. During the sleep, program execution is handed back to the event loop, which gives the serve endpoint a chance to handle incoming OSC messages. Your loop needs to do at least an await asyncio.sleep(0) on every iteration, otherwise it will never release control back to the event loop.

from pythonosc.osc_server import AsyncIOOSCUDPServer
from pythonosc.dispatcher import Dispatcher
import asyncio


def filter_handler(address, *args):
    print(f"{address}: {args}")


dispatcher = Dispatcher()
dispatcher.map("/filter", filter_handler)

ip = "127.0.0.1"
port = 1337


async def loop():
    """Example main loop that only runs for 10 iterations before finishing"""
    for i in range(10):
        print(f"Loop {i}")
        await asyncio.sleep(1)


async def init_main():
    server = AsyncIOOSCUDPServer((ip, port), dispatcher, asyncio.get_event_loop())
    transport, protocol = await server.create_serve_endpoint()  # Create datagram endpoint and start serving

    await loop()  # Enter main loop of program

    transport.close()  # Clean up serve endpoint


asyncio.run(init_main())

Exclusive Mode

This mode comes without a main loop. You only have the OSC server running in the event loop initially. You could of course use an OSC message to start a main loop from within a handler.

from pythonosc.osc_server import AsyncIOOSCUDPServer
from pythonosc.dispatcher import Dispatcher
import asyncio


def filter_handler(address, *args):
    print(f"{address}: {args}")


dispatcher = Dispatcher()
dispatcher.map("/filter", filter_handler)

ip = "127.0.0.1"
port = 1337

server = AsyncIOOSCUDPServer((ip, port), dispatcher, asyncio.get_event_loop())
server.serve()
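As a sketch of the idea mentioned above, a handler could kick off a main loop on demand. This assumes the handler is invoked from inside the running event loop (so asyncio.get_running_loop() is available there); the /start address and main_loop() are just placeholders.

from pythonosc.osc_server import AsyncIOOSCUDPServer
from pythonosc.dispatcher import Dispatcher
import asyncio


async def main_loop():
    """Placeholder main loop that only starts once the right OSC message arrives"""
    for i in range(10):
        print(f"Loop {i}")
        await asyncio.sleep(1)


def start_handler(address, *args):
    # The handler runs inside the event loop, so a task can be scheduled from it
    asyncio.get_running_loop().create_task(main_loop())


dispatcher = Dispatcher()
dispatcher.map("/start", start_handler)

server = AsyncIOOSCUDPServer(("127.0.0.1", 1337), dispatcher, asyncio.get_event_loop())
server.serve()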

Server Module Documentation

OSC Servers that receive UDP packets and invoke handlers accordingly.

class pythonosc.osc_server.AsyncIOOSCUDPServer(server_address: Tuple[str, int], dispatcher: Dispatcher, loop: BaseEventLoop)[source]

Asynchronous OSC Server

An asynchronous OSC Server using UDP. It creates a datagram endpoint that runs in an event loop.

__init__(server_address: Tuple[str, int], dispatcher: Dispatcher, loop: BaseEventLoop) → None[source]

Initialize

Parameters:
  • server_address – IP and port of server

  • dispatcher – Dispatcher this server shall use

  • loop – Event loop to add the server task to. Use asyncio.get_event_loop() unless you know what you’re doing.

create_serve_endpoint() → Coroutine[Any, Any, Tuple[BaseTransport, DatagramProtocol]][source]

Creates a datagram endpoint and registers it with the event loop as a coroutine.

Returns:

Awaitable coroutine that returns transport and protocol objects

serve() → None[source]

Creates a datagram endpoint and registers it with the event loop.

Use this only in synchronous code (i.e. not from within a coroutine). This will start the server and run it forever or until a stop() is called on the event loop.

class pythonosc.osc_server.BlockingOSCUDPServer(server_address: Tuple[str, int], dispatcher: Dispatcher, bind_and_activate: bool = True)[source]

Blocking version of the UDP server.

Each message will be handled sequentially on the same thread. Use this if you don’t care about latency in your message handling or don’t have a multiprocess/multithread environment.

class pythonosc.osc_server.ForkingOSCUDPServer(server_address: Tuple[str, int], dispatcher: Dispatcher, bind_and_activate: bool = True)[source]

Forking version of the OSC UDP server.

Each message will be handled in its own new process. Use this when heavyweight operations are done by each message handler and forking a whole new process for each of them is worth it.

class pythonosc.osc_server.OSCUDPServer(server_address: Tuple[str, int], dispatcher: Dispatcher, bind_and_activate: bool = True)[source]

Superclass for different flavors of OSC UDP servers

__init__(server_address: Tuple[str, int], dispatcher: Dispatcher, bind_and_activate: bool = True) → None[source]

Initialize

Parameters:
  • server_address – IP and port of server

  • dispatcher – Dispatcher this server will use

  • bind_and_activate (optional) – default=True. Defines whether the server binds and activates its socket when the constructor is called (see the sketch below).
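As an illustration of bind_and_activate, the following sketch defers binding so the socket can still be configured before the server starts listening; it relies on the server_bind() and server_activate() methods inherited from the standard library's socketserver, and the address is a placeholder.

from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer

dispatcher = Dispatcher()

# Construct without binding so the socket can still be configured
server = BlockingOSCUDPServer(("127.0.0.1", 1337), dispatcher, bind_and_activate=False)
server.allow_reuse_address = True  # Example tweak applied before binding
server.server_bind()
server.server_activate()
server.serve_forever()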

verify_request(request: socket | Tuple[bytes, socket], client_address: Tuple[str, int] | str) → bool[source]

Returns true if the data looks like a valid OSC UDP datagram

Parameters:
  • request – Incoming data

  • client_address – IP and port of client this message came from

Returns:

True if request is OSC bundle or OSC message

class pythonosc.osc_server.ThreadingOSCUDPServer(server_address: Tuple[str, int], dispatcher: Dispatcher, bind_and_activate: bool = True)[source]

Threading version of the OSC UDP server.

Each message will be handled in its own new thread. Use this when lightweight operations are done by each message handler.

OSC Servers that receive TCP packets and invoke handlers accordingly.

Use like this:

dispatcher = dispatcher.Dispatcher()
# This will print all parameters to stdout.
dispatcher.map("/bpm", print)
server = ForkingOSCTCPServer((ip, port), dispatcher)
server.serve_forever()

or run the server on its own thread:

server = ForkingOSCTCPServer((ip, port), dispatcher)
server_thread = threading.Thread(target=server.serve_forever)
server_thread.start()
...
server.shutdown()

These servers use the socketserver module from the standard library: http://docs.python.org/library/socketserver.html

Alternatively, the AsyncIOOSCTCPServer server can be integrated with an asyncio event loop:

loop = asyncio.get_event_loop()
server = AsyncIOOSCTCPServer(server_address, dispatcher)
server.serve()
loop.run_forever()

class pythonosc.osc_tcp_server.AsyncOSCTCPServer(server_address: str, port: int, dispatcher: Dispatcher, mode: str = '1.1')[source]

Asyncio version of the OSC TCP Server. Each TCP message is handled by _call_handlers_for_packet, the same method as in the OSCTCPServer family of blocking, threading, and forking servers.

__init__(server_address: str, port: int, dispatcher: Dispatcher, mode: str = '1.1')[source]
Parameters:
  • server_address – IP address to bind to

  • port – port to listen on

  • dispatcher – a pythonosc.dispatcher.Dispatcher

async start() → None[source]

Creates a socket endpoint and registers it with our event loop.

class pythonosc.osc_tcp_server.BlockingOSCTCPServer(server_address: Tuple[str | bytes | bytearray, int], dispatcher: Dispatcher, mode: str = '1.1')[source]

Blocking version of the TCP server.

Each message will be handled sequentially on the same thread. Use this if you don’t care about latency in your message handling or don’t have a multiprocess/multithread environment (really?).

class pythonosc.osc_tcp_server.ForkingOSCTCPServer(server_address: Tuple[str | bytes | bytearray, int], dispatcher: Dispatcher, mode: str = '1.1')[source]

Forking version of the OSC TCP server.

Each message will be handled in its own new process. Use this when heavyweight operations are done by each message handler and forking a whole new process for each of them is worth it.

class pythonosc.osc_tcp_server.OSCTCPServer(server_address: Tuple[str | bytes | bytearray, int], dispatcher: Dispatcher, mode: str = '1.1')[source]

Superclass for different flavors of OSCTCPServer

__init__(server_address: Tuple[str | bytes | bytearray, int], dispatcher: Dispatcher, mode: str = '1.1')[source]

Constructor. May be extended, do not override.

property dispatcher

Dispatcher accessor for handlers to dispatch osc messages.

class pythonosc.osc_tcp_server.ThreadingOSCTCPServer(server_address: Tuple[str | bytes | bytearray, int], dispatcher: Dispatcher, mode: str = '1.1')[source]

Threading version of the OSC TCP server.

Each message will be handled in its own new thread. Use this when lightweight operations are done by each message handler.