"""Power monitor service.

Reads two-character status lines from a serial device. The first character
reports the inverter and the second the grid; the character "0" means the
source is online. Grid transitions are pushed to ntfy and to a Home Assistant
binary sensor. When the inverter has been offline for ``shutdown_timer``
seconds the host is shut down.

Configuration comes from environment variables (optionally loaded from a
``.env`` file): ``logging_level``, ``NTFY_Token``, ``device_name``,
``shutdown_timer`` and ``HA_Token``.
"""
import logging
import os
import signal
import time

import requests
import serial
from dotenv import load_dotenv

load_dotenv()

##########################################
# Logging
# Unknown or missing levels fall back to INFO instead of leaving ``loglevel``
# undefined, which previously crashed start-up with a NameError/KeyError.
_LOG_LEVELS = {"DEBUG": logging.DEBUG, "INFO": logging.INFO}
loglevel = _LOG_LEVELS.get(
    os.environ.get('logging_level', 'INFO').upper(),
    logging.INFO,
)

logger = logging.getLogger('Power Monitor')
logger.setLevel(loglevel)

# Console handler
ch = logging.StreamHandler()
ch.setLevel(loglevel)

# Formatter (default_msec_format = None drops the milliseconds suffix)
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s')
formatter.default_msec_format = None
ch.setFormatter(formatter)

# Avoid attaching a second handler if this module is imported more than once.
if not logger.handlers:
    logger.addHandler(ch)

##########################################
# Required configuration: a missing variable raises KeyError at start-up.
ntfy_token = os.environ['NTFY_Token']
device_name = os.environ['device_name']
shutdown_timer = int(os.environ['shutdown_timer'])
ha_token = os.environ['HA_Token']

NTFY_URL = "https://ntfy.fieryeagle.org/Internet-Alerts"
HA_GRID_SENSOR_URL = "https://home.fieryeagle.org/api/states/binary_sensor.grid_power"
SERIAL_BAUD_RATE = 115200
VALID_STATUS_LINES = ("00", "01", "10", "11")

####################################################
# Globals
running = True
inverter_online = True
last_online = time.monotonic()
grid_online = True


def handle_shutdown(signum, frame):
    """Signal handler: ask the main loop to stop after its current iteration."""
    global running
    logger.info("Received shutdown signal, exiting...")
    running = False


signal.signal(signal.SIGTERM, handle_shutdown)
signal.signal(signal.SIGINT, handle_shutdown)


def send_message(title: str, text: str) -> None:
    """Send a notification to the ntfy topic.

    Delivery is best-effort: any failure, including an HTTP error status,
    is logged and swallowed so that monitoring keeps running.

    Args:
        title: Value of the ``Title`` header.
        text: Message body, sent UTF-8 encoded.
    """
    try:
        response = requests.post(
            NTFY_URL,
            data=text.encode("utf-8"),
            headers={
                "Title": title,
                "Authorization": f"Bearer {ntfy_token}",
            },
            timeout=5,
        )
        # Without this a 4xx/5xx reply (e.g. a rejected token) looked like success.
        response.raise_for_status()
    except Exception as e:  # deliberate best-effort boundary
        logger.error("Failed to send notification: %s", e)


def check_inverter(status: bool) -> bool:
    """Record the inverter state and return it.

    While the inverter is online ``last_online`` is refreshed on every call;
    the transition to offline is logged once.

    Args:
        status: True when the inverter is reported online.

    Returns:
        The same value as ``status``.
    """
    global inverter_online
    global last_online

    if status:
        logger.debug("Inverter Online")
        inverter_online = True
        last_online = time.monotonic()
        return True

    if inverter_online:
        logger.info("Running on UPS reserve")
        inverter_online = False
    return False


def _set_grid_sensor(state: str) -> None:
    """Post ``state`` ("on"/"off") to the Home Assistant grid sensor, best-effort."""
    try:
        response = requests.post(
            HA_GRID_SENSOR_URL,
            headers={
                "Authorization": f"Bearer {ha_token}",
                "Content-Type": "application/json",
            },
            json={"state": state},
            timeout=5,
        )
        # Surface HTTP error statuses instead of silently ignoring them.
        response.raise_for_status()
    except Exception as e:  # deliberate best-effort boundary
        logger.error("Could not update HA sensor: %s", e)


def check_grid(status: bool) -> bool:
    """Record the grid state, notifying only on transitions, and return it.

    On a change of state a log entry is written, an ntfy notification is sent
    and the Home Assistant sensor is updated.

    Args:
        status: True when the grid is reported online.

    Returns:
        The same value as ``status``.
    """
    global grid_online

    if status:
        logger.debug("Grid Online")
        if not grid_online:
            logger.info("Grid restored")
            send_message("Grid back online", "Systems restored")
            _set_grid_sensor("on")
            grid_online = True
        return True

    if grid_online:
        logger.info("Running on Inverter reserve")
        send_message("Grid power offline", "Running on Inverter reserve")
        _set_grid_sensor("off")
        grid_online = False
    return False


def main():
    """Run the monitoring loop until a stop signal, serial failure or shutdown.

    Each iteration reads one line from the serial port (1 s timeout), updates
    the inverter and grid state from valid lines, and then checks how long the
    inverter has been offline. Once that reaches ``shutdown_timer`` seconds a
    notification is sent and ``shutdown now`` is executed.
    """
    shutdown_triggered = False
    serial_failed = False

    try:
        with serial.Serial(device_name, SERIAL_BAUD_RATE, timeout=1) as ser:
            last_logged = -1

            while running:
                line = ser.readline().decode(errors="ignore").strip()

                if not line:
                    logger.warning("No serial data")
                elif line not in VALID_STATUS_LINES:
                    logger.warning("Invalid serial data: %s", line)
                else:
                    # "0" means online for both positions.
                    check_inverter(line[0] == "0")
                    check_grid(line[1] == "0")

                # The timer is evaluated on every iteration, including ones
                # with missing or invalid data: if the feed goes silent while
                # the inverter is already known to be offline, the countdown
                # must still reach the shutdown threshold.
                if inverter_online:
                    continue

                sec = int(time.monotonic() - last_online)
                if sec != last_logged:
                    logger.info("Offline for %ds", sec)
                    last_logged = sec

                if sec >= shutdown_timer:
                    shutdown_triggered = True
                    break

    except serial.SerialException as e:
        logger.error("Serial error: %s", e)
        serial_failed = True

    if shutdown_triggered:
        logger.info("Inverter offline for %s seconds", shutdown_timer)
        send_message("Inverter offline", "Hydrogen running on reserve power, shutting down")
        os.system("shutdown now")
    elif serial_failed:
        # Previously this path was reported as a clean exit.
        logger.error("Exiting because the serial connection failed")
    else:
        logger.info("Exited cleanly (service stop)")


if __name__ == "__main__":
    main()