working web stream (laggy)

This commit is contained in:
rawhide kobayashi 2025-02-19 16:05:37 -06:00
parent cd6a06de65
commit 4b61cdd98f
Signed by: rawhide_k
GPG Key ID: E71F77DDBC513FD7
8 changed files with 345 additions and 30 deletions

1
profiles/hong.json Normal file

@ -0,0 +1 @@
{"video_device": {"friendly_name": "WARRKY USB 3.0", "format": "mjpeg", "resolution": "1920x1080", "fps": "60.000"}, "esp32_serial": "usb-Espressif_USB_JTAG_serial_debug_unit_CC:8D:A2:0F:C0:08-if00"}

@ -1,5 +1,98 @@
from os import name, listdir
from flask import Flask
import json
import logging
ui = Flask(__name__)
logger = ui.logger
logger.setLevel(logging.INFO)
from ipkvm import routes
def new_profile():
device_list = video.create_device_list()
print(f"Detected {len(device_list)} video devices on your system.")
print("Please enter the number of your preferred video device.")
for i, device in enumerate(device_list):
print(f"{i + 1}. {device.friendly_name}")
device = int(input("> ")) - 1
if len(device_list[device].video_formats) > 1:
print("Please enter your preferred video input format: ")
for i, format in enumerate(device_list[device].video_formats):
print(f"{i + 1}. {format}")
format = list(device_list[device].video_formats.keys())[int(input("> ")) - 1]
else:
format = next(iter(device_list[device].video_formats))
print(f"Video input format auto-detected as {format}!")
print("Please enter the number of your preferred video resolution.")
for i, resolution in enumerate(device_list[device].video_formats[format]):
print(f"{i + 1}. {resolution}")
resolution = list(device_list[device].video_formats[format].keys())[int(input("> ")) - 1]
print("Please enter the number of your preferred video refresh rate.")
for i, fps in enumerate(device_list[device].video_formats[format][resolution]):
print(f"{i + 1}. {fps}")
fps = str(device_list[device].video_formats[format][resolution][int(input("> ")) - 1])
if name == "posix":
serial_devices = listdir("/dev/serial/by-id/")
else:
serial_devices = []
if len(serial_devices) > 1:
print("Please enter the number of your preferred ESP32 serial device.")
for i, serial_device in enumerate(serial_devices):
print(f"{i + 1}. {serial_device}")
serial_device = serial_devices[int(input("> ")) - 1]
elif len(serial_devices) == 1:
print(f"ESP32 auto-detected as {serial_devices[0]}!")
serial_device = serial_devices[0]
else:
raise RuntimeError("No valid ESP32 devices connected!")
print("Please enter your new profile name.")
profile_name = input("> ")
profile: dict[str, str | dict[str, str]] = {
"video_device": {
"friendly_name": device_list[device].friendly_name,
"format": format,
"resolution": resolution,
"fps": fps
},
"esp32_serial": serial_device
}
with open(f"profiles/{profile_name}.json", 'w') as file:
json.dump(profile, file)
return profile
if len(listdir("profiles")) == 0:
print("No profiles found, entering runtime profile configuration...")
profile = new_profile()
elif len(listdir("profiles")) == 1:
print(f"Only one profile found, autoloading {listdir("profiles")[0]}...")
with open(f"profiles/{listdir("profiles")[0]}", 'r') as file:
profile = json.load(file)
print(profile)
from ipkvm.util import video
from ipkvm import feed
frame_buffer = feed.FrameBuffer()
from ipkvm import routes

80
webui/ipkvm/feed.py Normal file

@ -0,0 +1,80 @@
from os import name
import threading
import av, av.container
import cv2
from ipkvm import profile
from ipkvm.util import video
from ipkvm import logger
import time
class FrameBuffer(threading.Thread):
def __init__(self):
super().__init__()
self.buffer_lock = threading.Lock()
self.cur_frame = None
self.new_frame = threading.Event()
self.start()
def run(self):
self.capture_feed()
def capture_feed(self):
device = self.acquire_device()
print(device)
time.sleep(5)
while True:
# try:
# for frame in device.decode(video=0):
# frame = frame.to_ndarray(format='rgb24')
# ret, self.cur_frame = cv2.imencode('.jpg', frame)
# print(ret)
# cv2.imwrite("test.jpg", frame)
# except av.BlockingIOError:
# pass
success, frame = device.read()
if not success:
break
else:
ret, buffer = cv2.imencode('.jpg', frame)
self.cur_frame = buffer.tobytes()
self.new_frame.set()
# def acquire_device(self):
# device_list = video.create_device_list()
# device_path = ""
# for device in device_list:
# if device.friendly_name == profile["video_device"]["friendly_name"]:
# device_path = device.path
#
# if name == "posix":
# return av.open(device_path, format="video4linux2", container_options={
# "framerate": profile["video_device"]["fps"],
# "video_size": profile["video_device"]["resolution"],
# "input_format": profile["video_device"]["format"]
# })
#
# else:
# raise RuntimeError("We're on something other than Linux, and that's not yet supported!")
def acquire_device(self):
device_list = video.create_device_list()
device_path = ""
for device in device_list:
if device.friendly_name == profile["video_device"]["friendly_name"]:
device_path = device.path
if name == "posix":
device = cv2.VideoCapture(device_path) # Use default webcam (index 0)
else:
raise RuntimeError("We're on something other than Linux, and that's not yet supported!")
device.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
device.set(cv2.CAP_PROP_FRAME_WIDTH, int(profile["video_device"]["resolution"].split('x')[0]))
device.set(cv2.CAP_PROP_FRAME_HEIGHT, int(profile["video_device"]["resolution"].split('x')[1]))
device.set(cv2.CAP_PROP_FPS, float(profile["video_device"]["fps"]))
return device

