Initial commit

This commit is contained in:
Yusuf Cihan
2024-11-10 19:32:30 +03:00
commit 95ee08244e
12 changed files with 1252 additions and 0 deletions

View File

@@ -0,0 +1,38 @@
# MIT License
#
# Copyright (c) 2024 ysfchn / Yusuf Cihan
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = [
"discover_printers",
"Canvas",
"Result",
"create_image",
"create_code_128",
"convert_image_to_canvas"
]
from dymo_bluetooth.bluetooth import (
discover_printers,
create_image,
create_code_128,
convert_image_to_canvas
)
from dymo_bluetooth.printer import Canvas, Result

View File

@@ -0,0 +1,78 @@
# MIT License
#
# Copyright (c) 2024 ysfchn / Yusuf Cihan
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from argparse import ArgumentParser, BooleanOptionalAction
from pathlib import Path
from typing import cast
from dymo_bluetooth.bluetooth import discover_printers, create_image
import sys
import asyncio
async def print_image(
input_file : Path,
max_timeout : int,
stretch_factor : int,
use_dither : bool
):
canvas = create_image(input_file, dither = use_dither)
canvas = canvas.stretch_image(factor = stretch_factor)
printers = await discover_printers(max_timeout)
if not printers:
print("Couldn't find any printers in given timeout!")
exit(1)
printer = printers[0]
print(f"Starting to print on {printer._impl.address}...")
await printer.connect()
await printer.print(canvas)
def main():
module_name = cast(str, sys.modules[__name__].__file__).split("/")[-2]
args = ArgumentParser(
prog = f"python -m {module_name}",
description = (
"Print monochrome labels with DYMO LetraTag LT-200B label printer over "
"Bluetooth."
)
)
args.add_argument("image", help = "Image file to print. Must be 32 pixels in height.", type = Path, metavar = "IMAGE") # noqa: E501
args.add_argument("--timeout", default = 5, type = int, help = "Maximum timeout to search for printers.") # noqa: E501
args.add_argument("--dither", default = True, action = BooleanOptionalAction,
help = "Use or don't use dithering when converting it to monochrome image."
)
args.add_argument("--factor", default = 2, type = int,
help = "Stretch the image N times. Default is 2, otherwise printer" + \
"will print images too thin."
)
parsed = args.parse_args()
input_file = cast(Path, parsed.image).absolute()
max_timeout = cast(int, parsed.timeout)
stretch_factor = cast(int, parsed.factor)
use_dither = cast(bool, parsed.dither)
if not input_file.exists():
print(f"Given image file doesn't exists: {input_file.as_posix()}")
exit(1)
asyncio.run(print_image(input_file, max_timeout, stretch_factor, use_dither))
if __name__ == "__main__":
main()

184
dymo_bluetooth/bluetooth.py Normal file
View File

