"""Gradio front end for a vehicle diagnostic agent backed by an MCP server.

The module builds a LangChain agent whose tools come from an ELM327 MCP
server plus a few local helpers, streams the agent's output into a Gradio
chatbot, and mirrors structured results (historic telemetry, decoded VIN,
diagnostic trouble codes) into dedicated dashboard components.
"""

import asyncio
import json
import re
import uuid
from pathlib import Path

import gradio as gr
import pandas as pd
from gradio import ChatMessage
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_nebius import ChatNebius
from langgraph.checkpoint.memory import InMemorySaver
from youtube_search import YoutubeSearch

from demo_video import create_demo_video_tab
from dtc_display import DTCDisplay
from hackathon_detail import create_hackathon_detail_tab
from mcp_server_detail import create_mcp_server_tab
from prompts import system_prompt
from tools import search_youtube_video, decode_vin, hex_to_decimal, calculate_obd_value, combine_bytes

# from langchain_anthropic import ChatAnthropic
# model = ChatAnthropic(model="claude-haiku-4-5-20251001", temperature=0, streaming=True)
# streaming=True is required for token streaming

model = ChatNebius(
    model="Qwen/Qwen3-Coder-480B-A35B-Instruct",
    # model = "Qwen/Qwen3-235B-A22B-Instruct-2507",
    # model="meta-llama/Llama-3.3-70B-Instruct",  # Choose from available models
    # model="deepseek-ai/DeepSeek-R1-0528",  # Choose from available models
    temperature=0,
    top_p=0.95,
    streaming=True,
)

# Global variables
agent = None
mcp_error = None
checkpointer = None
historic_data = None
dtc_component = None

# Accumulator key used for tool-call argument chunks that arrive without an ID.
PENDING_TOOL_KEY = "pending_tool_0"

# NOTE(review): the delimiter tag is inferred from the "DTC_DATA block" wording
# used in this file; confirm it matches what prompts.system_prompt tells the
# model to emit. Without explicit delimiters the removal pattern degenerates to
# r'.*?', which deletes every character of the assistant's reply.
DTC_BLOCK_RE = re.compile(r'<DTC_DATA>\s*(\[.*?\])\s*</DTC_DATA>', re.DOTALL)
DTC_BLOCK_STRIP_RE = re.compile(r'<DTC_DATA>.*?</DTC_DATA>', re.DOTALL)

# Rows shown in the VIN table, in display order.
VIN_PROPERTIES = (
    'Make', 'Model', 'Model Year', 'Vehicle Type', 'Body Class',
    'Manufacturer Name', 'Plant City', 'Plant State', 'Plant Country',
    'Trim', 'Engine Number of Cylinders', 'Displacement (L)', 'Engine Model',
    'Fuel Type - Primary', 'Fuel Type - Secondary', 'Electrification Level',
    'Transmission Style', 'Drive Type', 'Number of Doors', 'Number of Seats',
    'Gross Vehicle Weight Rating From', 'Error Code', 'Error Text',
)

# Historic-data column -> legend label used by the telemetry plot.
PLOT_METRICS = {'rpm': 'RPM', 'speed': 'Speed', 'coolant_temp': 'Coolant Temp'}


async def setup_agent():
    """Connect to the MCP server and build the agent.

    Sets the module-level ``agent`` and ``checkpointer`` on success. On any
    failure the error text is stored in ``mcp_error`` and ``None`` is returned
    so the UI can still start without MCP tools.
    """
    global agent, mcp_error, checkpointer
    try:
        client = MultiServerMCPClient(
            {
                "diagnostic": {
                    "transport": "streamable_http",
                    # "url": "http://192.168.10.69/mcp",  # Actual physical dongle MCP server
                    "url": "https://castlebbs-elm327-simulator.hf.space/gradio_api/mcp/",  # Simulator MCP server on gradio
                    "timeout": 20.0,  # Increased timeout for multiple calls
                }
            }
        )
        tools = await client.get_tools()
        print("Available MCP tools:", [mcp_tool.name for mcp_tool in tools])

        # Add the local helper tools to the MCP tools
        all_tools = list(tools) + [search_youtube_video, decode_vin, hex_to_decimal, calculate_obd_value, combine_bytes]

        # Create the checkpointer
        checkpointer = InMemorySaver()

        # Create the agent with the retrieved tools
        agent = create_agent(
            model=model,
            system_prompt=system_prompt,
            tools=all_tools,
            checkpointer=checkpointer,
        )
        print("✅ MCP server connected successfully")
        return agent
    except Exception as e:
        mcp_error = str(e)
        print(f"⚠️ Warning: Could not connect to MCP server: {e}")
        print("The app will start but MCP tools will not be available.")
        return None


