"""Serial monitor and fan-speed control laws for an Arduino-driven fan setup.

When run as a script this reads JSON status lines from the serial port and
prints them.  Two fan-speed control laws are provided (`map_fan_range` and
`fan_PID`); the lines that would feed their output back to the Arduino are
currently commented out in `main`.
"""
import json
import time

try:
    from rich import print  # drop-in pretty printer for the builtin
except ImportError:
    # Pretty-printing is cosmetic; a missing `rich` must not stop the
    # controller, so fall back to the builtin print.
    pass

SERIAL_DEVICE = '/dev/ttyS0'
SERIAL_BAUDRATE = 115200
# Pause between reconnect attempts so a missing or broken port does not turn
# the retry loop into a busy loop that floods the console.
RECONNECT_DELAY_S = 1


def map_fan_range(air_temp, water_temp, max_water_temp=40, min_fan_spd=90,
                  max_fan_spd=250, temp_offset=2.4, temp_span=4):
    """Map how far the water is above the air temperature onto a fan speed.

    The speed ramps linearly from `min_fan_spd` (water at
    ``air_temp + temp_offset``) to `max_fan_spd` (water `temp_span` degrees
    above that) and is clamped to that range.

    Args:
        air_temp: Measured air temperature.
        water_temp: Measured water temperature.
        max_water_temp: At or above this water temperature the fans are
            forced to `max_fan_spd`.
        min_fan_spd: Lowest speed ever returned.
        max_fan_spd: Highest speed ever returned.
        temp_offset: Water-over-air difference at which the ramp starts.
        temp_span: Width of the ramp, in degrees.

    Returns:
        The fan speed, an integer between `min_fan_spd` and `max_fan_spd`.

    Side effects:
        Prints the computed temperature difference (debug output).
    """
    if water_temp >= max_water_temp:
        return max_fan_spd

    temp_diff = water_temp - (air_temp + temp_offset)
    print(temp_diff)

    speed = temp_diff * (max_fan_spd - min_fan_spd) / temp_span + min_fan_spd
    return round(min(max(speed, min_fan_spd), max_fan_spd))


class fan_PID:
    """PID controller that drives the water temperature towards the air
    temperature plus a fixed offset by adjusting the fan speed.

    Attributes are plain and may be tuned after construction.
    """

    def __init__(self):
        # Controller gains.
        self.kp = 80.0
        self.ki = 0.08
        self.kd = 8.0
        # Controller state.
        self.integral = 0
        self.last_error = 0
        # Monotonic clock: only intervals matter, and wall-clock adjustments
        # (e.g. an NTP step) must not corrupt them.
        self.last_time = time.monotonic()
        # Limits and targets.
        self.max_water_temp = 40
        self.min_fan_spd = 90
        self.max_fan_spd = 250
        self.temp_tgt_offset = 3
        self.fan_spd_offset = 10  # currently unused by update()
        # Last speed returned by update(); used for integral anti-windup.
        self.last_output = self.min_fan_spd

    def update(self, air_temp, water_temp):
        """Compute the next fan speed from the current temperatures.

        Args:
            air_temp: Measured air temperature.
            water_temp: Measured water temperature.

        Returns:
            The fan speed, an integer between `min_fan_spd` and
            `max_fan_spd`.  Always `max_fan_spd` when `water_temp` is at or
            above `max_water_temp`.

        Side effects:
            Updates the controller state and prints the integral,
            derivative and error terms (debug output).
        """
        if water_temp >= self.max_water_temp:
            self.last_output = self.max_fan_spd
            return self.last_output

        target = min(air_temp + self.temp_tgt_offset, self.max_water_temp)
        error = water_temp - target

        # Measure the time since the previous update and remember this one,
        # so the integral and derivative use the real sample interval.
        now = time.monotonic()
        delta = now - self.last_time
        self.last_time = now

        # Anti-windup: do not accumulate while the fans are already
        # saturated at full speed.
        if self.last_output != self.max_fan_spd:
            self.integral += error * delta
        # Drop the accumulated integral once the water is less than one
        # degree above air_temp + temp_tgt_offset.
        if air_temp + self.temp_tgt_offset > water_temp - 1:
            self.integral = 0

        # Two calls within the clock resolution give delta == 0; treat the
        # derivative as zero rather than dividing by zero.
        derivative = (error - self.last_error) / delta if delta > 0 else 0.0
        self.last_error = error

        output = (self.kp * error
                  + self.ki * self.integral
                  + self.kd * derivative)
        print(self.integral, derivative, error)

        clamped = min(max(output + self.min_fan_spd, self.min_fan_spd),
                      self.max_fan_spd)
        self.last_output = round(clamped)
        return self.last_output


# Keys are heavily abbreviated so the message fits in the Arduino Uno's
# 64-byte serial receive buffer.
# The Arduino crashes if the fan speed ("f") goes to 70 or below; the cause
# is unknown.
# In addition, running the fans at speeds over 250 breaks the tachometer.
control_data = {
    "f": 90,
    "p": 255,
    "lp": 300,
    "hp": 500,
}

PID = fan_PID()


def main():
    """Read JSON status lines from the serial port forever and print them.

    Any error (port cannot be opened, read failure, malformed JSON) is
    printed, the port is closed, and the connection is re-established after
    `RECONNECT_DELAY_S` seconds.  Stop with Ctrl-C.

    Raises:
        ImportError: If pyserial is not installed.
    """
    # Imported here rather than at module level so the control laws above
    # can be imported and tested on machines without pyserial.
    import serial

    while True:
        try:
            # The context manager closes the port before every reconnect
            # instead of leaking the handle.
            with serial.Serial(SERIAL_DEVICE,
                               baudrate=SERIAL_BAUDRATE) as serial_port:
                while True:
                    # msg = json.dumps(control_data)
                    # print(msg.encode('ascii'))
                    # serial_port.write(msg.encode('ascii'))
                    monitor_stats = json.loads(serial_port.readline())
                    print(monitor_stats)
                    time.sleep(1)
                    # control_data['f'] = map_fan_range(monitor_stats['temps']['air'], monitor_stats['temps']['water'])
                    # control_data['f'] = PID.update(monitor_stats['temps']['air'], monitor_stats['temps']['water'])
        except Exception as e:
            # Best-effort monitor: report the problem and reconnect.
            print(e)
        time.sleep(RECONNECT_DELAY_S)


if __name__ == "__main__":
    main()