188 lines
6.6 KiB
Python

from enum import IntEnum
from os import name, listdir
import serial
from ipkvm.app import logger
from ipkvm.util.profiles import profile_manager
import threading
from queue import Queue
import json
import networkx as nx
import time
from collections.abc import Mapping
# from .post_codes import POSTTextDef, POSTHex7Segment
from .scancodes import ASCII2JS, HIDKeyCode
from ipkvm.util.types import MultiDiGraph, OverclockingDict
class GPIO(IntEnum):
LOW = 0
HIGH = 1
class Esp32Serial(threading.Thread):
def __init__(self):
super().__init__()
# self.post_code_queue: Queue[str] = Queue()
self.mkb_queue: Queue[Mapping[str, int | str | Mapping[str, int]]] = Queue()
# self._power_status = False
self._usb_status = False
self._last_usb_status = False
# self._last_post_code = "00"
# self.notify_code: str
# self.active_notification_request = threading.Event()
# self.post_code_notify = threading.Event()
self._key_delay = 0.2
self.start()
def run(self):
profile_manager.restart_serial.wait()
while True:
profile_manager.restart_serial.clear()
self.do_work()
def do_work(self):
device = self.get_device()
with device as ser:
while not profile_manager.restart_serial.is_set():
while not self.mkb_queue.empty():
msg = self.mkb_queue.get()
ser.write(json.dumps(msg).encode())
while ser.in_waiting > 0:
try:
line = json.loads(ser.readline().decode().strip())
# self._power_status = line["pwr"]
self._usb_status = line["usb"]
if self._usb_status != self._last_usb_status:
if self._usb_status:
logger.info("Client machine cleared POST.")
else:
logger.info("Client machine powered off.")
self._last_usb_status = self._usb_status
# elif "post_code" in line:
# self._last_post_code = POSTHex7Segment[line["post_code"]]
#
# ui.emit("update_seven_segment", POSTHex7Segment[line["post_code"]])
# ui.emit("update_post_log", f"{POSTTextDef[line["post_code"]]}: {POSTHex7Segment[line["post_code"]]}")
#
# if self.active_notification_request.is_set():
# if self._last_post_code == self.notify_code:
# self.post_code_notify.set()
# self.active_notification_request.clear()
#
# print(f"{POSTTextDef[line["post_code"]]}: {POSTHex7Segment[line["post_code"]]}")
except json.JSONDecodeError:
continue
except UnicodeDecodeError:
continue
time.sleep(0.01)
def get_device(self):
if name == "posix":
assert isinstance(profile_manager.profile["server"], dict)
return serial.Serial(f"/dev/serial/by-id/{profile_manager.profile["server"]["esp32_serial"]}", 115200,
bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE)
else:
raise RuntimeError("Your OS is unsupported at this time!")
def ez_press_key(self, key: str):
msg = msg = {
"key_down": HIDKeyCode[key].value
}
self.mkb_queue.put(msg)
msg = msg = {
"key_up": HIDKeyCode[key].value
}
self.mkb_queue.put(msg)
def get_device_list(self):
if name == "posix":
serial_devices = listdir("/dev/serial/by-id/")
else:
serial_devices = []
return serial_devices
def _traverse_path(self, graph: MultiDiGraph, node_a: str, node_b: str):
path = nx.shortest_path(graph, node_a, node_b)
path_edges = list(zip(path[:-1], path[1:]))
edge_path= [(u, v, graph[u][v]) for u, v in path_edges]
for step in edge_path:
if "initial_keypath" in step[2][0] and step[2][0]["visited"] == "false":
keys = step[2][0]["initial_keypath"].split(',')
# Type checker is simply wrong! This is the correct usage!
graph.edges[step[0], step[1], 0]["visited"] = "true" # type: ignore
else:
keys = step[2][0]["keypath"].split(',')
for key in keys:
time.sleep(self._key_delay)
self.ez_press_key(key)
def _apply_setting(self, graph: MultiDiGraph, setting_node: str, new_value: str):
if graph.nodes[setting_node]["option_type"] == "list":
possible_values = graph.nodes[setting_node]["options"].split(',')
key = graph.nodes[setting_node]["traversal_key"]
time.sleep(self._key_delay)
self.ez_press_key("Enter")
for value in possible_values:
time.sleep(self._key_delay)
if value == new_value:
self.ez_press_key("Enter")
break
else:
self.ez_press_key(key)
elif graph.nodes[setting_node]["option_type"] == "field":
for key in new_value:
time.sleep(self._key_delay)
self.ez_press_key(ASCII2JS[key])
time.sleep(self._key_delay)
self.ez_press_key("Enter")
logger.info(f"Changed {setting_node} from {graph.nodes[setting_node]["value"]} to {new_value}!")
graph.nodes[setting_node]["value"] = new_value
def apply_all_settings(self, settings: OverclockingDict, graph: MultiDiGraph, current_node: str):
for category in settings:
for setting_node in settings[category]:
if graph.nodes[setting_node]["value"] != settings[category][setting_node]:
self._traverse_path(graph, current_node, setting_node)
current_node = setting_node
self._apply_setting(graph, setting_node, settings[category][setting_node])
return current_node
# @property
# def power_status(self):
# return self._power_status
@property
def usb_status(self):
return self._usb_status
# @property
# def last_post_code(self):
# return self._last_post_code