@@ -0,0 +1,184 @@
# MIT License
#
# Copyright (c) 2024 ysfchn / Yusuf Cihan
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
from io import BytesIO
from pathlib import Path
from PIL import Image, ImageChops
from bleak import BleakScanner, BleakClient
from typing import List, TYPE_CHECKING
from dymo_bluetooth.printer import Canvas, Result, command_print
if TYPE_CHECKING:
from bleak.backends.device import BLEDevice
from bleak.backends.characteristic import BleakGATTCharacteristic
SERVICE_UUID = "be3dd650-{uuid}-42f1-99c1-f0f749dd0678".format(uuid = "2b3d")
PRINT_REQUEST_UUID = "be3dd651-{uuid}-42f1-99c1-f0f749dd0678".format(uuid = "2b3d")
PRINT_REPLY_UUID = "be3dd652-{uuid}-42f1-99c1-f0f749dd0678".format(uuid = "2b3d")
# Not used in the actual vendor app.
UNKNOWN_UUID = "be3dd653-{uuid}-42f1-99c1-f0f749dd0678".format(uuid = "2b3d")
def is_espressif(mac : str):
"""
Returns True if given MAC address is in the range of Espressif Inc.
"""
start_block = 0x58_CF_79 << 24
end_block = start_block + (1 << 24) # Exclusive
mac_value = int(mac.replace(":", ""), base = 16)
if (mac_value >= start_block) and (mac_value < end_block):
return True
return False
class Printer:
def __init__(self, impl : "BLEDevice") -> None:
self._impl = impl
self._client = BleakClient(self._impl)
async def connect(self):
if self._client.is_connected:
return
await self._client.connect()
async def disconnect(self):
if not self._client.is_connected:
return
await self._client.disconnect()
async def print(self, canvas : Canvas):
if not self._client.is_connected:
raise Exception("Printer is not connected!")
print_request : "BleakGATTCharacteristic" = self._client.services.get_characteristic(PRINT_REQUEST_UUID) # type: ignore # noqa: E501
print_reply : "BleakGATTCharacteristic" = self._client.services.get_characteristic(PRINT_REPLY_UUID) # type: ignore # noqa: E501
future : asyncio.Future[Result] = asyncio.Future()
should_discard : bool = False
# Printer sends two messages, first is the PRINTING, and second one is the
# printing result. So we discard the first message.
async def reply_get(_, data : bytearray): # noqa: E501
nonlocal should_discard
result = Result.from_bytes(data)
if (not should_discard) and (result.value in [0, 1]):
should_discard = True
return
# This is the second reply, which holds the actual status code.
future.set_result(result)
await self._client.start_notify(print_reply, reply_get)
for chunk in command_print(canvas):
await self._client.write_gatt_char(print_request, chunk, True)
return await future
async def discover_printers(max_timeout : int = 5) -> List[Printer]:
"""
Searches for printers nearby and returns a list of Printer objects. If no printer
has found in the initial search, waits for scanning until the max timeout has been
reached.
"""
printers : List[Printer] = []
waited_total = 0
async with BleakScanner(service_uuids = [SERVICE_UUID]) as scanner:
while True:
# TODO: In some cases, advetisement data may be non-null, containing
# additional metadata about printer state but it is not implemented yet.
for device, _ in scanner.discovered_devices_and_advertisement_data.values(): # noqa: E501
if not is_espressif(device.address):
continue
# Also match the "Letratag" name just to be sure that we are looking for
# the right device, since there are also other products made by DYMO.
if (device.name or "") == f"Letratag {device.address.replace(':', '')}":
printers.append(Printer(device))
# Do we have any candidate printers? If so, return the found printers.
# Otherwise, wait for the next scans until we found any.
if printers:
break
elif waited_total >= max_timeout:
return []
await asyncio.sleep(0.5)
waited_total += 0.5
return printers
def convert_image_to_canvas(
image : Image.Image,
dither : bool = True,
trim : bool = False
):
"""
Converts an Pillow Image to a Canvas object.
"""
output = image.convert("1", dither = \
Image.Dither.FLOYDSTEINBERG if dither else Image.Dither.NONE
)
# If trim is enabled, discard trailing and leading blank lines.
if trim:
mask = Image.new("1", output.size, color = 255)
diff = ImageChops.difference(output, mask)
output = output.crop(diff.getbbox(alpha_only = False))
# Shrink the image from the center if it exceeds the print height,
# or max printable width.
if (output.height > Canvas.HEIGHT):
start_y = int(output.height / 2) - int(Canvas.HEIGHT / 2)
output = output.crop(
(0, start_y, output.width, start_y + Canvas.HEIGHT)
)
elif (output.height < Canvas.HEIGHT):
raise ValueError("Image is too small, resizing not implemented.")
if (output.width) > Canvas.MAX_WIDTH:
raise ValueError("Image is too large, resizing not implemented.")
# Convert image to pixel array.
canvas = Canvas()
for w in range(output.width):
for h in range(output.height):
pixel = output.getpixel((w, h, ))
canvas.set_pixel(w, h, not pixel)
return canvas
def create_image(path : Path, dither : bool = True):
"""
Converts an image file in given path to Canvas.
"""
buffer = BytesIO()
with path.open("rb") as op:
buffer.write(op.read())
buffer.seek(0)
image = Image.open(buffer)
return convert_image_to_canvas(image, dither)
def create_code_128(text : str):
"""
Creates a Code 128 barcode and dumps to Canvas.
"""
try:
from barcode import Code128 # type: ignore
from barcode.writer import ImageWriter # type: ignore
except ModuleNotFoundError:
raise Exception("This method requires 'python-barcode' to be installed.")
imwrite = ImageWriter()
imwrite.dpi = 200
code = Code128(text, writer = imwrite)
return convert_image_to_canvas(code.render(text = ""), dither = False, trim = True)

