Source code for goojprt.transport.ble

"""Bluetooth Low Energy transport backed by :mod:`bleak`.

Handles three concerns that any BLE-facing printer driver must address:

1. **Characteristic discovery.** The write characteristic is located by
   UUID (primary then alternate GoojPrt family UUIDs); the optional
   notify characteristic is registered if present, so the printer's
   responses can be read back.
2. **MTU chunking with pacing.** BLE packets are limited by the
   connection MTU; ``write-without-response`` is used (no ACK from the
   link layer) so a small sleep between chunks is required to avoid
   overflowing the peripheral's queue.
3. **Post-raster throttle.** After a large raster image, the peripheral
   needs time to actually feed paper. A short sleep proportional to the
   number of raster rows is inserted to keep the producer in sync with
   the paper motor.
"""

import asyncio
from typing import Optional

from goojprt.constants import (
    BLE_NOTIFY_CHAR_UUID,
    BLE_NOTIFY_CHAR_UUID_ALT,
    BLE_SERVICE_UUID,
    BLE_SERVICE_UUID_ALT,
    BLE_WRITE_CHAR_UUID,
    BLE_WRITE_CHAR_UUID_ALT,
)

try:
    from bleak import BleakClient, BleakScanner
    BLEAK_AVAILABLE = True
except ImportError:
    BLEAK_AVAILABLE = False


