Documentation Index
Fetch the complete documentation index at: https://docs.oxd.sh/llms.txt
Use this file to discover all available pages before exploring further.
Setup
Place liboxaccel.so (Linux), liboxaccel.dylib (macOS), or oxaccel.dll (Windows) in your project directory or system library path.
ctypes wrapper
import ctypes
import os
import platform
def _load_lib():
    """Load the oxaccel shared library for the current platform.

    The library file name is chosen from ``platform.system()``. Three
    locations are tried in order and the first that loads wins:

    1. ``./<name>`` -- relative to the current working directory.
    2. The directory containing this module, so the wrapper keeps working
       when the process is started from a different directory.
    3. The bare file name, which leaves the search to the platform's
       dynamic loader (system library path).

    Returns:
        The loaded ``ctypes.CDLL`` handle.

    Raises:
        OSError: If the platform is not Linux, macOS or Windows, or if the
            library could not be loaded from any candidate location.
    """
    filenames = {
        "Linux": "liboxaccel.so",
        "Darwin": "liboxaccel.dylib",
        "Windows": "oxaccel.dll",
    }
    system = platform.system()
    if system not in filenames:
        raise OSError(f"Unsupported platform: {system}")
    filename = filenames[system]

    # A path containing a separator is opened as-is, so "./name" alone never
    # consults the module directory or the system search path.
    candidates = ["./" + filename]
    module_file = globals().get("__file__")
    if module_file:
        module_dir = os.path.dirname(os.path.abspath(module_file))
        candidates.append(os.path.join(module_dir, filename))
    candidates.append(filename)

    last_error = None
    for candidate in candidates:
        try:
            return ctypes.CDLL(candidate)
        except OSError as exc:
            last_error = exc
    raise OSError(
        f"Could not load {filename}; tried: {', '.join(candidates)}"
    ) from last_error
# Load the shared library once, at import time. Importing this module
# therefore raises OSError when the library cannot be loaded.
_lib = _load_lib()
class OxAccelConfig(ctypes.Structure):
    """ctypes layout of the configuration record handed to ``ox_accel_create``.

    Two C strings (API key and relay host), an unsigned 16-bit relay port,
    then three C ``bool`` feature switches, in that order.
    """

    _fields_ = [
        ("api_key", ctypes.c_char_p),
        ("relay_host", ctypes.c_char_p),
        ("relay_port", ctypes.c_uint16),
    ] + [
        (flag, ctypes.c_bool)
        for flag in ("enable_fec", "enable_compression", "enable_multipath")
    ]
class AccelStats(ctypes.Structure):
    """ctypes layout of the counter record passed to ``ox_accel_stats``.

    Four unsigned 64-bit counters: packets and bytes, sent and received.
    """

    _fields_ = [
        (counter, ctypes.c_uint64)
        for counter in ("packets_sent", "packets_recv", "bytes_sent", "bytes_recv")
    ]
# Type signatures
def _declare(symbol, argtypes, restype):
    """Set the ctypes argument types and return type of the export *symbol*."""
    func = getattr(_lib, symbol)
    func.argtypes = argtypes
    func.restype = restype


