added functionality for a second fan on pin 10

This commit is contained in:
Lucca Pirovano
2025-11-15 19:12:31 -05:00
parent 81f4a6d1c0
commit 9646403938
3 changed files with 156 additions and 111 deletions
+35 -42
View File
@@ -1,15 +1,10 @@
#!/usr/bin/env python3
"""
Flask web UI for ultra-responsive fan control.
Works with the Arduino sketch above (pin 11 PWM).
Flask Dual Fan Control Pin 11 (A), Pin 10 (B)
"""
from flask import Flask, render_template, jsonify, request
import serial
import serial.tools.list_ports
import threading
import time
import sys
from flask import Flask, render_template, jsonify, Response
import serial, serial.tools.list_ports, threading, time, sys
from collections import deque
app = Flask(__name__)
@@ -24,58 +19,60 @@ def find_arduino():
PORT = find_arduino()
BAUD = 115200
# Global serial
ser = None
serial_lock = threading.Lock()
# -------------------------------------------------
# Serial init with retry on "busy"
# -------------------------------------------------
def init_serial():
global ser
for attempt in range(6):
try:
ser = serial.Serial(PORT, BAUD, timeout=0)
time.sleep(2) # Arduino reset
time.sleep(2)
print(f"[OK] Connected to {PORT}")
return
except serial.SerialException as e:
if 'Device or resource busy' in str(e):
print(f"[BUSY] Port busy, retry {attempt+1}/6 in 1s...")
print(f"[BUSY] Retry {attempt+1}/6...")
time.sleep(1)
else:
print(f"[ERROR] Serial: {e}")
print(f"[ERROR] {e}")
sys.exit(1)
sys.exit("[FATAL] Could not open serial port after retries")
sys.exit("[FATAL] Port open failed")
init_serial()
# -------------------------------------------------
# Safe send
# Send command: 'a5', 'bf', 'as', etc.
# -------------------------------------------------
def send_command(cmd: str):
def send_command(fan: str, cmd: str):
"""fan = 'a' or 'b', cmd = '0'-'9','f','s'"""
if fan not in ('a', 'b') or cmd not in '0123456789fs':
return
with serial_lock:
if ser and ser.is_open:
try:
ser.write(fan.encode())
time.sleep(0.001)
ser.write(cmd.encode())
ser.flush()
except Exception as e:
print(f"[WRITE FAIL] {e}")
# -------------------------------------------------
# Background reader pushes feedback to browser via SSE
# Background reader SSE
# -------------------------------------------------
from collections import deque
feedback_buffer = deque(maxlen=50) # last 50 messages
feedback_buffer = deque(maxlen=50)
def serial_reader():
buffer = ""
while True:
if ser and ser.in_waiting:
try:
line = ser.readline().decode(errors='ignore').strip()
if line:
feedback_buffer.append(line)
buffer += ser.read(ser.in_waiting).decode(errors='ignore')
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line:
feedback_buffer.append(line)
except:
pass
time.sleep(0.005)
@@ -89,18 +86,15 @@ threading.Thread(target=serial_reader, daemon=True).start()
def index():
return render_template('index.html')
@app.route('/cmd/<key>')
def command(key):
if len(key) == 1 and key.lower() in '0123456789fs':
send_command(key)
return jsonify(status="ok", key=key)
@app.route('/cmd/<fan>/<key>')
def command(fan, key):
"""fan = a|b, key = 0-9,f,s"""
if fan in ('a', 'b') and len(key) == 1 and key.lower() in '0123456789fs':
send_command(fan, key.lower())
return jsonify(status="ok", fan=fan.upper(), key=key.upper())
return jsonify(status="invalid"), 400
# -------------------------------------------------
# Server-Sent Events (SSE) live Arduino feedback
# -------------------------------------------------
from flask import Response
# SSE Stream
@app.route('/stream')
def stream():
def event_stream():
@@ -113,11 +107,10 @@ def stream():
time.sleep(0.05)
return Response(event_stream(), mimetype="text/event-stream")
# -------------------------------------------------
# Run
# -------------------------------------------------
if __name__ == '__main__':
# Accessible from phone/tablet on same WiFi
print(f"\nWeb UI: http://YOUR_IP:5000")
print(" (Find YOUR_IP with: ip addr show | grep inet)")
ip = [l.split()[1] for l in __import__('subprocess')
.check_output(["ip", "route"]).decode().splitlines()
if 'default' in l][0].split('/')[0]
print(f"\nWeb UI: http://{ip}:5000")
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)