@ -1,36 +1,14 @@
from ipkvm import ui
from ipkvm import frame_buffer
from flask import Response
import cv2
camera = cv2.VideoCapture(0) # Use default webcam (index 0)
# Get some basic properties
width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = camera.get(cv2.CAP_PROP_FPS)
format = camera.get(cv2.CAP_PROP_FORMAT)
camera.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
camera.set(cv2.CAP_PROP_FPS, 60)
print(f"Current resolution: {width}x{height}")
print(f"Current FPS: {fps}")
print(f"Current format: {format}")
import time
def generate_frames():
while True:
success, frame = camera.read()
if not success:
break
else:
# Encode frame as JPEG
ret, buffer = cv2.imencode('.jpg', frame)
frame = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
frame_buffer.new_frame.wait()
frame_buffer.new_frame.clear()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_buffer.cur_frame + b'\r\n')
@ui.route('/video_feed')
def video_feed():

@ -0,0 +1,22 @@
<!DOCTYPE html>
<html>
<head>
<title>MJPEG Stream Viewer</title>
<style>
.stream-container {
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
.stream-view {
width: 100%;
height: auto;
}
</style>
</head>
<body>
<div class="stream-container">
<img src="{{ url_for('video_feed') }}" class="stream-view" />
</div>
</body>
</html>

@ -0,0 +1 @@

140
webui/ipkvm/util/video.py Normal file

@ -0,0 +1,140 @@
"""os.name provides the type of the operating system!"""
from os import name, listdir
import dataclasses
import re
import subprocess
import threading
import av
import av.container
import cv2
from ipkvm import logger
from ipkvm import profile
@dataclasses.dataclass
class VideoDevice:
"""
Container for video input device data.
"""
friendly_name: str
path: str
video_formats: dict[str, dict[str, list[float]]]
def check_valid_device_linux(devices: list[str], valid_cameras: dict[str, str]):
"""
Uses v4l2-ctl to determine whether a video device actually provides video.
Takes list of /dev/videoX strings.
Returns a dictionary of /dev/videoX strings as keys and friendly device names as values.
"""
for device in devices:
cmd = ["v4l2-ctl", "-d", device, "--all"]
lines = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
check=True).stdout.decode().strip().splitlines()
if v4l2_check_capability(lines):
for line in lines:
if "Model" in line:
model_name = line.split(':')[1].strip()
valid_cameras.update({model_name: device})
return valid_cameras
def v4l2_check_capability(lines: list[str]):
"""
Checks if the provided stdout from v4l2-ctl identifies a device as a video capture device.
"""
for i, line in enumerate(lines):
if "Device Caps" in line:
x = i
while "Media Driver Info" not in lines[x]:
x += 1
if "Video Capture" in lines[x]:
return True
return False
def scan_devices():
"""
Creates a list of valid video devices and returns a dictionary of friendly names and device paths.
"""
valid_devices: dict[str, str] = {}
if name == "posix":
devices: list[str] = [f"/dev/{x}" for x in listdir("/dev/") if "video" in x]
valid_devices = check_valid_device_linux(devices, valid_devices)
elif name == "nt":
# implement camera acqisition for windows
pass
return valid_devices
def get_video_formats(device: str):
"""
Use v4l2-ctl (Linux) or FFplay (Windows) to get camera operating modes and sort by quality.
"""
video_formats: dict[str, dict[str, list[str]]] = {}
# Translates fourcc codec labels to the type that FFmpeg uses, and blacklists bad codecs.
fourcc_format_translation: dict[str, str | None] = {
"YUYV": None,
"MJPG": "mjpeg"
}
if name == "posix":
cmd = ["v4l2-ctl", "-d", device, "--list-formats-ext"]
output = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True).stdout.decode().strip()
matches = re.finditer(r"(?P<format>'\S*')|(?P<resolution>\d*x\d*)|(?P<fps>\d*.\d* fps)", output)
if matches:
video_format = None
resolution = None
fps = None
for match in matches:
if re.match(r"'\S*'", match[0]):
video_format = fourcc_format_translation[match[0].strip('\'')]
resolution = None
fps = None
elif re.match(r"\d*x\d*", match[0]):
resolution = match[0]
fps = None
elif re.match(r"\d*.\d* fps", match[0]):
fps = match[0].rstrip(" fps")
if video_format and resolution and fps:
if video_format not in video_formats:
video_formats.update({
video_format: {
resolution: [fps]
}
})
elif resolution not in video_formats[video_format]:
video_formats[video_format].update({
resolution: [fps]
})
else:
video_formats[video_format][resolution].append(fps)
return video_formats
def create_device_list():
"""
Create a complete device list including name, device ID, and available video formats.
"""
device_names = scan_devices()
device_list: list[VideoDevice] = []
for device_name in device_names:
device_list.append(VideoDevice(device_name, device_names[device_name], get_video_formats(device_names[device_name])))
if len(device_list[-1].video_formats) == 0:
device_list.pop()
if len(device_list) > 0:
logger.info(f"Found {len(device_list)} video devices.")
return device_list
else:
raise RuntimeError("No video devices found on this system!")
# EZ DEBUGGING
if __name__ == '__main__':
print(create_device_list())

@ -1,4 +1,4 @@
from ipkvm import ui
if __name__ == '__main__':
ui.run(host='0.0.0.0', port=5000, threaded=True)
ui.run(host='0.0.0.0', port=5000)