Refactor Canvas with new revert(), text(), fill() methods

This commit is contained in:
Yusuf Cihan
2025-04-17 17:53:26 +03:00
parent 3e47b0116e
commit fe3cf155e3
5 changed files with 379 additions and 159 deletions

View File

@ -23,6 +23,7 @@
import asyncio
from io import BytesIO
from pathlib import Path
from sys import stderr
from PIL import Image, ImageChops
from bleak import BleakScanner, BleakClient
from typing import List, TYPE_CHECKING
@ -41,16 +42,14 @@ PRINT_REPLY_UUID = "be3dd652-{uuid}-42f1-99c1-f0f749dd0678".format(uuid = "2b3d"
UNKNOWN_UUID = "be3dd653-{uuid}-42f1-99c1-f0f749dd0678".format(uuid = "2b3d")
def is_espressif(input_mac : str):
def is_espressif(input_mac: str):
"""
Returns True if given MAC address is from a valid DYMO printer.
The mac address blocks are owned by Espressif Inc.
Returns True if given MAC address is from Espressif Inc.
"""
mac_blocks = [
"58:CF:79",
# Confirmed in pull request #2
"DC:54:75",
"34:85:18"
"DC:54:75", # confirmed in #2
"34:85:18", # confirmed in #4
]
check_mac = int(input_mac.replace(":", ""), base = 16)
for mac in mac_blocks:
@ -62,7 +61,7 @@ def is_espressif(input_mac : str):
class Printer:
def __init__(self, impl : "BLEDevice") -> None:
def __init__(self, impl: "BLEDevice") -> None:
self._impl = impl
self._client = BleakClient(self._impl)
@ -76,16 +75,16 @@ class Printer:
return
await self._client.disconnect()
async def print(self, canvas : Canvas):
async def print(self, canvas: Canvas):
if not self._client.is_connected:
raise Exception("Printer is not connected!")
print_request : "BleakGATTCharacteristic" = self._client.services.get_characteristic(PRINT_REQUEST_UUID) # type: ignore # noqa: E501
print_reply : "BleakGATTCharacteristic" = self._client.services.get_characteristic(PRINT_REPLY_UUID) # type: ignore # noqa: E501
future : asyncio.Future[Result] = asyncio.Future()
should_discard : bool = False
print_request: "BleakGATTCharacteristic" = self._client.services.get_characteristic(PRINT_REQUEST_UUID) # type: ignore # noqa: E501
print_reply: "BleakGATTCharacteristic" = self._client.services.get_characteristic(PRINT_REPLY_UUID) # type: ignore # noqa: E501
future: asyncio.Future[Result] = asyncio.Future()
should_discard: bool = False
# Printer sends two messages, first is the PRINTING, and second one is the
# printing result. So we discard the first message.
async def reply_get(_, data : bytearray): # noqa: E501
async def reply_get(_, data: bytearray): # noqa: E501
nonlocal should_discard
result = Result.from_bytes(data)
if (not should_discard) and (result.value in [0, 1]):
@ -99,24 +98,27 @@ class Printer:
return await future
async def discover_printers(max_timeout : int = 5) -> List[Printer]:
async def discover_printers(max_timeout: int = 5, ensure_mac: bool = False) -> List[Printer]:
"""
Searches for printers nearby and returns a list of Printer objects. If no printer
has found in the initial search, waits for scanning until the max timeout has been
reached.
"""
printers : List[Printer] = []
printers: List[Printer] = []
waited_total = 0
async with BleakScanner(service_uuids = [SERVICE_UUID]) as scanner:
while True:
# TODO: In some cases, advetisement data may be non-null, containing
# additional metadata about printer state but it is not implemented yet.
for device, _ in scanner.discovered_devices_and_advertisement_data.values(): # noqa: E501
if not is_espressif(device.address):
continue
# Also match the "Letratag" name just to be sure that we are looking for
# the right device, since there are also other products made by DYMO.
if (device.name or "") == f"Letratag {device.address.replace(':', '')}":
if ensure_mac and (not is_espressif(device.address)):
print(
f"A possible printer is found, but its MAC {device.address} isn't whitelisted, " +
"thus ignored. If it isn't right, either disable MAC checking or open a issue.",
file = stderr
)
continue
printers.append(Printer(device))
# Do we have any candidate printers? If so, return the found printers.
# Otherwise, wait for the next scans until we found any.
@ -130,9 +132,9 @@ async def discover_printers(max_timeout : int = 5) -> List[Printer]:
def convert_image_to_canvas(
image : Image.Image,
dither : bool = True,
trim : bool = False
image: Image.Image,
dither: bool = True,
trim: bool = False
):
"""
Converts an Pillow Image to a Canvas object.
@ -147,14 +149,15 @@ def convert_image_to_canvas(
output = output.crop(diff.getbbox(alpha_only = False))
# Shrink the image from the center if it exceeds the print height,
# or max printable width.
if (output.height > Canvas.HEIGHT):
start_y = int(output.height / 2) - int(Canvas.HEIGHT / 2)
canvas_height = Canvas.BYTES_PER_LINE * 8
if (output.height > canvas_height):
start_y = int(output.height / 2) - int(canvas_height / 2)
output = output.crop(
(0, start_y, output.width, start_y + Canvas.HEIGHT)
(0, start_y, output.width, start_y + canvas_height)
)
elif (output.height < Canvas.HEIGHT):
elif (output.height < canvas_height):
raise ValueError("Image is too small, resizing not implemented.")
if (output.width) > Canvas.MAX_WIDTH:
if (output.width) > Canvas.MAX_LENGTH:
raise ValueError("Image is too large, resizing not implemented.")
# Convert image to pixel array.
canvas = Canvas()
@ -165,7 +168,7 @@ def convert_image_to_canvas(
return canvas
def create_image(path : Path, dither : bool = True):
def create_image(path: Path, dither: bool = True):
"""
Converts an image file in given path to Canvas.
"""
@ -177,7 +180,7 @@ def create_image(path : Path, dither : bool = True):
return convert_image_to_canvas(image, dither)
def create_code_128(text : str):
def create_code_128(text: str):
"""
Creates a Code 128 barcode and dumps to Canvas.
"""