Building Advanced MCP (Model Context Protocol) Agents with Multi-Agent Coordination, Context Awareness, and Gemini Integration

byrn
By byrn
6 Min Read


class MCPAgent:
   """Advanced MCP Agent with evolved capabilities - Jupyter Compatible"""
  
   def __init__(self, agent_id: str, role: AgentRole, api_key: str = None):
       self.agent_id = agent_id
       self.role = role
       self.api_key = api_key
       self.memory = []
       self.context = AgentContext(
           agent_id=agent_id,
           role=role,
           capabilities=self._init_capabilities(),
           memory=[],
           tools=self._init_tools()
       )
      
       self.model = None
       if GEMINI_AVAILABLE and api_key:
           try:
               genai.configure(api_key=api_key)
               self.model = genai.GenerativeModel('gemini-pro')
               print(f"✅ Agent {agent_id} initialized with Gemini API")
           except Exception as e:
               print(f"⚠️  Gemini configuration failed: {e}")
               print("💡 Running in demo mode with simulated responses")
       else:
           print(f"🎭 Agent {agent_id} running in demo mode")
      
   def _init_capabilities(self) -> List[str]:
       """Initialize role-specific capabilities"""
       capabilities_map = {
           AgentRole.COORDINATOR: ["task_decomposition", "agent_orchestration", "priority_management"],
           AgentRole.RESEARCHER: ["data_gathering", "web_search", "information_synthesis"],
           AgentRole.ANALYZER: ["pattern_recognition", "data_analysis", "insight_generation"],
           AgentRole.EXECUTOR: ["action_execution", "result_validation", "output_formatting"]
       }
       return capabilities_map.get(self.role, [])
  
   def _init_tools(self) -> List[str]:
       """Initialize available tools based on role"""
       tools_map = {
           AgentRole.COORDINATOR: ["task_splitter", "agent_selector", "progress_tracker"],
           AgentRole.RESEARCHER: ["search_engine", "data_extractor", "source_validator"],
           AgentRole.ANALYZER: ["statistical_analyzer", "pattern_detector", "visualization_tool"],
           AgentRole.EXECUTOR: ["code_executor", "file_handler", "api_caller"]
       }
       return tools_map.get(self.role, [])
  
   def process_message(self, message: str, context: Optional[Dict] = None) -> Dict[str, Any]:
       """Process incoming message with context awareness - Synchronous version"""
      
       msg = Message(
           role="user",
           content=message,
           timestamp=datetime.now(),
           metadata=context
       )
       self.memory.append(msg)
      
       prompt = self._generate_contextual_prompt(message, context)
      
       try:
           if self.model:
               response = self._generate_response_gemini(prompt)
           else:
               response = self._generate_demo_response(message)
          
           response_msg = Message(
               role="assistant",
               content=response,
               timestamp=datetime.now(),
               metadata={"agent_id": self.agent_id, "role": self.role.value}
           )
           self.memory.append(response_msg)
          
           return {
               "agent_id": self.agent_id,
               "role": self.role.value,
               "response": response,
               "capabilities_used": self._analyze_capabilities_used(message),
               "next_actions": self._suggest_next_actions(response),
               "timestamp": datetime.now().isoformat()
           }
          
       except Exception as e:
           logger.error(f"Error processing message: {e}")
           return {"error": str(e)}
  
   def _generate_response_gemini(self, prompt: str) -> str:
       """Generate response using Gemini API - Synchronous"""
       try:
           response = self.model.generate_content(prompt)
           return response.text
       except Exception as e:
           logger.error(f"Gemini API error: {e}")
           return self._generate_demo_response(prompt)
  
   def _generate_demo_response(self, message: str) -> str:
       """Generate simulated response for demo purposes"""
       role_responses = {
           AgentRole.COORDINATOR: f"As coordinator, I'll break down the task: '{message[:50]}...' into manageable components and assign them to specialized agents.",
           AgentRole.RESEARCHER: f"I'll research information about: '{message[:50]}...' using my data gathering and synthesis capabilities.",
           AgentRole.ANALYZER: f"Analyzing the patterns and insights from: '{message[:50]}...' to provide data-driven recommendations.",
           AgentRole.EXECUTOR: f"I'll execute the necessary actions for: '{message[:50]}...' and validate the results."
       }
      
       base_response = role_responses.get(self.role, f"Processing: {message[:50]}...")
      
       time.sleep(0.5) 
      
       additional_context = {
           AgentRole.COORDINATOR: " I've identified 3 key subtasks and will coordinate their execution across the agent team.",
           AgentRole.RESEARCHER: " My research indicates several relevant sources and current trends in this area.",
           AgentRole.ANALYZER: " The data shows interesting correlations and actionable insights for decision making.",
           AgentRole.EXECUTOR: " I've completed the requested actions and verified the outputs meet quality standards."
       }
      
       return base_response + additional_context.get(self.role, "")
  
   def _generate_contextual_prompt(self, message: str, context: Optional[Dict]) -> str:
       """Generate context-aware prompt based on agent role"""
      
       base_prompt = f"""
       You are an advanced AI agent with the role: {self.role.value}
       Your capabilities: {', '.join(self.context.capabilities)}
       Available tools: {', '.join(self.context.tools)}
      
       Recent conversation context:
       {self._get_recent_context()}
      
       Current request: {message}
       """
      
       role_instructions = {
           AgentRole.COORDINATOR: """
           Focus on breaking down complex tasks, coordinating with other agents,
           and maintaining overall project coherence. Consider dependencies and priorities.
           Provide clear task decomposition and agent assignments.
           """,
           AgentRole.RESEARCHER: """
           Prioritize accurate information gathering, source verification,
           and comprehensive data collection. Synthesize findings clearly.
           Focus on current trends and reliable sources.
           """,
           AgentRole.ANALYZER: """
           Focus on pattern recognition, data interpretation, and insight generation.
           Provide evidence-based conclusions and actionable recommendations.
           Highlight key correlations and implications.
           """,
           AgentRole.EXECUTOR: """
           Concentrate on practical implementation, result validation,
           and clear output delivery. Ensure actions are completed effectively.
           Focus on quality and completeness of execution.
           """
       }
      
       return base_prompt + role_instructions.get(self.role, "")
  
   def _get_recent_context(self, limit: int = 3) -> str:
       """Get recent conversation context"""
       if not self.memory:
           return "No previous context"
      
       recent = self.memory[-limit:]
       context_str = ""
       for msg in recent:
           context_str += f"{msg.role}: {msg.content[:100]}...\n"
       return context_str
  
   def _analyze_capabilities_used(self, message: str) -> List[str]:
       """Analyze which capabilities were likely used"""
       used_capabilities = []
       message_lower = message.lower()
      
       capability_keywords = {
           "task_decomposition": ["break down", "divide", "split", "decompose"],
           "data_gathering": ["research", "find", "collect", "gather"],
           "pattern_recognition": ["analyze", "pattern", "trend", "correlation"],
           "action_execution": ["execute", "run", "implement", "perform"],
           "agent_orchestration": ["coordinate", "manage", "organize", "assign"],
           "information_synthesis": ["synthesize", "combine", "merge", "integrate"]
       }
      
       for capability, keywords in capability_keywords.items():
           if capability in self.context.capabilities:
               if any(keyword in message_lower for keyword in keywords):
                   used_capabilities.append(capability)
      
       return used_capabilities
  
   def _suggest_next_actions(self, response: str) -> List[str]:
       """Suggest logical next actions based on response"""
       suggestions = []
       response_lower = response.lower()
      
       if "need more information" in response_lower or "research" in response_lower:
           suggestions.append("delegate_to_researcher")
       if "analyze" in response_lower or "pattern" in response_lower:
           suggestions.append("delegate_to_analyzer") 
       if "implement" in response_lower or "execute" in response_lower:
           suggestions.append("delegate_to_executor")
       if "coordinate" in response_lower or "manage" in response_lower:
           suggestions.append("initiate_multi_agent_collaboration")
       if "subtask" in response_lower or "break down" in response_lower:
           suggestions.append("task_decomposition_required")
          
       return suggestions if suggestions else ["continue_conversation"]



Source link

Share This Article
Leave a Comment

Leave a Reply

Your email address will not be published. Required fields are marked *