andito HF Staff commited on
Commit
6228a3e
·
verified ·
1 Parent(s): 9eede3b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +81 -53
app.py CHANGED
@@ -4,6 +4,7 @@ import asyncio
4
  import subprocess
5
  import sys
6
  import threading
 
7
  import time
8
  from dataclasses import dataclass
9
  from typing import List, Optional
@@ -18,12 +19,15 @@ from reachy_mini.utils import create_head_pose
18
 
19
  robot_ws = None # type: Optional[WebSocket]
20
  robot_loop = None # type: Optional[asyncio.AbstractEventLoop]
21
- # Add after imports
22
  _black_frame = np.zeros((640, 640, 3), dtype=np.uint8)
23
- # Encode it to JPEG immediately
24
  _, _buffer = cv2.imencode('.jpg', _black_frame)
25
- # Set the global variable to bytes
26
- latest_test_frame = _buffer.tobytes()
 
 
 
 
27
 
28
  @dataclass
29
  class Movement:
@@ -154,17 +158,28 @@ class ReachyController:
154
  time.sleep(0.04)
155
 
156
  def generate_mjpeg_stream(self):
157
- global latest_test_frame
158
- while True:
159
- # Get the raw bytes (which you said are already JPEG)
160
- frame_bytes = latest_test_frame
161
 
