Source code for goojprt.transport.spp
"""Classic Bluetooth Serial Port Profile (RFCOMM) transport.
Synchronous, Linux-only. macOS does not expose ``AF_BLUETOOTH`` through
the CPython ``socket`` module; BLE should be used there instead.
"""
import socket
import time
from typing import Optional
[docs]
class SppTransport:
"""Synchronous Bluetooth SPP / RFCOMM transport.
Only supported on Linux. Attempts to :meth:`connect` on macOS raise
:class:`OSError` with a hint to switch to BLE.
"""
def __init__(self) -> None:
"""Create a disconnected transport; call :meth:`connect` to attach."""
self._sock: Optional[socket.socket] = None
CHUNK_SIZE_RASTER = 192 # multiple of 48 (one raster row = 48 bytes)
CHUNK_DELAY = 0.04 # seconds between chunks, same as BleTransport
[docs]
def connect(self, address: str, port: int = 1) -> None:
"""Open an RFCOMM socket to the printer.
:param address: Bluetooth MAC address of the printer.
:param port: RFCOMM channel, usually ``1``.
:raises OSError: On platforms without ``AF_BLUETOOTH`` support
(notably macOS).
"""
if not hasattr(socket, "AF_BLUETOOTH"):
raise OSError(
"SPP / RFCOMM is not available on macOS via the Python socket module.\n"
"Use the BLE transport instead (connect_ble(address))."
)
self._sock = socket.socket(
socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM # type: ignore[attr-defined]
)
self._sock.connect((address, port))
[docs]
def disconnect(self) -> None:
"""Close the RFCOMM socket (if open)."""
if self._sock:
self._sock.close()
self._sock = None
@property
def is_connected(self) -> bool:
"""Whether the RFCOMM socket is currently open."""
return self._sock is not None
[docs]
def write(self, data: bytes) -> None:
"""Send raw bytes over the RFCOMM socket.
:raises RuntimeError: When the transport is not connected.
"""
if not self.is_connected:
raise RuntimeError("SPP transport is not connected.")
assert self._sock is not None
self._sock.sendall(data)
[docs]
def write_raster_strip(self, data: bytes, rows: int = 24) -> None:
"""Send one raster strip using row-aligned 192-byte chunks.
Mirrors :meth:`~goojprt.transport.ble.BleTransport.write_raster_strip`
but synchronous. Chunk boundaries are aligned to 48 bytes (one raster
row) to prevent pixel-shift artifacts on cheap printer firmware.
After all chunks are sent, sleeps ``max(0.05, rows * 0.002)`` seconds
so the paper motor can advance before the next strip is issued.
:param data: A complete ``GS v 0`` strip payload.
:param rows: Number of raster rows in this strip (used to compute
the post-strip throttle). Defaults to 24.
:raises RuntimeError: When the transport is not connected.
"""
if not self.is_connected:
raise RuntimeError("SPP transport is not connected.")
assert self._sock is not None
for i in range(0, len(data), self.CHUNK_SIZE_RASTER):
chunk = data[i : i + self.CHUNK_SIZE_RASTER]
self._sock.send(chunk)
time.sleep(self.CHUNK_DELAY)
time.sleep(max(0.05, rows * 0.002))