493 lines
16 KiB
Python

# MIT License
#
# Copyright (c) 2024 ysfchn / Yusuf Cihan
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from enum import Enum
from io import SEEK_END, BytesIO
from typing import Literal, Sequence, Tuple
import math
class DirectiveCommand(Enum):
"""
Directives that accepted by the printer.
"""
START = "s"
MEDIA_TYPE = "M"
PRINT_DENSITY = "C" # Unused
PRINT_DATA = "D"
FORM_FEED = "E"
STATUS = "A"
END = "Q"
def to_bytes(self) -> bytes:
# 27 = ASCII escape sequence
return bytes((27, ord(self.value), ))
class Result(Enum):
# Printing has been completed (supposedly). This may not always mean that a
# label has been printed out from the printer. See FAILED_NO_CASETTE status
# below for details.
SUCCESS = 0
# PRINTING (or SUCCESS) is 1
# Print failed due to some unknown reason.
FAILED = 2
# Printing has been completed but battery is low.
SUCCESS_LOW_BATTERY = 3
# Print failed due to being cancelled.
FAILED_CANCEL = 4
# FAILED (with a different status value) is 5
# Print failed due to the low batteries.
FAILED_LOW_BATTERY = 6
# Failed due to casette not inserted. On my tests, printer never sends this
# status, instead it will spin its gear even casette is not inserted, and
# will send a SUCCESS status instead, so there is no way to check if
# casette is actually inserted.
FAILED_NO_CASETTE = 7
@classmethod
def from_bytes(cls, data: Sequence[int]):
if data[0] != 27:
raise ValueError("Not a valid result value; 1st byte must be 0x1b (27)")
if chr(data[1]) != "R":
raise ValueError("Not a valid result value; 2nd byte must be 0x52 (82)")
# There is a value 5, which also means printing has completed but has
# a different status value, so we return FAILED since that's what it means.
if data[2] == 5:
return cls(Result.FAILED.value)
# There is a value 1, which also means printing has completed but has
# a different status value, so we return SUCCESS since that's what it means.
if data[2] == 1:
return cls(Result.SUCCESS.value)
return cls(data[2])
class Canvas:
"""
An implementation of 1-bit monochrome horizontal images, sequentially
ordered and stored in big-endian for each pixel.
1 byte equals 8 bits, and each bit specifies if pixel should be filled
(= 1) or not (= 0).
"""
__slots__ = ("buffer", )
# Each line takes 4 bytes, equals to 32 pixels (1 byte = 8 bits).
BYTES_PER_LINE = 4
# Maximum count of bytes that the image can extend into the fixed direction.
MAX_LENGTH = 1024
def __init__(self) -> None:
self.buffer = BytesIO()
def get_pixel(self, x: int, y: int) -> bool:
"""
Gets the pixel in given coordinates.
Returns True if pixel is filled (= black), otherwise False.
"""
self._raise_if_out_bounds(x, y)
# Get the byte containing the pixel value of given coordinates.
x_offset = x * Canvas.BYTES_PER_LINE
y_offset = Canvas.BYTES_PER_LINE - 1 - math.floor(y / 8)
self.buffer.seek(x_offset + y_offset)
# Check if there is a bit value in given line.
value = (self.buffer.read(1) or b"\x00")[0]
is_black = bool(value & (1 << (7 - (y % 8))))
return is_black
def set_pixel(self, x: int, y: int, color: Literal[True, False, 0, 1]) -> None:
"""
Sets a pixel to given coordinates.
Setting color to True will paint a black color.
"""
self._raise_if_out_bounds(x, y)
# Get the byte containing the pixel value of given coordinates.
x_offset = x * Canvas.BYTES_PER_LINE
y_offset = Canvas.BYTES_PER_LINE - 1 - math.floor(y / 8)
self.buffer.seek(x_offset + y_offset)
# Get the one of four slices in line in the given coordinates. Add the bit in
# given location if color is black, otherwise exclude the bit to make it white.
curr = self.buffer.read(1)
value = (curr or b"\x00")[0]
if color:
value = value | (1 << (7 - (y % 8)))
else:
value = value & ~(1 << (7 - (y % 8)))
# Change the current byte with modified one, if not exists, append a new one.
self.buffer.seek(0 if not curr else -1, 1)
self.buffer.write(bytes([value]))
def stretch(self, factor: int = 2) -> "Canvas":
"""
Stretches image to the non-fixed direction in N times, and returns a new
Canvas containing the stretched image. Factor 1 results in the same image.
"""
if factor < 1:
raise ValueError("Stretch factor must be at least 1!")
canvas = Canvas()
for x in range(self.width):
for y in range(self.height):
pixel = self.get_pixel(x, y)
start_x = (x * factor)
for i in range(factor):
canvas.set_pixel(start_x + i, y, pixel)
return canvas
def _get_byte_size(self):
self.buffer.seek(0, SEEK_END)
return self.buffer.tell()
def _get_unfixed_pixels(self):
return math.ceil(self._get_byte_size() / self.BYTES_PER_LINE)
def _get_fixed_pixels(self):
return self.BYTES_PER_LINE * 8
def _raise_if_out_bounds(self, x: int, y: int):
if (x < 0) or (y < 0):
raise ValueError("Canvas positions can't be negative.")
bits = Canvas.BYTES_PER_LINE * 8
maxbits = Canvas.MAX_LENGTH * 8
if y >= bits:
raise ValueError(f"Canvas can't be or exceed {bits} pixels in height.")
elif x >= maxbits:
raise ValueError(f"Canvas can't be or exceed {maxbits} pixels in width.")
@property
def height(self) -> int:
"""
Gets the height of the image.
"""
return self._get_fixed_pixels()
@property
def width(self) -> int:
"""
Gets the width of the image.
"""
return self._get_unfixed_pixels()
@property
def size(self) -> Tuple[int, int]:
"""
Gets the width and height of the image in tuple.
"""
return (self.width, self.height, )
def get_image(self) -> bytes:
"""
Gets the created image with added blank padding.
"""
self.buffer.seek(0)
image = self.buffer.read()
return image + (b"\x00" * (self.buffer.tell() % self.BYTES_PER_LINE))
def empty(self):
"""
Makes all pixels in the canvas in blank (= white). Canvas size won't be changed.
"""
size = self._get_byte_size()
self.buffer.seek(0)
self.buffer.truncate(0)
self.buffer.seek(size)
self.buffer.write(b"\x00")
def copy(self) -> "Canvas":
"""
Creates a copy of this canvas.
"""
canvas = Canvas()
self.buffer.seek(0)
canvas.buffer.write(self.buffer.read())
return canvas
def clear(self):
"""
Clears the canvas. Canvas size will be changed to 0.
"""
self.buffer.seek(0)
self.buffer.truncate(0)
def revert(self):
"""
Returns a new Canvas with the image is flipped in color; all unfilled
pixels are filled, and all filled pixels are unfilled.
"""
self.buffer.seek(0)
copied = Canvas()
copied.buffer.seek(0)
while (byt := self.buffer.read(1)):
copied.buffer.write(bytes([byt[0] ^ 0xFF]))
return copied
def fill(self, to_left: int, to_right: int) -> "Canvas":
"""
Returns a new Canvas with blank (= white) spacing added to both sides.
"""
canv = Canvas()
canv.buffer.seek(0)
canv.buffer.write(bytes(to_left * self.BYTES_PER_LINE))
canv.buffer.write(self.get_image())
canv.buffer.seek(0, SEEK_END)
canv.buffer.write(bytes(to_right * self.BYTES_PER_LINE))
return canv
def pad(self, until: int) -> "Canvas":
"""
Adds blank (= white) spacing to both sides of the image, until the image reaches
to the given width in pixels. Returns a new Canvas containing the modified image.
If image is already longer than given width, returns the copy of the same image.
"""
if self.width >= until:
return self.copy()
side = math.ceil((until - self.width) / 2)
return self.fill(side, side)
def text(self, in_quad: bool = True, blank_char: int = 0x20, frame: bool = True) -> str:
"""
Converts the image to a string of unicode block symbols, so it can be printed
out to the terminal. If `in_quad` is True, quarter block characters will be used,
so the image will be 2 times smaller, making each 4 pixel to take up a single
character. Empty pixels will be represented as `blank_char` (default is SPACE).
"""
lines = []
frame_length = 0
if frame:
frame_length = self.width if not in_quad else math.ceil(self.width / 2)
if frame_length:
lines.append(chr(0x250C) + (frame_length * chr(0x2500)) + chr(0x2510))
if in_quad:
for h in range(0, self.height, 2):
line = ""
for w in range(0, self.width, 2):
corners = \
self.get_pixel(w, h), \
self.get_pixel(w + 1, h), \
self.get_pixel(w, h + 1), \
self.get_pixel(w + 1, h + 1)
corners = sum([1 << x if corners[x] else 0 for x in range(4)])
line += chr(quartet_to_char(corners) or blank_char)
if frame:
line = chr(0x2502) + line + chr(0x2502)
lines.append(line)
else:
lines = []
for h in range(0, self.height):
line = ""
for w in range(0, self.width):
line += chr(0x2588 if self.get_pixel(w, h) else blank_char)
if frame:
line = chr(0x2502) + line + chr(0x2502)
lines.append(line)
if frame_length:
lines.append(chr(0x2514) + (frame_length * chr(0x2500)) + chr(0x2518))
return "\n".join(lines)
def __len__(self):
return self._get_byte_size()
def __str__(self) -> str:
return self.text(in_quad = True)
def __eq__(self, value: object, /) -> bool:
if not isinstance(value, Canvas):
return False
return value.get_image() == self.get_image()
def __repr__(self) -> str:
w, h = self.size
return f"<{self.__class__.__name__} size={w}x{h} length={self.__len__()}>"
def quartet_to_char(char: int):
"""
Gets a unicode block symbol code for a 4 bit value (0x0 to 0xF), each bit representing
a corner of the 4 pixels, top left (1 << 0), top right (1 << 1), bottom left (1 << 2)
and bottom right (1 << 3) respectively.
"""
if (char > 0xF) or (char < 0x0):
raise ValueError("Invalid character.")
# https://en.wikipedia.org/wiki/Block_Elements
data = {
0x0: 0x0000, # Blank
0x1: 0x2598, # 0001
0x2: 0x259D, # 0010
0x3: 0x2580, # 0011
0x4: 0x2596, # 0100
0x5: 0x258C, # 0101
0x6: 0x259E, # 0110
0x7: 0x259B, # 0111
0x8: 0x2597, # 1000
0x9: 0x259A, # 1001
0xA: 0x2590, # 1010
0xB: 0x259C, # 1011
0xC: 0x2584, # 1100
0xD: 0x2599, # 1101
0xE: 0x259F, # 1110
0xF: 0x2588, # 1111
}
return data[char]
class DirectiveBuilder:
"""
Builds directives for the printer.
"""
@staticmethod
def start():
# [154, 2, 0, 0] is the "job ID".
# Without that, printer won't print anything but a small blank label.
# This the only "job ID" that the printer uses in start directive, it is not
# related with some sort of queue or anything else, just a constant value.
return bytes([*DirectiveCommand.START.to_bytes(), 154, 2, 0, 0])
@staticmethod
def media_type(value: int):
return bytes([*DirectiveCommand.MEDIA_TYPE.to_bytes(), value])
@staticmethod
def form_feed():
return bytes(DirectiveCommand.FORM_FEED.to_bytes())
@staticmethod
def status():
return bytes(DirectiveCommand.STATUS.to_bytes())
@staticmethod
def end():
return bytes(DirectiveCommand.END.to_bytes())
@staticmethod
def print(
data: Sequence[int],
image_width: int,
image_height: int,
bits_per_pixel: int,
alignment: int
):
size = \
image_width.to_bytes(4, "little") + \
image_height.to_bytes(4, "little")
return bytes([
*DirectiveCommand.PRINT_DATA.to_bytes(),
bits_per_pixel,
alignment,
*size,
*data
])
def create_payload(data: Sequence[int], is_print: bool = False):
"""
Creates a final payload to be sent to the printer from the input data.
Each iteration of this generator will yield the bytes that needs to be sent over
the Bluetooth for each GATT write transaction.
"""
# Longer inputs needs to be splitted.
CHUNK_SIZE = 500
# Magic value.
MAGIC = [18, 52]
# Length of the data in 4 bytes.
length = len(data).to_bytes(4, "little")
# byte[9] = [255, 240, 18, 52, ...LENGTH{4}, CHECKSUM]
header = bytearray([
255, # Preamble
240, # Flags
*MAGIC,
*length
])
# For checksum, we get the sum of all bytes, then get the first byte of the sum.
checksum = sum(header) & 0xFF
header.append(checksum)
assert len(header) == 9, "Header must be 9 bytes"
# Payloads other than writing doesn't require chunking,
# so the input data can be added as-is.
if not is_print:
header.extend(data)
yield header
# However, write payloads must be chunked properly.
else:
# First yield the header.
yield header
# Split chunk in each 500 bytes.
for index, step in enumerate(range(0, len(data), CHUNK_SIZE)):
current_chunk = bytearray()
chunk = data[step: step + CHUNK_SIZE]
# TODO: Not sure what is the purpose of the this, but the original
# vendor app skips this index, so we do the same here.
chunk_index = index + 1 if index >= 27 else index
current_chunk.append(chunk_index)
current_chunk.extend(chunk)
# If this is the last chunk, append MAGIC to the end.
if (step + CHUNK_SIZE) >= len(data):
current_chunk.extend(MAGIC)
yield current_chunk
def command_print(
canvas: Canvas
):
"""
Creates a print directive.
"""
payload = bytearray()
payload.extend(DirectiveBuilder.start())
payload.extend(DirectiveBuilder.print(
canvas.get_image(),
canvas.width,
canvas.height,
bits_per_pixel = 1,
alignment = 2
))
payload.extend(DirectiveBuilder.form_feed())
payload.extend(DirectiveBuilder.status())
payload.extend(DirectiveBuilder.end())
return create_payload(payload, is_print = True)
def command_casette(
media_type: int
):
"""
Creates a casette directive.
"""
payload = bytearray()
payload.extend(DirectiveBuilder.start())
payload.extend(DirectiveBuilder.media_type(media_type))
payload.extend(DirectiveBuilder.end())
return create_payload(payload)