# Try to setup agent, but don't fail if it doesn't work
try:
    agent = asyncio.run(setup_agent())
except Exception as e:
    # Keep the reason so the chat can show it instead of a generic message.
    mcp_error = str(e)
    print(f"Failed to initialize agent: {e}")


def _text_fragments(content):
    """Return the non-empty text pieces of a streamed chunk's content.

    ``content`` is either a plain string or a list of content blocks; only
    dict blocks whose ``type`` is ``"text"`` contribute.
    """
    if isinstance(content, str):
        return [content] if content else []
    if isinstance(content, list):
        return [
            block.get("text", "")
            for block in content
            if isinstance(block, dict) and block.get("type") == "text" and block.get("text")
        ]
    return []


def _extract_dtc_data(response_text):
    """Return the parsed JSON list from a complete DTC block, else ``None``."""
    dtc_match = DTC_BLOCK_RE.search(response_text)
    if not dtc_match:
        return None
    try:
        dtc_json = json.loads(dtc_match.group(1))
    except json.JSONDecodeError as e:
        print(f"⚠️ Could not parse DTC data from LLM response: {e}")
        return None
    print(f"✅ Extracted DTC data from LLM response: {dtc_json}")
    return dtc_json


def _tool_call_title(tool_name, raw_args):
    """Build the tool-call title once ``raw_args`` is complete JSON.

    Returns ``None`` while the arguments are empty or still streaming (not yet
    parseable). The title shows the first truthy argument value, or ``...``.
    """
    if not raw_args:
        return None
    try:
        args_dict = json.loads(raw_args)
    except json.JSONDecodeError:
        # Args still streaming, don't update yet
        return None
    param_value = "..."
    if isinstance(args_dict, dict) and args_dict:
        param_value = next((str(v) for v in args_dict.values() if v), "...")
    return f"⚙️ Calling {tool_name}: {param_value}"