392
dymo_bluetooth/printer.py Normal file
View File

@@ -0,0 +1,392 @@
# MIT License
#
# Copyright (c) 2024 ysfchn / Yusuf Cihan
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from enum import Enum
from io import SEEK_END, BytesIO
from typing import Union
import math
BytesLike = Union[bytes, bytearray]
class DirectiveCommand(Enum):
"""
Directives that accepted by the printer.
"""
START = "s"
MEDIA_TYPE = "M"
PRINT_DENSITY = "C" # Unused
PRINT_DATA = "D"
FORM_FEED = "E"
STATUS = "A"
END = "Q"
def to_bytes(self) -> bytes:
# 27 = ASCII escape sequence
return bytes((27, ord(self.value), ))
class Result(Enum):
# Printing has been completed (supposedly). This may not always mean that a
# label has been printed out from the printer. See FAILED_NO_CASETTE status
# below for details.
SUCCESS = 0
# PRINTING (or SUCCESS) is 1
# Print failed due to some unknown reason.
FAILED = 2
# Printing has been completed but battery is low.
SUCCESS_LOW_BATTERY = 3
# Print failed due to being cancelled.
FAILED_CANCEL = 4
# FAILED (with a different status value) is 5
# Print failed due to the low batteries.
FAILED_LOW_BATTERY = 6
# Failed due to casette not inserted. On my tests, printer never sends this
# status, instead it will spin its gear even casette is not inserted, and
# will send a SUCCESS status instead, so there is no way to check if
# casette is actually inserted.
FAILED_NO_CASETTE = 7
@classmethod
def from_bytes(cls, data : BytesLike):
if data[0] != 27:
raise ValueError("Not a valid result value; 1st byte must be 0x1b (27)")
if chr(data[1]) != "R":
raise ValueError("Not a valid result value; 2nd byte must be 0x52 (82)")
# There is a value 5, which also means printing has completed but has
# a different status value, so we return FAILED since that's what it means.
if data[2] == 5:
return cls(Result.FAILED.value)
# There is a value 1, which also means printing has completed but has
# a different status value, so we return SUCCESS since that's what it means.
if data[2] == 1:
return cls(Result.SUCCESS.value)
return cls(data[2])
class Canvas:
"""
An implementation of 1-bit monochrome little-endian encoded 32 pixel height
images for printing them to the label.
1 byte equals 8 bits, and each bit specifies if pixel should be black (= 1)
or white (= 0). The generated image will contain a multiple of 4 bytes, equaling
the label height in pixels (4 * 8 = 32 pixels).
"""
__slots__ = ("buffer", )
HEIGHT = 32
MAX_WIDTH = 8186
def __init__(self) -> None:
self.buffer = BytesIO()
def get_pixel(self, x : int, y : int):
"""
Gets the pixel in given coordinates.
Returns True if pixel is black, otherwise False.
"""
if y >= Canvas.HEIGHT:
raise ValueError(f"Canvas can't exceed {Canvas.HEIGHT} pixels in height.")
if x >= Canvas.MAX_WIDTH:
raise ValueError(f"Canvas can't exceed {Canvas.MAX_WIDTH} pixels in width.")
# Get the byte containing the pixel value of given coordinates.
x_offset = math.ceil((x * Canvas.HEIGHT) / 8)
y_offset = 3 - math.floor(y / 8)
self.buffer.seek(x_offset + y_offset)
# Check if there is a bit value in given line.
read = self.buffer.read(1)
value = int.from_bytes(read or b"\x00", "little")
is_black = bool(value & (1 << (7 - (y % 8))))
return is_black
def set_pixel(self, x : int, y : int, color : bool):
"""
Sets a pixel to given coordinates.
Setting color to True will paint a black color.
"""
if y >= Canvas.HEIGHT:
raise ValueError(f"Canvas can't exceed {Canvas.HEIGHT} pixels in height.")
if x >= Canvas.MAX_WIDTH:
raise ValueError(f"Canvas can't exceed {Canvas.MAX_WIDTH} pixels in width.")
# Get the byte containing the pixel value of given coordinates.
x_offset = math.ceil((x * Canvas.HEIGHT) / 8)
y_offset = 3 - math.floor(y / 8)
self.buffer.seek(x_offset + y_offset)
# Get the one of four slices in line in the given coordinates. Add the bit in
# given location if color is black, otherwise exclude the bit to make it white.
read = self.buffer.read(1)
value = int.from_bytes(read or b"\x00", "little")
if color:
value = value | (1 << (7 - (y % 8)))
else:
value = value & ~(1 << (7 - (y % 8)))
# Change the current part of the line with modified one.
self.buffer.seek(x_offset + y_offset)
self.buffer.write(bytes([value]))
def stretch_image(self, factor : int = 2):
"""
Stretches image to the right in N times, and returns a new Canvas
containing the stretched image. Factor 1 results in the same image.
"""
if factor < 1:
raise ValueError("Stretch factor must be at least 1!")
canvas = Canvas()
for x in range(self.get_width()):
for y in range(Canvas.HEIGHT):
pixel = self.get_pixel(x, y)
start_x = (x * factor)
for i in range(factor):
canvas.set_pixel(start_x + i, y, pixel)
return canvas
def pad_image(self, width : int):
"""
Adds blank spacing to both sides of the image, until the image
reaches to the given width. Returns a new Canvas containing the
modified image. If image is already longer than given width, it
will return the current Canvas.
"""
current_width = self.get_width()
if current_width >= width:
return self
canvas = Canvas()
left_padding = math.ceil(width / 2)
for x in range(current_width):
for y in range(Canvas.HEIGHT):
canvas.set_pixel(left_padding + x, y, self.get_pixel(x, y))
for i in range(Canvas.HEIGHT):
canvas.set_pixel(current_width + left_padding, i, False)
return canvas
def print(self):
"""
Prints the image to the console.
"""
for y in range(Canvas.HEIGHT):
for x in range(self.get_width()):
print("" if self.get_pixel(x, y) else "", end = "")
print()
def get_width(self):
"""
Gets the painted width of the image.
"""
self.buffer.seek(0, SEEK_END)
cur = self.buffer.tell()
return math.ceil(cur / 4)
def get_size(self):
"""
Gets the width and height of the image in tuple.
"""
return (self.get_width(), Canvas.HEIGHT, )
def get_image(self):
"""
Gets the created image with added blank padding.
"""
self.buffer.seek(0)
image = self.buffer.read()
return image + (b"\x00" * (self.buffer.tell() % 4))
def empty(self):
"""
Makes all pixels in the canvas in white. Canvas size won't be changed.
"""
self.buffer.seek(0, SEEK_END)
byte_count = self.buffer.tell()
self.buffer.seek(0)
self.buffer.truncate(0)
self.buffer.seek(byte_count)
self.buffer.write(b"\x00")
def copy(self):
"""
Creates a copy of this canvas.
"""
canvas = Canvas()
self.buffer.seek(0)
canvas.buffer.write(self.buffer.read())
return canvas
def clear(self):
"""
Clears the canvas. Canvas size will be changed to 0.
"""
self.buffer.seek(0)
self.buffer.truncate(0)
def __eq__(self, value: object, /) -> bool:
if not isinstance(value, Canvas):
return False
return value.get_image() == self.get_image()
def __repr__(self) -> str:
self.buffer.seek(0, SEEK_END)
byte_size = self.buffer.tell()
image_size = "x".join(map(str, self.get_size()))
return f"<{self.__class__.__name__} image={image_size} bytes={byte_size}>"
class DirectiveBuilder:
"""
Builds directives for the printer.
"""
@staticmethod
def start():
# [154, 2, 0, 0] is the "job ID".
# Without that, printer won't print anything but a small blank label.
# This the only "job ID" that the printer uses in start directive, it is not
# related with some sort of queue or anything else, just a constant value.
return bytes([*DirectiveCommand.START.to_bytes(), 154, 2, 0, 0])
@staticmethod
def media_type(value : int):
return bytes([*DirectiveCommand.MEDIA_TYPE.to_bytes(), value])
@staticmethod
def form_feed():
return bytes(DirectiveCommand.FORM_FEED.to_bytes())
@staticmethod
def status():
return bytes(DirectiveCommand.STATUS.to_bytes())
@staticmethod
def end():
return bytes(DirectiveCommand.END.to_bytes())
@staticmethod
def print(
data : BytesLike,
image_width : int,
bits_per_pixel : int,
alignment : int
):
size = \
image_width.to_bytes(4, "little") + \
Canvas.HEIGHT.to_bytes(4, "little")
return bytes([
*DirectiveCommand.PRINT_DATA.to_bytes(),
bits_per_pixel,
alignment,
*size,
*data
])
def create_payload(data : BytesLike, is_print : bool = False):
"""
Creates a final payload to be sent to the printer from the input data.
Each iteration of this generator will yield the bytes that needs to be sent over
the Bluetooth for each GATT write transaction.
"""
# Longer inputs needs to be splitted.
CHUNK_SIZE = 500
# Magic value.
MAGIC = [18, 52]
# Length of the data in 4 bytes.
length = len(data).to_bytes(4, "little")
# byte[9] = [255, 240, 18, 52, ...LENGTH{4}, CHECKSUM]
header = bytearray([
255, # Preamble
240, # Flags
*MAGIC,
*length
])
# For checksum, we get the sum of all bytes, then get the first byte of the sum.
checksum = sum(header) & 0xFF
header.append(checksum)
assert len(header) == 9, "Header must be 9 bytes"
# Payloads other than writing doesn't require chunking,
# so the input data can be added as-is.
if not is_print:
header.extend(data)
yield header
# However, write payloads must be chunked properly.
else:
# First yield the header.
yield header
# Split chunk in each 500 bytes.
for index, step in enumerate(range(0, len(data), CHUNK_SIZE)):
current_chunk = bytearray()
chunk = data[step : step + CHUNK_SIZE]
# TODO: Not sure what is the purpose of the this, but the original
# vendor app skips this index, so we do the same here.
chunk_index = index + 1 if index >= 27 else index
current_chunk.append(chunk_index)
current_chunk.extend(chunk)
# If this is the last chunk, append MAGIC to the end.
if (step + CHUNK_SIZE) >= len(data):
current_chunk.extend(MAGIC)
yield current_chunk
def command_print(
canvas : Canvas
):
"""
Creates a print directive.
"""
payload = bytearray()
payload.extend(DirectiveBuilder.start())
payload.extend(DirectiveBuilder.print(
canvas.get_image(),
canvas.get_width(),
bits_per_pixel = 1,
alignment = 2
))
payload.extend(DirectiveBuilder.form_feed())
payload.extend(DirectiveBuilder.status())
payload.extend(DirectiveBuilder.end())
return create_payload(payload, is_print = True)
def command_casette(
media_type : int
):
"""
Creates a casette directive.
"""
payload = bytearray()
payload.extend(DirectiveBuilder.start())
payload.extend(DirectiveBuilder.media_type(media_type))
payload.extend(DirectiveBuilder.end())
return create_payload(payload)