162
- if frame_bytes is not None and len(frame_bytes) > 0:
163
- # Create the MJPEG boundary format
164
- yield (b'--frame\r\n'
165
- b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
166
-
167
- time.sleep(0.04) # Cap at ~25 FPS
 
 
 
 
 
 
 
 
 
 
 
 
 
168
 
169
  def auto_start(self):
170
  """Auto-start daemon and robot connection"""
@@ -642,6 +657,7 @@ def send_robot_command(cmd: dict):
642
  return
643
 
644
  async def _send():
 
645
  try:
646
  send_ws_start = time.perf_counter()
647
  await robot_ws.send_json(cmd) # or send_text(json.dumps(cmd)) if you prefer
@@ -649,6 +665,7 @@ def send_robot_command(cmd: dict):
649
  print(f"[WebSocket /robot] Send time: {send_ws_time*1000:.2f}ms")
650
  except Exception as e:
651
  print("[Space] Error sending command to robot:", e)
 
652
 
653
  # Schedule coroutine on the main event loop from this worker thread
654
  fut = asyncio.run_coroutine_threadsafe(_send(), robot_loop)
@@ -660,43 +677,46 @@ def send_robot_command(cmd: dict):
660
  print(f"[WebSocket /robot] Total send time (including scheduling): {total_send_time*1000:.2f}ms")
661
  except Exception as e:
662
  print("[Space] Command failed:", e)
 
663
 
664
  @app.websocket("/mujoco_stream")
665
  async def robot_socket(ws: WebSocket):
666
- global latest_test_frame
667
  await ws.accept()
668
  print("[Space] MuJoCo stream connected")
669
 
670
- receive_count = 0
671
- receive_start_time = time.perf_counter()
672
-
673
  try:
674
  while True:
675
- receive_start = time.perf_counter()
676
  msg = await ws.receive()
677
- receive_time = time.perf_counter() - receive_start
678
 
679
  if msg["type"] == "websocket.receive":
680
- if msg["bytes"] is not None:
681
- frame_update_start = time.perf_counter()
682
- latest_test_frame = msg["bytes"]
683
- frame_update_time = time.perf_counter() - frame_update_start
684
- receive_count += 1
685
-
686
- # Print stats every 30 frames
687
- if receive_count % 10 == 0:
688
- elapsed = time.perf_counter() - receive_start_time
689
- fps = receive_count / elapsed
690
- print(f"[WebSocket /mujoco_stream] Receive FPS: {fps:.2f} | Last receive: {receive_time*1000:.2f}ms | Frame update: {frame_update_time*1000:.2f}ms")
691
- receive_count = 0
692
- receive_start_time = time.perf_counter()
693
- elif msg["text"] is not None:
694
- print(f"[WebSocket /mujoco_stream] JSON received (receive time: {receive_time*1000:.2f}ms):", msg["text"])
 
 
 
 
 
 
695
  elif msg["type"] == "websocket.disconnect":
696
  print("[Space] MuJoCo stream disconnected")
697
  break
 
698
  except Exception as e:
699
- print("[Space] MuJoCo stream disconnected", e)
700
 
701
  @app.get("/video_feed")
702
  def video_feed():
@@ -713,33 +733,35 @@ async def robot_socket(ws: WebSocket):
713
  robot_loop = asyncio.get_running_loop()
714
  print("[Space] Robot connected")
715
 
716
- receive_count = 0
717
- receive_start_time = time.perf_counter()
 
 
 
 
 
 
 
 
 
 
 
 
718
 
719
  try:
720
  while True:
721
- receive_start = time.perf_counter()
722
  msg = await ws.receive()
723
- receive_time = time.perf_counter() - receive_start
724
-
725
  if msg["type"] == "websocket.receive":
726
  if msg["text"] is not None:
727
- receive_count += 1
728
- print(f"[WebSocket /robot] JSON received (receive time: {receive_time*1000:.2f}ms):", msg["text"])
729
-
730
- # Print stats every 10 messages
731
- if receive_count % 10 == 0:
732
- elapsed = time.perf_counter() - receive_start_time
733
- msg_rate = receive_count / elapsed
734
- print(f"[WebSocket /robot] Message rate: {msg_rate:.2f} msg/s | Avg receive time: {(elapsed/receive_count)*1000:.2f}ms")
735
- receive_count = 0
736
- receive_start_time = time.perf_counter()
737
  elif msg["type"] == "websocket.disconnect":
738
  print("[Space] Robot disconnected")
739
  break
740
  except Exception as e:
741
- print("[Space] Robot disconnected", e)
742
  finally:
 
743
  if robot_ws is ws:
744
  robot_ws = None
745
 
@@ -747,4 +769,10 @@ app = gr.mount_gradio_app(app, demo, path="/")
747
 
748
  if __name__ == "__main__":
749
  import uvicorn
750
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
 
 
 
 
 
 
4
  import subprocess
5
  import sys
6
  import threading
7
+ import json
8
  import time
9
  from dataclasses import dataclass
10
  from typing import List, Optional
 
19
 
20
  robot_ws = None # type: Optional[WebSocket]
21
  robot_loop = None # type: Optional[asyncio.AbstractEventLoop]
22
+
23
  _black_frame = np.zeros((640, 640, 3), dtype=np.uint8)
 
24
  _, _buffer = cv2.imencode('.jpg', _black_frame)
25
+
26
+ frame_lock = threading.Lock()
27
+ latest_frame_data = {
28
+ "bytes": _buffer.tobytes(),
29
+ "timestamp": time.time()
30
+ }
31
 
32
  @dataclass
33
  class Movement:
 
158
  time.sleep(0.04)
159
 
160
  def generate_mjpeg_stream(self):
161
+ global latest_frame_data, frame_lock
162
+ last_timestamp = 0.0
 
 
163
 
164
+ while True:
165
+ # 1. Check if frame has changed
166
+ with frame_lock:
167
+ current_bytes = latest_frame_data["bytes"]
168
+ current_timestamp = latest_frame_data["timestamp"]
169
+
170
+ # 2. Only yield if this is a new frame
171
+ if current_timestamp > last_timestamp:
172
+ last_timestamp = current_timestamp
173
+ if current_bytes is not None:
174
+ yield (b'--frame\r\n'
175
+ b'Content-Type: image/jpeg\r\n\r\n' + current_bytes + b'\r\n')
176
+ else:
177
+ # If no new frame, sleep a bit longer to save CPU
178
+ time.sleep(0.02)
179
+ continue
180
+
181
+ # Cap FPS slightly to prevent saturation
182
+ time.sleep(0.02)
183
 
184
  def auto_start(self):
185
  """Auto-start daemon and robot connection"""
 
657
  return
658
 
659
  async def _send():
660
+ global robot_ws
661
  try:
662
  send_ws_start = time.perf_counter()
663
  await robot_ws.send_json(cmd) # or send_text(json.dumps(cmd)) if you prefer
 
665
  print(f"[WebSocket /robot] Send time: {send_ws_time*1000:.2f}ms")
666
  except Exception as e:
667
  print("[Space] Error sending command to robot:", e)
668
+ robot_ws = None
669
 
670
  # Schedule coroutine on the main event loop from this worker thread
671
  fut = asyncio.run_coroutine_threadsafe(_send(), robot_loop)
 
677
  print(f"[WebSocket /robot] Total send time (including scheduling): {total_send_time*1000:.2f}ms")
678
  except Exception as e:
679
  print("[Space] Command failed:", e)
680
+ robot_ws = None
681
 
682
  @app.websocket("/mujoco_stream")
683
  async def robot_socket(ws: WebSocket):
684
+ global latest_frame_data, frame_lock
685
  await ws.accept()
686
  print("[Space] MuJoCo stream connected")
687
 
 
 
 
688
  try:
689
  while True:
 
690
  msg = await ws.receive()
 
691
 
692
  if msg["type"] == "websocket.receive":
693
+ # 1. Safe retrieval using .get() to avoid KeyError
694
+ payload_bytes = msg.get("bytes")
695
+ payload_text = msg.get("text")
696
+
697
+ # 2. Handle Binary Frame
698
+ if payload_bytes is not None:
699
+ with frame_lock:
700
+ latest_frame_data["bytes"] = payload_bytes
701
+ latest_frame_data["timestamp"] = time.time()
702
+
703
+ # 3. Handle Text Message (The Ping)
704
+ elif payload_text is not None:
705
+ try:
706
+ data = json.loads(payload_text)
707
+ if data.get("type") == "ping":
708
+ # Optional: Print debug only if you need to verify it's working
709
+ print("[Space] Frame keep-alive received")
710
+ pass
711
+ except json.JSONDecodeError:
712
+ print(f"[Space] Received invalid JSON: {payload_text}")
713
+
714
  elif msg["type"] == "websocket.disconnect":
715
  print("[Space] MuJoCo stream disconnected")
716
  break
717
+
718
  except Exception as e:
719
+ print(f"[Space] MuJoCo stream disconnected error: {e}")
720
 
721
  @app.get("/video_feed")
722
  def video_feed():
 
733
  robot_loop = asyncio.get_running_loop()
734
  print("[Space] Robot connected")
735
 
736
+ # --- HEARTBEAT MECHANISM ---
737
+ # Define a keep-alive task that runs in the background
738
+ async def keep_alive():
739
+ try:
740
+ while True:
741
+ await asyncio.sleep(30) # Send ping every 30s
742
+ if robot_ws:
743
+ # Send a lightweight ping
744
+ await robot_ws.send_json({"type": "ping"})
745
+ except Exception as e:
746
+ print(f"[Space] Heartbeat stopped: {e}")
747
+
748
+ # Start the heartbeat
749
+ heartbeat_task = asyncio.create_task(keep_alive())
750
 
751
  try:
752
  while True:
 
753
  msg = await ws.receive()
 
 
754
  if msg["type"] == "websocket.receive":
755
  if msg["text"] is not None:
756
+ # Handle robot messages here
757
+ pass
 
 
 
 
 
 
 
 
758
  elif msg["type"] == "websocket.disconnect":
759
  print("[Space] Robot disconnected")
760
  break
761
  except Exception as e:
762
+ print("[Space] Robot disconnected (Error)", e)
763
  finally:
764
+ heartbeat_task.cancel() # Stop the heartbeat when connection dies
765
  if robot_ws is ws:
766
  robot_ws = None
767
 
 
769
 
770
  if __name__ == "__main__":
771
  import uvicorn
772
+ uvicorn.run(
773
+ app,
774
+ host="0.0.0.0",
775
+ port=7860,
776
+ proxy_headers=True, # Tell Uvicorn to trust the proxy's headers
777
+ forwarded_allow_ips="*" # Trust headers from any IP (the internal HF proxy)
778
+ )