async def interact_with_langchain_agent(user_prompt, messages, session_id, historic_data_state, vin_data_state, dtc_data_state):
    """Stream one agent turn into the chat history.

    Args:
        user_prompt: Text typed by the user; blank input is ignored.
        messages: Chat history list; mutated in place with new messages.
        session_id: Used as the LangGraph ``thread_id`` for conversation memory.
        historic_data_state: Last parsed output of a ``*get_elm327_history`` tool.
        vin_data_state: Last raw output of the ``decode_vin`` tool.
        dtc_data_state: Last DTC list extracted from the assistant's text.

    Yields:
        ``(messages, historic_data_state, vin_data_state, dtc_data_state)``
        after every visible update. Errors are reported as assistant messages
        rather than raised.
    """
    # Skip if input is empty or only whitespace
    if not user_prompt or not user_prompt.strip():
        yield messages, historic_data_state, vin_data_state, dtc_data_state
        return

    print("> ", user_prompt)
    messages.append(ChatMessage(role="user", content=user_prompt))
    yield messages, historic_data_state, vin_data_state, dtc_data_state

    # Check if agent is available
    if agent is None:
        error_msg = f"❌ MCP server is not available. {mcp_error or 'Connection failed.'}\n\nPlease ensure MCP server is reachable and try again."
        messages.append(ChatMessage(role="assistant", content=error_msg))
        yield messages, historic_data_state, vin_data_state, dtc_data_state
        return

    try:
        assistant_response = ""  # Full text of the assistant message being streamed
        current_message_index = None  # Index of the message we're streaming to
        tool_call_tracker = {}  # {tool_call_id: message_index}
        tool_args_accumulator = {}  # {tool_call_id or pending key: partial JSON args}
        current_tool_call_id = None  # The active tool call being streamed
        dtc_extracted = False  # DTC data is extracted at most once per turn

        # Stream with messages mode for token-by-token streaming
        async for message_chunk, metadata in agent.astream(
            {"messages": [{"role": "user", "content": user_prompt}]},
            {"configurable": {"thread_id": session_id}},
            stream_mode="messages",
        ):
            node = metadata.get("langgraph_node", "unknown")

            # Only process chunks from the model node (AI responses)
            if node == "model":
                # Check if this is the last chunk in a tool call sequence
                is_last_chunk = (
                    hasattr(message_chunk, "response_metadata")
                    and message_chunk.response_metadata.get("finish_reason") == "tool_calls"
                )

                # Check if this is a tool call or text content
                if hasattr(message_chunk, "tool_call_chunks") and message_chunk.tool_call_chunks:
                    for tool_chunk in message_chunk.tool_call_chunks:
                        tool_call_id = tool_chunk.get("id")
                        tool_name = tool_chunk.get("name")
                        tool_args = tool_chunk.get("args")

                        # Chunks without an ID accumulate under the pending key
                        if not tool_call_id:
                            active_id = PENDING_TOOL_KEY
                        else:
                            active_id = tool_call_id
                            if tool_name and tool_call_id not in tool_call_tracker:
                                # Brand new tool call: drop stale pending data and start fresh
                                tool_args_accumulator.pop(PENDING_TOOL_KEY, None)
                                tool_args_accumulator[tool_call_id] = ""
                                current_tool_call_id = tool_call_id
                            elif PENDING_TOOL_KEY in tool_args_accumulator:
                                # Migrate any pending args to the real ID
                                pending_args = tool_args_accumulator.pop(PENDING_TOOL_KEY)
                                tool_args_accumulator[tool_call_id] = pending_args + tool_args_accumulator.get(tool_call_id, "")
                                current_tool_call_id = tool_call_id

                        # Initialize accumulator if needed
                        if active_id not in tool_args_accumulator:
                            tool_args_accumulator[active_id] = ""

                        # Accumulate arguments
                        if tool_args:
                            tool_args_accumulator[active_id] += tool_args

                        # Create message when we first see a tool call with name and ID
                        if tool_call_id and tool_name:
                            create_message = True
                            if tool_call_id in tool_call_tracker:
                                # Same ID seen again: it is a NEW call only if the
                                # previous message already holds a tool result.
                                prev_msg_idx = tool_call_tracker[tool_call_id]
                                create_message = bool(
                                    prev_msg_idx < len(messages) and messages[prev_msg_idx].content
                                )
                                if create_message:
                                    tool_args_accumulator[tool_call_id] = ""
                            if create_message:
                                messages.append(ChatMessage(
                                    role="assistant",
                                    content="",
                                    metadata={"title": f"⚙️ Calling {tool_name}...", "tool_id": tool_call_id, "tool_name": tool_name},
                                ))
                                tool_call_tracker[tool_call_id] = len(messages) - 1
                                yield messages, historic_data_state, vin_data_state, dtc_data_state

                        # Update the message title as args stream in.
                        # Fall back to the current tool call when this chunk had no ID.
                        target_id = active_id if active_id in tool_call_tracker else current_tool_call_id
                        if target_id and target_id in tool_call_tracker:
                            accumulated_args = tool_args_accumulator.get(active_id, "") or tool_args_accumulator.get(PENDING_TOOL_KEY, "")
                            msg_index = tool_call_tracker[target_id]
                            title = _tool_call_title(messages[msg_index].metadata.get("tool_name", ""), accumulated_args)
                            if title is not None:
                                messages[msg_index].metadata["title"] = title
                                yield messages, historic_data_state, vin_data_state, dtc_data_state

                    # Force update on the last chunk to ensure we catch the final args
                    if is_last_chunk and current_tool_call_id and current_tool_call_id in tool_call_tracker:
                        accumulated_args = tool_args_accumulator.get(current_tool_call_id, "") or tool_args_accumulator.get(PENDING_TOOL_KEY, "")
                        msg_index = tool_call_tracker[current_tool_call_id]
                        title = _tool_call_title(messages[msg_index].metadata.get("tool_name", ""), accumulated_args)
                        if title is not None:
                            messages[msg_index].metadata["title"] = title
                            yield messages, historic_data_state, vin_data_state, dtc_data_state

                elif hasattr(message_chunk, "content") and message_chunk.content:
                    # Handle text streaming token by token (string or content blocks)
                    for fragment in _text_fragments(message_chunk.content):
                        # A new assistant message starts a fresh accumulation so text
                        # from a message before a tool run is not repeated.
                        if current_message_index is None:
                            assistant_response = fragment
                        else:
                            assistant_response += fragment

                        # Check for complete DTC data block and extract it (only once)
                        if not dtc_extracted:
                            dtc_json = _extract_dtc_data(assistant_response)
                            if dtc_json is not None:
                                dtc_data_state = dtc_json
                                dtc_extracted = True

                        # Remove the DTC_DATA block from the text shown in the chat
                        display_response = DTC_BLOCK_STRIP_RE.sub('', assistant_response)

                        if current_message_index is None:
                            # Start a new assistant message with clean content
                            messages.append(ChatMessage(role="assistant", content=display_response))
                            current_message_index = len(messages) - 1
                        else:
                            # Update the message with the clean accumulated content
                            messages[current_message_index].content = display_response
                        yield messages, historic_data_state, vin_data_state, dtc_data_state

            elif node == "tools":
                # Reset current message index when we move to tool execution
                current_message_index = None

                # Handle tool outputs
                if hasattr(message_chunk, "content"):
                    tool_output = message_chunk.content
                    tool_call_id = message_chunk.tool_call_id if hasattr(message_chunk, "tool_call_id") else None

                    if isinstance(tool_output, str):
                        # Decode escaped newlines for proper display
                        tool_output = tool_output.replace('\\n', '\n')
                        # Replace "?" with "NO DATA"
                        if tool_output.strip() == "?":
                            tool_output = "NO DATA"

                    # Update the corresponding tool call message
                    if tool_call_id and tool_call_id in tool_call_tracker:
                        msg_index = tool_call_tracker[tool_call_id]
                        tool_metadata = messages[msg_index].metadata or {}
                        tool_name = tool_metadata.get("tool_name", "")

                        # Capture historic data from tools named "*get_elm327_history"
                        if tool_name.endswith("get_elm327_history") and isinstance(tool_output, str):
                            try:
                                historic_data_state = json.loads(tool_output)
                                print(f"Captured historic data: {historic_data_state}")
                            except json.JSONDecodeError:
                                print("Could not parse historic data as JSON")

                        # Capture the VIN decode output
                        if tool_name == "decode_vin" and isinstance(tool_output, str):
                            vin_data_state = tool_output
                            print(f"Captured VIN data: {vin_data_state}")

                        messages[msg_index].content = f"{tool_output}\n"
                    else:
                        # Fallback: add as new message
                        messages.append(ChatMessage(
                            role="assistant",
                            content=f"{tool_output}\n",
                        ))
                    yield messages, historic_data_state, vin_data_state, dtc_data_state

    except ExceptionGroup as eg:
        # Handle Python 3.11+ ExceptionGroup from TaskGroup
        error_details = [f" - {type(e).__name__}: {str(e)}" for e in eg.exceptions]
        error_message = "❌ Multiple errors occurred:\n" + "\n".join(error_details[:3])  # Show first 3
        if len(eg.exceptions) > 3:
            error_message += f"\n ... and {len(eg.exceptions) - 3} more"
        error_message += "\n\n💡 This usually means the elm327 device is unreachable or timing out."
        messages.append(ChatMessage(role="assistant", content=error_message))
        yield messages, historic_data_state, vin_data_state, dtc_data_state
    except Exception as e:
        error_message = f"❌ Error: {type(e).__name__}: {str(e)}\n\n💡 Make sure MCP server is online and responding."
        messages.append(ChatMessage(role="assistant", content=error_message))
        print(f"Error during agent interaction: {e}")
        yield messages, historic_data_state, vin_data_state, dtc_data_state


