Files
ShutdownESPReader/main.py
2026-04-24 19:19:10 +05:30

169 lines
5.1 KiB
Python

import serial
import requests
import os
from dotenv import load_dotenv
import logging
import signal
import time
load_dotenv()
##########################################
# Logging
# The level name is read from the `logging_level` environment variable and
# matched case-insensitively against the standard level names. A missing or
# unrecognised value falls back to INFO so that `loglevel` is always bound
# (previously anything other than exactly "DEBUG"/"INFO" crashed at startup).
_LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}
loglevel = _LOG_LEVELS.get(
    os.environ.get('logging_level', 'INFO').strip().upper(),
    logging.INFO,
)
logger = logging.getLogger('Power Monitor')
logger.setLevel(loglevel)
# Console handler
ch = logging.StreamHandler()
ch.setLevel(loglevel)
# Formatter: "[timestamp] [LEVEL] message"; default_msec_format is cleared so
# the timestamp carries no millisecond suffix.
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s')
formatter.default_msec_format = None
ch.setFormatter(formatter)
# Attach the handler only if the logger has none yet, so running this setup
# twice does not duplicate every log line.
if not logger.handlers:
    logger.addHandler(ch)
##########################################
def _required_env(name):
    """Return environment variable *name*; raises KeyError when it is unset."""
    return os.environ[name]


# Mandatory settings, read once at start-up. A missing variable aborts the
# script with KeyError; a non-integer shutdown_timer aborts with ValueError.
ntfy_token = _required_env('NTFY_Token')  # bearer token for the ntfy requests
device_name = _required_env('device_name')  # port name handed to serial.Serial
shutdown_timer = int(_required_env('shutdown_timer'))  # offline seconds before shutdown
ha_token = _required_env('HA_Token')  # bearer token for the HA sensor requests
####################################################
# Globals
# Shared mutable state for the monitor loop and its helpers.
#   running         - cleared by the signal handler to stop the main loop
#   inverter_online - last inverter state seen by check_inverter
#   grid_online     - last grid state seen by check_grid
# All three start out optimistic (True) until the first reading arrives.
running = inverter_online = grid_online = True
# Monotonic timestamp of the most recent "inverter online" reading; the
# offline duration is measured against it.
last_online = time.monotonic()
def handle_shutdown(signum, frame):
    """Signal handler that asks the main loop to stop.

    Logs the request and clears the module-level ``running`` flag; the loop
    in ``main`` tests that flag on each iteration. Both arguments are the
    standard signal-handler parameters and are not used.
    """
    global running
    logger.info("Received shutdown signal, exiting...")
    running = False


# SIGTERM and SIGINT are both routed to the same handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, handle_shutdown)
def send_message(title: str, text: str):
    """Publish a notification to the ntfy "Internet-Alerts" topic.

    Delivery is best-effort: any failure (network error, timeout, or an HTTP
    error status such as a rejected token) is logged and swallowed, so a
    notification problem never interrupts the caller.

    Args:
        title: Notification title, sent in the ``Title`` request header.
        text: Notification body, sent UTF-8 encoded as the request payload.
    """
    try:
        response = requests.post(
            "https://ntfy.fieryeagle.org/Internet-Alerts",
            data=text.encode("utf-8"),
            headers={
                "Title": title,
                "Authorization": f"Bearer {ntfy_token}",
            },
            timeout=5,
        )
        # Without this check a 4xx/5xx reply would silently count as a
        # successful send.
        response.raise_for_status()
    except Exception as e:
        # Deliberately broad: this is the best-effort boundary for alerts.
        logger.error(f"Failed to send notification: {e}")
def check_inverter(status: bool):
    """Record the latest inverter reading and log state transitions.

    Updates the module-level ``inverter_online`` flag. While the inverter is
    online, ``last_online`` is refreshed on every call so that the offline
    duration is measured from the last good reading.

    Args:
        status: True when the reading says the inverter is online.

    Returns:
        True if the inverter is online, False otherwise.
    """
    global inverter_online, last_online

    if not status:
        # Log only on the online -> offline edge, not on every reading.
        if inverter_online:
            logger.info("Running on UPS reserve")
        inverter_online = False
        return False

    logger.debug("Inverter Online")
    # Log only on the offline -> online edge.
    if not inverter_online:
        logger.info("Inverter restored")
    inverter_online = True
    last_online = time.monotonic()
    return True
