"""Watch an inverter status feed on a serial port and power off on outage.

Each line read from the serial device is treated as a status report: the
line ``0`` means the inverter is online and resets the offline counter; any
other read (including a read that timed out and produced an empty line)
increments it. Once the counter has reached ``shutdown_timer``, the next
non-``0`` read triggers an ntfy notification followed by ``shutdown now``.

Configuration comes from the environment (optionally loaded from a ``.env``
file): ``logging_level``, ``NTFY_Token``, ``device_name``, ``shutdown_timer``.
"""

import logging
import os
import signal

import requests
import serial
from dotenv import load_dotenv

load_dotenv()

##########################################
# Logging

# Accepted values for the ``logging_level`` environment variable.
_LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}

# An unset or unrecognised level must not crash the monitor at import time;
# fall back to INFO and report the problem once the handler is attached.
_requested_level = os.environ.get('logging_level', 'INFO')
loglevel = _LOG_LEVELS.get(_requested_level.strip().upper())
_level_recognised = loglevel is not None
if not _level_recognised:
    loglevel = logging.INFO

logger = logging.getLogger('Power Monitor')
logger.setLevel(loglevel)

# Console handler
ch = logging.StreamHandler()
ch.setLevel(loglevel)

# Formatter (timestamps without the milliseconds suffix)
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s')
formatter.default_msec_format = None
ch.setFormatter(formatter)

# Guard against attaching a second handler if this module is imported twice.
if not logger.handlers:
    logger.addHandler(ch)

if not _level_recognised:
    logger.warning(
        "Unknown logging_level %r, defaulting to INFO", _requested_level
    )
##########################################

# Required settings: a missing key raises KeyError at start-up.
ntfy_token = os.environ['NTFY_Token']
device_name = os.environ['device_name']
shutdown_timer = int(os.environ['shutdown_timer'])

# Cleared by the signal handler to make the monitoring loop exit.
running = True


def handle_shutdown(signum, frame):
    """Signal handler: ask the monitoring loop to stop.

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).
    """
    global running
    logger.info("Received shutdown signal, exiting...")
    running = False


signal.signal(signal.SIGTERM, handle_shutdown)
signal.signal(signal.SIGINT, handle_shutdown)


def _wait_for_outage(ser):
    """Read status lines until the offline threshold is hit or we are stopped.

    Args:
        ser: Open serial port object providing ``readline()``.

    Returns:
        True when the offline counter had reached ``shutdown_timer`` and
        another non-"0" line was read; False when ``running`` was cleared.
    """
    offline_ticks = 0
    while running:
        line = ser.readline().decode(errors="ignore").strip()
        if line == "0":
            logger.debug("Inverter Online")
            offline_ticks = 0
        elif offline_ticks >= shutdown_timer:
            return True
        else:
            # NOTE: this counts reads, not wall-clock time. It is logged as
            # seconds because the port is opened with a 1 s read timeout.
            offline_ticks += 1
            logger.info(
                f"Running on UPS reserve, offline for {offline_ticks}s"
            )
    return False


def _send_outage_notification():
    """Post the outage alert to ntfy; failures are logged, never raised."""
    try:
        response = requests.post(
            "https://ntfy.fieryeagle.org/Internet-Alerts",
            data="Inverter offline, shutting down".encode("utf-8"),
            headers={
                "Title": "Hydrogen running on reserve power",
                "Authorization": f"Bearer {ntfy_token}",
            },
            timeout=5,
        )
        # Surface HTTP-level rejections (bad token, server error) in the log.
        response.raise_for_status()
    except Exception as e:
        # Deliberately broad: nothing here may prevent the shutdown below.
        logger.error(f"Failed to send notification: {e}")


def _shutdown_host():
    """Run ``shutdown now`` and log if the command reports failure."""
    status = os.system("shutdown now")
    if status != 0:
        logger.error(f"Shutdown command failed with status {status}")


def main():
    """Monitor the inverter and shut the host down after a sustained outage.

    Opens ``device_name`` at 115200 baud with a 1 second read timeout and
    watches it until either the offline threshold is exceeded or a
    SIGTERM/SIGINT clears ``running``.
    """
    with serial.Serial(device_name, 115200, timeout=1) as ser:
        shutdown_triggered = _wait_for_outage(ser)

    if not shutdown_triggered:
        logger.info("Exited cleanly (service stop)")
        return

    logger.info(f"Inverter offline for {shutdown_timer} seconds")
    _send_outage_notification()
    _shutdown_host()


if __name__ == "__main__":
    main()