# Context lifecycle: create returns an opaque handle that every other call takes.
_declare("ox_accel_create", [ctypes.POINTER(OxAccelConfig)], ctypes.c_void_p)
_declare("ox_accel_connect", [ctypes.c_void_p], ctypes.c_int)
_declare("ox_accel_destroy", [ctypes.c_void_p], None)
# Data path: (handle, byte buffer, buffer size[, out: size_t]).
_declare(
    "ox_accel_send",
    [ctypes.c_void_p, ctypes.POINTER(ctypes.c_uint8), ctypes.c_size_t],
    ctypes.c_int,
)
_declare(
    "ox_accel_recv",
    [
        ctypes.c_void_p,
        ctypes.POINTER(ctypes.c_uint8),
        ctypes.c_size_t,
        ctypes.POINTER(ctypes.c_size_t),
    ],
    ctypes.c_int,
)
# Introspection.
_declare("ox_accel_stats", [ctypes.c_void_p, ctypes.POINTER(AccelStats)], ctypes.c_int)
_declare("ox_accel_version", [], ctypes.c_char_p)
class Accel:
"""High-level Python wrapper for the Oxidize Accel SDK."""
def __init__(self, api_key: str, relay_host: str = "relay.oxd.sh",
relay_port: int = 51820, fec: bool = True,
compression: bool = True, multipath: bool = True):
config = OxAccelConfig(
api_key=api_key.encode(),
relay_host=relay_host.encode(),
relay_port=relay_port,
enable_fec=fec,
enable_compression=compression,
enable_multipath=multipath,
)
self._ctx = _lib.ox_accel_create(ctypes.byref(config))
if not self._ctx:
raise RuntimeError("Failed to create Accel context")
def connect(self):
err = _lib.ox_accel_connect(self._ctx)
if err != 0:
raise RuntimeError(f"Connect failed: error {err}")
def send(self, data: bytes):
buf = (ctypes.c_uint8 * len(data))(*data)
err = _lib.ox_accel_send(self._ctx, buf, len(data))
if err != 0:
raise RuntimeError(f"Send failed: error {err}")
def recv(self, max_size: int = 65536) -> bytes:
buf = (ctypes.c_uint8 * max_size)()
out_len = ctypes.c_size_t(0)
err = _lib.ox_accel_recv(self._ctx, buf, max_size, ctypes.byref(out_len))
if err != 0:
raise RuntimeError(f"Recv failed: error {err}")
return bytes(buf[:out_len.value])
def stats(self) -> dict:
s = AccelStats()
_lib.ox_accel_stats(self._ctx, ctypes.byref(s))
return {
"packets_sent": s.packets_sent,
"packets_recv": s.packets_recv,
"bytes_sent": s.bytes_sent,
"bytes_recv": s.bytes_recv,
}
def close(self):
if self._ctx:
_lib.ox_accel_destroy(self._ctx)
self._ctx = None
def __enter__(self):
self.connect()
return self
def __exit__(self, *_):
self.close()
def __del__(self):
self.close()
def version() -> str:
    """Return the version string reported by ``ox_accel_version``, decoded to ``str``."""
    raw_version = _lib.ox_accel_version()
    return raw_version.decode()
Usage
from oxaccel import Accel
# Entering the block calls connect(); leaving it calls close().
with Accel(api_key="your-api-key", relay_host="relay.oxd.sh") as accel:
    accel.send(b"hello world")
    # recv() returns at most 65536 bytes unless max_size is given.
    data = accel.recv()
    print(f"Received: {data}")
    # stats() returns a dict of packet and byte counters.
    print(f"Stats: {accel.stats()}")
Async usage
The SDK’s send/recv calls block the calling thread. For async frameworks, run them in a thread pool:
import asyncio
from oxaccel import Accel
# One module-level context, created and connected up front; the coroutines
# in this example all use this same instance.
accel = Accel(api_key="your-api-key")
accel.connect()
async def send_async(data: bytes):
    """Send *data* without blocking the event loop.

    The blocking ``accel.send`` call is run in the loop's default executor
    and awaited; any exception it raises propagates to the awaiter.
    """
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; get_event_loop() has policy-dependent behaviour.
    # NOTE(review): executor threads may run send and recv concurrently on
    # the same context -- confirm the SDK handle is thread-safe.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, accel.send, data)
async def recv_async() -> bytes:
    """Receive one message without blocking the event loop.

    The blocking ``accel.recv`` call (default ``max_size``) is run in the
    loop's default executor; its result is returned and any exception it
    raises propagates to the awaiter.
    """
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; get_event_loop() has policy-dependent behaviour.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, accel.recv)
For production Python deployments, consider using the Go or C++ bindings for the network-critical path and Python for orchestration.