def _post_grid_state(state: str):
    """Best-effort push of the grid sensor state ("on" or "off") to the HA API.

    Posts to the ``binary_sensor.grid_power`` states endpoint. Any failure,
    including an HTTP error status, is logged and swallowed.
    """
    try:
        response = requests.post(
            "https://home.fieryeagle.org/api/states/binary_sensor.grid_power",
            headers={
                "Authorization": f"Bearer {ha_token}",
                "Content-Type": "application/json",
            },
            json={"state": state},
            timeout=5,
        )
        # Surface 4xx/5xx replies (e.g. an expired token) instead of
        # treating them as a successful update.
        response.raise_for_status()
    except Exception as e:
        logger.error(f"Could not update HA sensor: {e}")


def check_grid(status: bool):
    """Record the latest grid reading and announce state transitions.

    On a change of state (and only then) a notification is sent through
    ``send_message`` and the HA grid sensor is updated. The module-level
    ``grid_online`` flag is set to the new state on every call.

    Args:
        status: True when the reading says grid power is present.

    Returns:
        True if the grid is online, False otherwise.
    """
    global grid_online
    if status:
        logger.debug("Grid Online")
        if not grid_online:
            logger.info("Grid restored")
            send_message("Grid back online", "Systems restored")
            _post_grid_state("on")
        grid_online = True
        return True

    if grid_online:
        logger.info("Running on Inverter reserve")
        send_message("Grid power offline", "Running on Inverter reserve")
        _post_grid_state("off")
    grid_online = False
    return False
def main():
    """Watch the serial status stream and shut the host down after a long outage.

    Opens the port named by ``device_name`` at 115200 baud with a one-second
    read timeout and processes one line per iteration until ``running`` is
    cleared. A valid line is exactly two characters, each ``0`` or ``1``:
    the first is the inverter flag and the second the grid flag, with ``0``
    meaning online. Each valid reading is passed to ``check_inverter`` and
    ``check_grid``.

    While the inverter is marked offline, the elapsed time since
    ``last_online`` is logged once per second; when it reaches
    ``shutdown_timer`` seconds a notification is sent and ``shutdown now``
    is executed.
    """
    shutdown_triggered = False
    serial_failed = False
    try:
        with serial.Serial(device_name, 115200, timeout=1) as ser:
            last_logged = -1
            while running:
                line = ser.readline().decode(errors="ignore").strip()
                if line == "":
                    logger.warning("No serial data")
                elif line not in ("00", "01", "10", "11"):
                    logger.warning(f"Invalid serial data: {line}")
                else:
                    check_inverter(line[0] == "0")
                    check_grid(line[1] == "0")

                # Evaluate the timer on every pass, including read timeouts
                # and invalid lines. Otherwise a status source that goes
                # silent during an outage would stall the countdown and the
                # host would never be shut down.
                if not inverter_online:
                    sec = int(time.monotonic() - last_online)
                    # Log at most once per elapsed second.
                    if sec != last_logged:
                        logger.info(f"Offline for {sec}s")
                        last_logged = sec
                    if sec >= shutdown_timer:
                        shutdown_triggered = True
                        break
    except serial.SerialException as e:
        logger.error(f"Serial error: {e}")
        serial_failed = True

    if shutdown_triggered:
        logger.info(f"Inverter offline for {shutdown_timer} seconds")
        send_message("Inverter offline", "Hydrogen running on reserve power, shutting down")
        exit_status = os.system("shutdown now")
        # A non-zero status means the command did not succeed; make it visible.
        if exit_status != 0:
            logger.error(f"Shutdown command failed with status {exit_status}")
    elif serial_failed:
        # Do not report a serial failure as a clean service stop.
        logger.error("Exited after serial error")
    else:
        logger.info("Exited cleanly (service stop)")


if __name__ == "__main__":
    main()