Source code for goojprt.printer

"""High-level SDK facade for the GoojPrt PT-210 thermal printer.

:class:`GoojPrtPT210` is the entry point of the SDK. It owns both
transports (BLE / SPP), composes :mod:`goojprt.commands`,
:mod:`goojprt.raster` and :mod:`goojprt.rendering` into an ergonomic
API, and exposes a dual set of methods:

* **BLE (async)** — ``connect_ble`` / ``print_text`` / ``print_image``
  / … . This is the primary, cross-platform path.
* **SPP (sync, Linux only)** — ``connect_spp`` / ``print_text_spp`` /
  … . Preserved for callers that need classic RFCOMM.

The two API surfaces share the underlying byte builders so their output
is byte-identical.
"""

from goojprt import commands, raster, rendering
from goojprt.enums import Align, CodePage, TextSize
from goojprt.transport import BleTransport, SppTransport


[docs] class GoojPrtPT210: """Driver for the GoojPrt PT-210 thermal printer (48 mm / 384 px head). Supports BLE (async, cross-platform, primary) and classic Bluetooth SPP (sync, Linux only). Both API surfaces are exposed side-by-side for backward compatibility. Example (BLE):: import asyncio from goojprt import GoojPrtPT210 async def main() -> None: printer = GoojPrtPT210() await printer.connect_ble("XX:XX:XX:XX:XX:XX") await printer.initialize() await printer.print_text("Hello, world!") await printer.feed(3) await printer.disconnect() asyncio.run(main()) """ def __init__(self) -> None: """Create an empty driver. Transports are attached on first connect.""" self._ble: BleTransport | None = None self._spp: SppTransport | None = None # ------------------------------------------------------------------ # # BLE lifecycle # ------------------------------------------------------------------ #
[docs] async def scan_ble(self, timeout: float = 10.0) -> list[dict]: """Discover nearby BLE devices. :param timeout: Scan duration in seconds. :returns: List of ``{"name", "address"}`` dictionaries. """ if self._ble is None: self._ble = BleTransport() return await self._ble.scan(timeout=timeout)
[docs] async def connect_ble(self, address: str, *, on_disconnect=None) -> None: """Connect to the printer over BLE. Creates the BLE transport lazily on first use. """ if self._ble is None: self._ble = BleTransport(on_disconnect=on_disconnect) await self._ble.connect(address)
[docs] async def disconnect(self) -> None: """Disconnect the BLE client (see :meth:`BleTransport.disconnect`).""" if self._ble is not None: await self._ble.disconnect()
@property def is_connected_ble(self) -> bool: """Whether the BLE transport is currently connected.""" return self._ble is not None and self._ble.is_connected # ------------------------------------------------------------------ # # SPP lifecycle # ------------------------------------------------------------------ #
[docs] def connect_spp(self, address: str, port: int = 1) -> None: """Connect to the printer over classic Bluetooth SPP (Linux only).""" if self._spp is None: self._spp = SppTransport() self._spp.connect(address, port)
[docs] def disconnect_spp(self) -> None: """Close the SPP socket.""" if self._spp is not None: self._spp.disconnect()
@property def is_connected_spp(self) -> bool: """Whether the SPP socket is currently open.""" return self._spp is not None and self._spp.is_connected # ------------------------------------------------------------------ # # BLE high-level API # ------------------------------------------------------------------ #
[docs] async def initialize(self) -> None: """Initialise the printer (``ESC @``) — reset to factory defaults.""" await self._ble.write(commands.init())
[docs] async def print_text( self, text: str, align: Align = Align.LEFT, bold: bool = False, underline: bool = False, size: TextSize = TextSize.NORMAL, newline: bool = True, encoding: str = "gb2312", ) -> None: """Print formatted text over BLE using the native ESC/POS engine. :param text: String to print. :param align: Horizontal alignment. :param bold: Enable emphasised printing. :param underline: Enable underlined printing. :param size: Character size multiplier. :param newline: Append a trailing line feed after the text. :param encoding: Python codec name used for byte encoding. """ await self._ble.write(commands.build_text_block( text, align, bold, underline, size, newline, encoding, ))
[docs] async def print_line(self, char: str = "-", encoding: str = "ascii") -> None: """Print a horizontal rule built from a repeated character. Defaults to roughly 32 characters wide — ``PAPER_WIDTH_PX // 12``, matching Font A metrics. """ line = char * (384 // 12) # PAPER_WIDTH_PX // 12 — approximate column count await self.print_text(line, encoding=encoding)
[docs] async def feed(self, lines: int = 3) -> None: """Feed paper by ``lines`` rows (``ESC d n``).""" await self._ble.write(commands.feed(lines))
[docs] async def cut(self) -> None: """Trigger the cutter (no-op on PT-210, which has no cutter hardware).""" await self._ble.write(commands.cut())
[docs] async def print_qr( self, data: str, size: int = 6, align: Align = Align.CENTER, error_correction: int = 1, ) -> None: """Print a QR code over BLE using the native ESC/POS QR command.""" await self._ble.write(commands.build_qr_block(data, size, align, error_correction))
[docs] async def print_image(self, image, strip_height: int = 24) -> None: """Print a :class:`PIL.Image.Image` over BLE as an ESC/POS raster. The image is split into horizontal strips of ``strip_height`` rows and sent as separate ``GS v 0`` commands. This keeps each payload small enough to fit in the printer's limited buffer and aligns chunk boundaries to raster row width, preventing pixel-shift artifacts. :param image: PIL image in any mode. :param strip_height: Rows per strip (default 24 → 1 152 bytes per strip, safe for cheap hardware with small buffers). """ for strip in raster.image_to_raster_strips(image, strip_height=strip_height): rows = strip[6] | (strip[7] << 8) # from GS v 0 header await self._ble.write_raster_strip(strip, rows=rows)
[docs] async def print_image_from_file(self, path: str) -> None: """Load an image from disk (via :func:`PIL.Image.open`) and print it.""" from PIL import Image await self.print_image(Image.open(path))
[docs] async def print_text_image( self, text: str, font_size: int = 24, font_path: str | None = None, align: Align = Align.LEFT, line_spacing: int = 6, padding: int = 8, supersample: int = 3, dither: bool = True, threshold: int = 140, ) -> None: """Print text as an antialiased bitmap (full Unicode / diacritics). Unlike :meth:`print_text`, this method does not depend on the firmware code page: the text is rendered via PIL with LANCZOS resampling and Floyd–Steinberg dithering, so any installed font produces a readable result. """ img = rendering.render_text_image( text, font_size=font_size, font_path=font_path, line_spacing=line_spacing, padding=padding, align=align, supersample=supersample, dither=dither, threshold=threshold, ) await self.print_image(img)
[docs] async def print_grid( self, columns: list[dict], font_size: int = 22, font_path: str | None = None, padding: int = 6, supersample: int = 3, dither: bool = False, threshold: int = 128, ) -> None: """Print a single row of a multi-column table over BLE. See :func:`goojprt.rendering.grid.render_grid` for the column dictionary schema. """ img = rendering.render_grid( columns, font_size=font_size, font_path=font_path, padding=padding, supersample=supersample, dither=dither, threshold=threshold, ) await self.print_image(img)
[docs] async def print_pdf417( self, data: str, align: Align = Align.CENTER, scale: int = 2, row_height: int = 5, columns: int = 5, padding: int = 10, min_rows: int | None = None, ) -> None: """Print a PDF417 barcode over BLE. The barcode is rendered as a bitmap and padded to the full paper width according to ``align``. """ img = rendering.render_pdf417( data, scale=scale, row_height=row_height, columns=columns, padding=padding, min_rows=min_rows, ) img = raster.pad_image_to_paper_width(img, align) await self.print_image(img)
[docs] async def print_ekg( self, beats: int = 4, height_px: int = 160, line_width: int = 2, grid: bool = True, grid_step_px: int = 32, amplitude: float = 0.82, portrait: bool = False, px_per_beat: int = 240, ) -> None: """Print a synthetic ECG waveform. ``portrait=True`` renders time running down the paper; ``portrait=False`` (default) renders time running across the paper width. """ img = rendering.render_ekg( beats=beats, height_px=height_px, line_width=line_width, grid=grid, grid_step_px=grid_step_px, amplitude=amplitude, portrait=portrait, px_per_beat=px_per_beat, ) await self.print_image(img)
[docs] async def set_charset(self, code_page: CodePage) -> None: """Switch the printer's active code page (``ESC t n``). Note: this only works if the firmware actually ships the chosen table. For guaranteed diacritic support, use :meth:`print_text_image` instead. """ await self._ble.write(commands.charset(code_page))
[docs] async def query_printer_info(self, timeout: float = 2.0) -> dict: """Query model / firmware strings via ``GS I 1 / 2 / 3``. :returns: A dictionary with keys ``model_id``, ``firmware``, ``printer_type`` and ``raw_responses`` — or ``{"error": ...}`` when the printer does not expose a notify characteristic. """ if not self._ble.has_notify: return {"error": "Printer has no notify characteristic; responses cannot be read."} results = {} queries = { "model_id": bytes([0x1D, 0x49, 0x01]), "printer_type": bytes([0x1D, 0x49, 0x03]), "firmware": bytes([0x1D, 0x49, 0x02]), } raw_responses = [] for key, cmd in queries.items(): self._ble.clear_notify_buffer() await self._ble.write(cmd) response = await self._ble.read_notify(timeout / len(queries)) raw_responses.append(list(response)) try: results[key] = response.decode("ascii", errors="replace").strip("\x00").strip() except Exception: results[key] = response.hex() results["raw_responses"] = raw_responses return results
[docs] async def query_full_info(self, timeout: float = 1.5) -> dict: """Read BLE Device Info (0x180A) + Battery (0x180F) + ESC/POS status + ``GS I``. :returns: A dictionary with keys ``ble_device``, ``ble_battery``, ``escpos_status`` and ``escpos_info``. """ info: dict = {} ble_device = {} ble_char_map = { "manufacturer": "00002a29-0000-1000-8000-00805f9b34fb", "model_number": "00002a24-0000-1000-8000-00805f9b34fb", "serial_number": "00002a25-0000-1000-8000-00805f9b34fb", "hardware_rev": "00002a27-0000-1000-8000-00805f9b34fb", "firmware_rev": "00002a26-0000-1000-8000-00805f9b34fb", "software_rev": "00002a28-0000-1000-8000-00805f9b34fb", "system_id": "00002a23-0000-1000-8000-00805f9b34fb", } for key, uuid in ble_char_map.items(): try: raw = await self._ble.read_gatt(uuid) if key == "system_id": ble_device[key] = raw.hex(":") else: ble_device[key] = raw.decode("utf-8", errors="replace").strip("\x00").strip() except Exception: pass info["ble_device"] = ble_device battery: dict = {} try: raw = await self._ble.read_gatt("00002a19-0000-1000-8000-00805f9b34fb") battery["level_pct"] = raw[0] except Exception: pass info["ble_battery"] = battery escpos_status: dict = {} if self._ble.has_notify: dle_queries = { "printer": bytes([0x10, 0x04, 0x01]), "offline": bytes([0x10, 0x04, 0x02]), "error": bytes([0x10, 0x04, 0x03]), "paper": bytes([0x10, 0x04, 0x04]), } for key, cmd in dle_queries.items(): self._ble.clear_notify_buffer() await self._ble.write(cmd) response = await self._ble.read_notify(timeout / len(dle_queries)) if response: byte = response[0] escpos_status[key] = { "raw": byte, "ok": not bool(byte & 0b00101000), } info["escpos_status"] = escpos_status escpos_info = await self.query_printer_info(timeout=timeout) escpos_info.pop("raw_responses", None) info["escpos_info"] = escpos_info return info
[docs] async def probe_charsets( self, test_string: str = "Příliš žluťoučký kůň", pages: list[CodePage] | None = None, ) -> None: """Print ``test_string`` using each code page plus a bitmap reference. Useful for verifying which code pages the firmware actually ships — compare the native outputs against the always-correct bitmap row at the bottom. """ from goojprt.encoding import CODEPAGE_TO_ENCODING if pages is None: pages = [CodePage.PC437, CodePage.PC850, CodePage.PC852, CodePage.WPC1250, CodePage.PC858] await self.initialize() await self.print_text_image("=== PROBE CHARSETS ===", align=Align.CENTER, font_size=20) for cp in pages: encoding = CODEPAGE_TO_ENCODING.get(cp, "latin-1") await self.print_text_image( f"-- CodePage.{cp.name} (ESC t {cp.value}) --", font_size=18, align=Align.LEFT, ) await self._ble.write(commands.charset(cp)) try: encoded = test_string.encode(encoding, errors="replace") except LookupError: encoded = test_string.encode("latin-1", errors="replace") await self._ble.write(encoded + b"\n") await self.print_text_image( "=== Bitmap (always OK) ===\n" + test_string, font_size=22, align=Align.CENTER, ) await self.feed(4)
# ------------------------------------------------------------------ # # SPP high-level API (synchronous, backward-compatible) # ------------------------------------------------------------------ #
[docs] def initialize_spp(self) -> None: """Initialise the printer over SPP (``ESC @``).""" self._spp.write(commands.init())
[docs] def print_text_spp( self, text: str, align: Align = Align.LEFT, bold: bool = False, underline: bool = False, size: TextSize = TextSize.NORMAL, newline: bool = True, encoding: str = "gb2312", ) -> None: """Print text over SPP (shares :func:`commands.build_text_block` with BLE).""" self._spp.write(commands.build_text_block( text, align, bold, underline, size, newline, encoding, ))
[docs] def feed_spp(self, lines: int = 3) -> None: """Feed paper over SPP.""" self._spp.write(commands.feed(lines))
[docs] def cut_spp(self) -> None: """Trigger the cutter over SPP (no-op on PT-210).""" self._spp.write(commands.cut())
[docs] def print_qr_spp( self, data: str, size: int = 6, align: Align = Align.CENTER, error_correction: int = 1, ) -> None: """Print a QR code over SPP (shares :func:`commands.build_qr_block`).""" self._spp.write(commands.build_qr_block(data, size, align, error_correction))
[docs] def print_image_spp(self, image, strip_height: int = 24) -> None: """Print a :class:`PIL.Image.Image` over SPP as an ESC/POS raster. :param image: PIL image in any mode. :param strip_height: Rows per strip (default 24). """ for strip in raster.image_to_raster_strips(image, strip_height=strip_height): rows = strip[6] | (strip[7] << 8) # from GS v 0 header self._spp.write_raster_strip(strip, rows=rows)
[docs] def print_pdf417_spp( self, data: str, align: Align = Align.CENTER, scale: int = 2, row_height: int = 5, columns: int = 5, padding: int = 10, min_rows: int | None = None, ) -> None: """Print a PDF417 barcode over SPP, padded to paper width per ``align``.""" img = rendering.render_pdf417( data, scale=scale, row_height=row_height, columns=columns, padding=padding, min_rows=min_rows, ) img = raster.pad_image_to_paper_width(img, align) self.print_image_spp(img)