def _empty_vin_dataframe():
    """Return the placeholder VIN table shown when nothing has been decoded."""
    return pd.DataFrame({'Property': ["NO DATA"], 'Value': ['']})


def clear_history(session_id=None):
    """Forget the conversation and reset every dashboard component.

    Args:
        session_id: Thread whose memory should be dropped. When given (and the
            checkpointer supports ``delete_thread``) only that session is
            cleared, so other users keep their conversations. When omitted,
            all checkpointer memory is cleared.

    Returns:
        Reset values for chatbot, historic data, VIN data, DTC data, plot,
        VIN dataframe and DTC component, in that order.
    """
    if checkpointer is not None:
        if session_id is not None and hasattr(checkpointer, "delete_thread"):
            checkpointer.delete_thread(session_id)
        else:
            # InMemorySaver keeps checkpoints in `storage` and pending writes /
            # channel values in `writes` / `blobs`; clear whichever exist.
            for store_name in ("storage", "writes", "blobs"):
                store = getattr(checkpointer, store_name, None)
                if store is not None:
                    store.clear()
        print("✅ Checkpointer memory cleared")

    return [], None, None, None, None, _empty_vin_dataframe(), []


def update_vin_dataframe(vin_data_state):
    """Update the VIN dataframe from the state data.

    ``vin_data_state`` is the text returned by the VIN decode tool, read as
    ``Key: Value`` lines. Keys not listed in ``VIN_PROPERTIES`` are ignored and
    properties missing from the text are left blank.
    """
    if not vin_data_state:
        return _empty_vin_dataframe()

    values = dict.fromkeys(VIN_PROPERTIES, '')
    for line in vin_data_state.split('\n'):
        if ':' not in line:
            continue
        key, value = line.split(':', 1)
        key = key.strip()
        if key in values:
            values[key] = value.strip()

    return pd.DataFrame({'Property': list(values), 'Value': list(values.values())})


