# MIT License # # Copyright (c) 2024 ysfchn / Yusuf Cihan # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. from enum import Enum from io import SEEK_END, BytesIO from typing import Literal, Sequence, Tuple import math class DirectiveCommand(Enum): """ Directives that accepted by the printer. """ START = "s" MEDIA_TYPE = "M" PRINT_DENSITY = "C" # Unused PRINT_DATA = "D" FORM_FEED = "E" STATUS = "A" END = "Q" def to_bytes(self) -> bytes: # 27 = ASCII escape sequence return bytes((27, ord(self.value), )) class Result(Enum): # Printing has been completed (supposedly). This may not always mean that a # label has been printed out from the printer. See FAILED_NO_CASETTE status # below for details. SUCCESS = 0 # PRINTING (or SUCCESS) is 1 # Print failed due to some unknown reason. FAILED = 2 # Printing has been completed but battery is low. SUCCESS_LOW_BATTERY = 3 # Print failed due to being cancelled. FAILED_CANCEL = 4 # FAILED (with a different status value) is 5 # Print failed due to the low batteries. FAILED_LOW_BATTERY = 6 # Failed due to casette not inserted. On my tests, printer never sends this # status, instead it will spin its gear even casette is not inserted, and # will send a SUCCESS status instead, so there is no way to check if # casette is actually inserted. FAILED_NO_CASETTE = 7 @classmethod def from_bytes(cls, data: Sequence[int]): if data[0] != 27: raise ValueError("Not a valid result value; 1st byte must be 0x1b (27)") if chr(data[1]) != "R": raise ValueError("Not a valid result value; 2nd byte must be 0x52 (82)") # There is a value 5, which also means printing has completed but has # a different status value, so we return FAILED since that's what it means. if data[2] == 5: return cls(Result.FAILED.value) # There is a value 1, which also means printing has completed but has # a different status value, so we return SUCCESS since that's what it means. if data[2] == 1: return cls(Result.SUCCESS.value) return cls(data[2]) class Canvas: """ An implementation of 1-bit monochrome horizontal images, sequentially ordered and stored in big-endian for each pixel. 1 byte equals 8 bits, and each bit specifies if pixel should be filled (= 1) or not (= 0). """ __slots__ = ("buffer", ) # Each line takes 4 bytes, equals to 32 pixels (1 byte = 8 bits). BYTES_PER_LINE = 4 # Maximum count of bytes that the image can extend into the fixed direction. MAX_LENGTH = 1024 def __init__(self) -> None: self.buffer = BytesIO() def get_pixel(self, x: int, y: int) -> bool: """ Gets the pixel in given coordinates. Returns True if pixel is filled (= black), otherwise False. """ self._raise_if_out_bounds(x, y) # Get the byte containing the pixel value of given coordinates. x_offset = x * Canvas.BYTES_PER_LINE y_offset = Canvas.BYTES_PER_LINE - 1 - math.floor(y / 8) self.buffer.seek(x_offset + y_offset) # Check if there is a bit value in given line. value = (self.buffer.read(1) or b"\x00")[0] is_black = bool(value & (1 << (7 - (y % 8)))) return is_black def set_pixel(self, x: int, y: int, color: Literal[True, False, 0, 1]) -> None: """ Sets a pixel to given coordinates. Setting color to True will paint a black color. """ self._raise_if_out_bounds(x, y) # Get the byte containing the pixel value of given coordinates. x_offset = x * Canvas.BYTES_PER_LINE y_offset = Canvas.BYTES_PER_LINE - 1 - math.floor(y / 8) self.buffer.seek(x_offset + y_offset) # Get the one of four slices in line in the given coordinates. Add the bit in # given location if color is black, otherwise exclude the bit to make it white. curr = self.buffer.read(1) value = (curr or b"\x00")[0] if color: value = value | (1 << (7 - (y % 8))) else: value = value & ~(1 << (7 - (y % 8))) # Change the current byte with modified one, if not exists, append a new one. self.buffer.seek(0 if not curr else -1, 1) self.buffer.write(bytes([value])) def stretch(self, factor: int = 2) -> "Canvas": """ Stretches image to the non-fixed direction in N times, and returns a new Canvas containing the stretched image. Factor 1 results in the same image. """ if factor < 1: raise ValueError("Stretch factor must be at least 1!") canvas = Canvas() for x in range(self.width): for y in range(self.height): pixel = self.get_pixel(x, y) start_x = (x * factor) for i in range(factor): canvas.set_pixel(start_x + i, y, pixel) return canvas def _get_byte_size(self): self.buffer.seek(0, SEEK_END) return self.buffer.tell() def _get_unfixed_pixels(self): return math.ceil(self._get_byte_size() / self.BYTES_PER_LINE) def _get_fixed_pixels(self): return self.BYTES_PER_LINE * 8 def _raise_if_out_bounds(self, x: int, y: int): if (x < 0) or (y < 0): raise ValueError("Canvas positions can't be negative.") bits = Canvas.BYTES_PER_LINE * 8 maxbits = Canvas.MAX_LENGTH * 8 if y >= bits: raise ValueError(f"Canvas can't be or exceed {bits} pixels in height.") elif x >= maxbits: raise ValueError(f"Canvas can't be or exceed {maxbits} pixels in width.") @property def height(self) -> int: """ Gets the height of the image. """ return self._get_fixed_pixels() @property def width(self) -> int: """ Gets the width of the image. """ return self._get_unfixed_pixels() @property def size(self) -> Tuple[int, int]: """ Gets the width and height of the image in tuple. """ return (self.width, self.height, ) def get_image(self) -> bytes: """ Gets the created image with added blank padding. """ self.buffer.seek(0) image = self.buffer.read() return image + (b"\x00" * (self.buffer.tell() % self.BYTES_PER_LINE)) def empty(self): """ Makes all pixels in the canvas in blank (= white). Canvas size won't be changed. """ size = self._get_byte_size() self.buffer.seek(0) self.buffer.truncate(0) self.buffer.seek(size) self.buffer.write(b"\x00") def copy(self) -> "Canvas": """ Creates a copy of this canvas. """ canvas = Canvas() self.buffer.seek(0) canvas.buffer.write(self.buffer.read()) return canvas def clear(self): """ Clears the canvas. Canvas size will be changed to 0. """ self.buffer.seek(0) self.buffer.truncate(0) def revert(self): """ Returns a new Canvas with the image is flipped in color; all unfilled pixels are filled, and all filled pixels are unfilled. """ self.buffer.seek(0) copied = Canvas() copied.buffer.seek(0) while (byt := self.buffer.read(1)): copied.buffer.write(bytes([byt[0] ^ 0xFF])) return copied def fill(self, to_left: int, to_right: int) -> "Canvas": """ Returns a new Canvas with blank (= white) spacing added to both sides. """ canv = Canvas() canv.buffer.seek(0) canv.buffer.write(bytes(to_left * self.BYTES_PER_LINE)) canv.buffer.write(self.get_image()) canv.buffer.seek(0, SEEK_END) canv.buffer.write(bytes(to_right * self.BYTES_PER_LINE)) return canv def pad(self, until: int) -> "Canvas": """ Adds blank (= white) spacing to both sides of the image, until the image reaches to the given width in pixels. Returns a new Canvas containing the modified image. If image is already longer than given width, returns the copy of the same image. """ if self.width >= until: return self.copy() side = math.ceil((until - self.width) / 2) return self.fill(side, side) def text(self, in_quad: bool = True, blank_char: int = 0x20, frame: bool = True) -> str: """ Converts the image to a string of unicode block symbols, so it can be printed out to the terminal. If `in_quad` is True, quarter block characters will be used, so the image will be 2 times smaller, making each 4 pixel to take up a single character. Empty pixels will be represented as `blank_char` (default is SPACE). """ lines = [] frame_length = 0 if frame: frame_length = self.width if not in_quad else math.ceil(self.width / 2) if frame_length: lines.append(chr(0x250C) + (frame_length * chr(0x2500)) + chr(0x2510)) if in_quad: for h in range(0, self.height, 2): line = "" for w in range(0, self.width, 2): corners = \ self.get_pixel(w, h), \ self.get_pixel(w + 1, h), \ self.get_pixel(w, h + 1), \ self.get_pixel(w + 1, h + 1) corners = sum([1 << x if corners[x] else 0 for x in range(4)]) line += chr(quartet_to_char(corners) or blank_char) if frame: line = chr(0x2502) + line + chr(0x2502) lines.append(line) else: lines = [] for h in range(0, self.height): line = "" for w in range(0, self.width): line += chr(0x2588 if self.get_pixel(w, h) else blank_char) if frame: line = chr(0x2502) + line + chr(0x2502) lines.append(line) if frame_length: lines.append(chr(0x2514) + (frame_length * chr(0x2500)) + chr(0x2518)) return "\n".join(lines) def __len__(self): return self._get_byte_size() def __str__(self) -> str: return self.text(in_quad = True) def __eq__(self, value: object, /) -> bool: if not isinstance(value, Canvas): return False return value.get_image() == self.get_image() def __repr__(self) -> str: w, h = self.size return f"<{self.__class__.__name__} size={w}x{h} length={self.__len__()}>" def quartet_to_char(char: int): """ Gets a unicode block symbol code for a 4 bit value (0x0 to 0xF), each bit representing a corner of the 4 pixels, top left (1 << 0), top right (1 << 1), bottom left (1 << 2) and bottom right (1 << 3) respectively. """ if (char > 0xF) or (char < 0x0): raise ValueError("Invalid character.") # https://en.wikipedia.org/wiki/Block_Elements data = { 0x0: 0x0000, # Blank 0x1: 0x2598, # 0001 0x2: 0x259D, # 0010 0x3: 0x2580, # 0011 0x4: 0x2596, # 0100 0x5: 0x258C, # 0101 0x6: 0x259E, # 0110 0x7: 0x259B, # 0111 0x8: 0x2597, # 1000 0x9: 0x259A, # 1001 0xA: 0x2590, # 1010 0xB: 0x259C, # 1011 0xC: 0x2584, # 1100 0xD: 0x2599, # 1101 0xE: 0x259F, # 1110 0xF: 0x2588, # 1111 } return data[char] class DirectiveBuilder: """ Builds directives for the printer. """ @staticmethod def start(): # [154, 2, 0, 0] is the "job ID". # Without that, printer won't print anything but a small blank label. # This the only "job ID" that the printer uses in start directive, it is not # related with some sort of queue or anything else, just a constant value. return bytes([*DirectiveCommand.START.to_bytes(), 154, 2, 0, 0]) @staticmethod def media_type(value: int): return bytes([*DirectiveCommand.MEDIA_TYPE.to_bytes(), value]) @staticmethod def form_feed(): return bytes(DirectiveCommand.FORM_FEED.to_bytes()) @staticmethod def status(): return bytes(DirectiveCommand.STATUS.to_bytes()) @staticmethod def end(): return bytes(DirectiveCommand.END.to_bytes()) @staticmethod def print( data: Sequence[int], image_width: int, image_height: int, bits_per_pixel: int, alignment: int ): size = \ image_width.to_bytes(4, "little") + \ image_height.to_bytes(4, "little") return bytes([ *DirectiveCommand.PRINT_DATA.to_bytes(), bits_per_pixel, alignment, *size, *data ]) def create_payload(data: Sequence[int], is_print: bool = False): """ Creates a final payload to be sent to the printer from the input data. Each iteration of this generator will yield the bytes that needs to be sent over the Bluetooth for each GATT write transaction. """ # Longer inputs needs to be splitted. CHUNK_SIZE = 500 # Magic value. MAGIC = [18, 52] # Length of the data in 4 bytes. length = len(data).to_bytes(4, "little") # byte[9] = [255, 240, 18, 52, ...LENGTH{4}, CHECKSUM] header = bytearray([ 255, # Preamble 240, # Flags *MAGIC, *length ]) # For checksum, we get the sum of all bytes, then get the first byte of the sum. checksum = sum(header) & 0xFF header.append(checksum) assert len(header) == 9, "Header must be 9 bytes" # Payloads other than writing doesn't require chunking, # so the input data can be added as-is. if not is_print: header.extend(data) yield header # However, write payloads must be chunked properly. else: # First yield the header. yield header # Split chunk in each 500 bytes. for index, step in enumerate(range(0, len(data), CHUNK_SIZE)): current_chunk = bytearray() chunk = data[step: step + CHUNK_SIZE] # TODO: Not sure what is the purpose of the this, but the original # vendor app skips this index, so we do the same here. chunk_index = index + 1 if index >= 27 else index current_chunk.append(chunk_index) current_chunk.extend(chunk) # If this is the last chunk, append MAGIC to the end. if (step + CHUNK_SIZE) >= len(data): current_chunk.extend(MAGIC) yield current_chunk def command_print( canvas: Canvas ): """ Creates a print directive. """ payload = bytearray() payload.extend(DirectiveBuilder.start()) payload.extend(DirectiveBuilder.print( canvas.get_image(), canvas.width, canvas.height, bits_per_pixel = 1, alignment = 2 )) payload.extend(DirectiveBuilder.form_feed()) payload.extend(DirectiveBuilder.status()) payload.extend(DirectiveBuilder.end()) return create_payload(payload, is_print = True) def command_casette( media_type: int ): """ Creates a casette directive. """ payload = bytearray() payload.extend(DirectiveBuilder.start()) payload.extend(DirectiveBuilder.media_type(media_type)) payload.extend(DirectiveBuilder.end()) return create_payload(payload)