[docs] class BleTransport: """Asynchronous BLE transport for GoojPrt PT-210 printers. All methods are ``async``. Instances are not thread-safe — use a single transport from a single event loop at a time. """ #: BLE MTU minus protocol overhead. Most GoojPrt firmware handles 182 bytes. CHUNK_SIZE = 182 #: Pause between chunks, covers the typical BLE connection interval (~30 ms). CHUNK_DELAY = 0.04 #: Row-aligned chunk size for raster data. Must be a multiple of 48 #: (PAPER_WIDTH_PX // 8) so chunk boundaries never split a raster row. CHUNK_SIZE_RASTER = 192 # = 4 rows × 48 bytes def __init__(self, on_disconnect=None) -> None: """Create a disconnected transport; call :meth:`connect` to attach.""" self._client: Optional["BleakClient"] = None self._write_char: Optional[str] = None self._notify_char: Optional[str] = None self._notify_buffer: bytearray = bytearray() self._on_disconnect_cb = on_disconnect
[docs] async def scan(self, timeout: float = 10.0) -> list[dict]: """Discover nearby BLE devices. :param timeout: Scan duration in seconds. :returns: List of ``{"name": str, "address": str}`` dictionaries. :raises ImportError: When :mod:`bleak` is not installed. """ if not BLEAK_AVAILABLE: raise ImportError("bleak is required: pip install bleak") devices = await BleakScanner.discover(timeout=timeout) return [{"name": d.name or "?", "address": d.address} for d in devices]
[docs] async def connect(self, address: str) -> None: """Connect to the printer and locate the write/notify characteristics. :param address: Bluetooth MAC address (or UUID on macOS). :raises ImportError: When :mod:`bleak` is not installed. :raises RuntimeError: When no write characteristic can be found (the device is probably not a GoojPrt PT-210). """ if not BLEAK_AVAILABLE: raise ImportError("bleak is required: pip install bleak") self._client = BleakClient(address, disconnected_callback=self._on_disconnect_cb) await self._client.connect() self._write_char = await self._find_write_characteristic() if not self._write_char: await self._client.disconnect() raise RuntimeError( "Failed to locate the write characteristic. " "The remote device is probably not a GoojPrt PT-210." )
async def _find_write_characteristic(self) -> Optional[str]: """Walk the GATT services and resolve write + notify characteristics. Tries the primary UUIDs first, then the alternates. If a notify characteristic is found, subscribes to it so that :meth:`_on_notify` captures any incoming responses. :returns: UUID of the write characteristic, or ``None`` when none is found. """ assert self._client is not None write_char = None for service in self._client.services: for char in service.characteristics: if "notify" in char.properties and self._notify_char is None: if char.uuid.lower() in ( BLE_NOTIFY_CHAR_UUID.lower(), BLE_NOTIFY_CHAR_UUID_ALT.lower(), ): self._notify_char = char.uuid if char.uuid.lower() in ( BLE_WRITE_CHAR_UUID.lower(), BLE_WRITE_CHAR_UUID_ALT.lower(), ): write_char = char.uuid continue if write_char is None: if "write-without-response" in char.properties or "write" in char.properties: if service.uuid.lower() in ( BLE_SERVICE_UUID.lower(), BLE_SERVICE_UUID_ALT.lower(), ): write_char = char.uuid if self._notify_char: await self._client.start_notify(self._notify_char, self._on_notify) return write_char def _on_notify(self, _sender, data: bytearray) -> None: """Callback for inbound notifications (e.g. responses to ``GS I``).""" self._notify_buffer.extend(data)
[docs] async def disconnect(self) -> None: """Disconnect from the printer and release all references.""" if self._client and self._client.is_connected: await self._client.disconnect() self._client = None self._write_char = None
@property def is_connected(self) -> bool: """Whether the BLE client is currently connected.""" return bool(self._client and self._client.is_connected) @property def has_notify(self) -> bool: """Whether the remote device exposes a notify characteristic.""" return self._notify_char is not None
[docs] async def write(self, data: bytes) -> None: """Send raw bytes over BLE with MTU chunking and pacing. :param data: Bytes to send. :raises RuntimeError: When the transport is not connected. """ if not self.is_connected: raise RuntimeError("BLE transport is not connected.") assert self._client is not None assert self._write_char is not None for i in range(0, len(data), self.CHUNK_SIZE): chunk = data[i : i + self.CHUNK_SIZE] await self._client.write_gatt_char( self._write_char, chunk, response=False ) await asyncio.sleep(self.CHUNK_DELAY)
[docs] async def write_raster_strip(self, data: bytes, rows: int = 24) -> None: """Send one raster strip using row-aligned 192-byte chunks. Chunk boundaries are aligned to ``CHUNK_SIZE_RASTER`` (a multiple of 48 bytes = one raster row) so the printer never receives a partial row at a chunk boundary, which would cause a pixel-shift on the next line. After all chunks are sent, sleeps ``max(0.05, rows * 0.002)`` seconds so the paper motor can advance before the next strip is issued. :param data: A complete ``GS v 0`` strip payload as returned by :func:`~goojprt.raster.image_to_raster_strips`. :param rows: Number of raster rows in this strip (used to compute the post-strip throttle). Defaults to 24. :raises RuntimeError: When the transport is not connected. """ if not self.is_connected: raise RuntimeError("BLE transport is not connected.") assert self._client is not None assert self._write_char is not None for i in range(0, len(data), self.CHUNK_SIZE_RASTER): chunk = data[i : i + self.CHUNK_SIZE_RASTER] await self._client.write_gatt_char( self._write_char, chunk, response=False ) await asyncio.sleep(self.CHUNK_DELAY) await asyncio.sleep(max(0.05, rows * 0.002))
[docs] async def read_notify(self, timeout: float) -> bytes: """Return the current notify buffer after waiting ``timeout`` seconds. Callers typically call :meth:`clear_notify_buffer` beforehand to discard responses from earlier queries. """ await asyncio.sleep(timeout) return bytes(self._notify_buffer)
[docs] def clear_notify_buffer(self) -> None: """Drop any buffered notify bytes (call before issuing a new query).""" self._notify_buffer.clear()
[docs] async def read_gatt(self, uuid: str) -> bytes: """Read a raw value from a GATT characteristic (e.g. Device Info). :raises RuntimeError: When the transport is not connected. """ if not self.is_connected: raise RuntimeError("BLE transport is not connected.") assert self._client is not None return bytes(await self._client.read_gatt_char(uuid))