def create_plot(data):
    """Convert historic telemetry into the long format used by the line plot.

    Args:
        data: Parsed historic data with ``time``, ``rpm``, ``speed`` and
            ``coolant_temp`` fields per record.

    Returns:
        A DataFrame with ``time``, ``value`` and ``metric`` columns, or ``None``
        when there is no data or it does not have the expected shape (for
        example an error payload returned by the tool).
    """
    if not data:
        return None

    print("Creating plot with data:", data)
    try:
        df = pd.DataFrame(data)
    except (ValueError, TypeError) as e:
        print(f"⚠️ Historic data is not tabular, skipping plot: {e}")
        return None

    missing = [column for column in ('time', *PLOT_METRICS) if column not in df.columns]
    if missing:
        print(f"⚠️ Historic data is missing columns {missing}, skipping plot")
        return None

    # Stack the three metrics on top of each other, repeating the time axis
    times = list(df['time'])
    return pd.DataFrame({
        'time': times * len(PLOT_METRICS),
        'value': [value for column in PLOT_METRICS for value in df[column]],
        'metric': [label for label in PLOT_METRICS.values() for _ in range(len(df))],
    })


def update_dtc(dtc_data):
    """Return the DTC list for the DTC component, or ``[]`` when there is none."""
    if not dtc_data:
        return []
    print("Updating DTC component with data:", dtc_data)
    return dtc_data


# Gradio UI setup
with gr.Blocks() as demo:
    # Create a unique session ID for each user session
    session_id = gr.State(lambda: str(uuid.uuid4()))
    historic_data_state = gr.State(None)  # State to hold historic data
    dtc_data_state = gr.State(None)  # State to hold DTC data
    vin_data_state = gr.State(None)  # State to hold VIN decode data

    with gr.Row(equal_height=True, elem_classes="vertical-center"):
        gr.Image("mechanic_s.png", height=60, width=60, show_label=False, container=False, buttons=[""], scale=0, min_width=0, elem_classes="logo-image")
        with gr.Column(scale=10, elem_classes="big-title"):
            gr.Markdown("## Vehicle Diagnostic Assistant 🚗 🛠️")
    with gr.Row(elem_classes="ask-me"):
        gr.Markdown("AI agent that connects to your car via an embedded MCP server to provide real-time diagnostics and insights.")

    with gr.Tabs():
        with gr.Tab("Main Dashboard"):
            with gr.Row():
                with gr.Column(scale=6, min_width=600):
                    with gr.Row():
                        chatbot = gr.Chatbot(
                            label="Diagnostic Agent",
                            avatar_images=(
                                None,
                                "mechanic_s.png",
                            ),
                            height=400,
                            group_consecutive_messages=False,
                            buttons=[""],
                        )
                    with gr.Row():
                        # Named prompt_box to avoid shadowing the builtin `input`
                        prompt_box = gr.Textbox(
                            lines=1,
                            label="Ask a question",
                            placeholder="e.g., Can you check for any faults? What's the engine RPM?",
                            show_label=True,
                            buttons=[""],
                        )
                    with gr.Row():
                        submit_btn = gr.Button("Run", size="lg", variant="primary", elem_classes="button_color")
                        clear_btn = gr.Button("Clear History", size="lg", variant="secondary")

                    # Example questions
                    gr.Examples(
                        examples=[
                            ["Can you see if there are any faults using OBD-II?"],
                            ["What's the vehicle VIN?"],
                            ["Check the engine RPM and speed"],
                            ["Read the temperature sensor"],
                            ["Get system information"],
                            ["Get the last 10 historic data"],
                        ],
                        inputs=prompt_box
                    )

                with gr.Column(scale=4, min_width=400, elem_classes="scroll-column"):
                    with gr.Column(min_width=400):  # Added extra Column for scroll
                        with gr.Row():
                            with gr.Column():
                                gr.Markdown("### 📊 Historic Vehicle Data", elem_classes="section-title")
                                plot_output = gr.LinePlot(
                                    value=None,
                                    x="time",
                                    y="value",
                                    color="metric",
                                    x_title="Time (ms)",
                                    y_title="Value",
                                    title="Vehicle Telemetry",
                                    colors_in_legend=["RPM", "Speed", "Coolant Temp"],
                                    color_map={"RPM": "red", "Speed": "blue", "Coolant Temp": "green"},
                                    label="Vehicle Telemetry"
                                )
                        with gr.Row():
                            with gr.Column():
                                dtc_component = DTCDisplay(
                                    label="Diagnostic Trouble Codes",
                                    value=[],
                                    scale=1
                                )
                        with gr.Row():
                            with gr.Column():
                                gr.Markdown("### 🚘 Vehicle Information (VIN Decoded)", elem_classes="section-title")
                                vin_dataframe = gr.DataFrame(
                                    value=_empty_vin_dataframe(),
                                    headers=["Property", "Value"],
                                    datatype=["str", "str"],
                                    column_count=2,
                                    interactive=False,
                                    column_widths=["30%", "60%"],
                                    max_height="100%",
                                    wrap=True
                                )

        with gr.Tab("Demo Video"):
            create_demo_video_tab()
        with gr.Tab("Hackathon project detail"):
            create_hackathon_detail_tab()
        with gr.Tab("MCP Server on embedded device"):
            create_mcp_server_tab()

    # Event handlers (must be outside the Row context but can reference components)
    chat_inputs = [prompt_box, chatbot, session_id, historic_data_state, vin_data_state, dtc_data_state]
    chat_outputs = [chatbot, historic_data_state, vin_data_state, dtc_data_state]

    # Pressing Enter in the textbox and clicking "Run" share the same pipeline:
    # run the agent, clear the textbox, then refresh plot, VIN table and DTCs.
    for trigger in (prompt_box.submit, submit_btn.click):
        (
            trigger(
                interact_with_langchain_agent,
                chat_inputs,
                chat_outputs,
                trigger_mode="always_last",
            )
            .then(lambda: "", None, prompt_box)
            .then(create_plot, historic_data_state, plot_output)
            .then(update_vin_dataframe, vin_data_state, vin_dataframe)
            .then(update_dtc, dtc_data_state, dtc_component)
        )

    # Pass the session ID so only this user's conversation memory is dropped
    clear_btn.click(
        clear_history,
        session_id,
        [chatbot, historic_data_state, vin_data_state, dtc_data_state, plot_output, vin_dataframe, dtc_component],
    )

gr.set_static_paths(paths=[Path.cwd().absolute()/"assets"])
demo.launch(css_paths=["styles.css"])