class MCPAgent:
    """Advanced MCP agent with role-specific capabilities - Jupyter compatible.

    Each agent has a role that selects its capability list, its tool list, the
    instructions appended to its prompt and the wording of its simulated
    replies. When ``GEMINI_AVAILABLE`` is truthy and an API key is supplied,
    replies come from a Gemini model; otherwise (or when a Gemini call fails)
    the agent answers with canned "demo" text.

    Everything here is synchronous. ``AgentRole``, ``AgentContext``,
    ``Message``, ``GEMINI_AVAILABLE``, ``genai`` and ``logger`` are expected to
    be defined elsewhere in the module.
    """

    # Artificial delay (seconds) applied to every simulated reply. Set it to 0
    # on the class or on an instance to disable the delay (e.g. in tests).
    DEMO_LATENCY_SECONDS = 0.5

    def __init__(
        self,
        agent_id: str,
        role: "AgentRole",
        api_key: Optional[str] = None,
        model_name: str = "gemini-pro",
    ):
        """Create the agent and, when possible, attach a Gemini model.

        Args:
            agent_id: Identifier reported in every result and log line.
            role: The agent's role; drives capabilities, tools and prompts.
            api_key: Gemini API key. Without it the agent runs in demo mode.
            model_name: Name passed to ``genai.GenerativeModel``.
        """
        self.agent_id = agent_id
        self.role = role
        self.api_key = api_key
        self.memory = []
        self.context = AgentContext(
            agent_id=agent_id,
            role=role,
            capabilities=self._init_capabilities(),
            memory=[],
            tools=self._init_tools(),
        )
        self.model = None
        if GEMINI_AVAILABLE and api_key:
            try:
                genai.configure(api_key=api_key)
                self.model = genai.GenerativeModel(model_name)
                print(f"✅ Agent {agent_id} initialized with Gemini API")
            except Exception as e:
                # Best-effort: a broken Gemini setup must not prevent the agent
                # from being constructed; it simply stays in demo mode.
                self.model = None
                print(f"⚠️ Gemini configuration failed: {e}")
                print("💡 Running in demo mode with simulated responses")
        else:
            print(f"🎭 Agent {agent_id} running in demo mode")

    def _init_capabilities(self) -> List[str]:
        """Return the capability names for ``self.role`` (empty if unknown)."""
        capabilities_map = {
            AgentRole.COORDINATOR: ["task_decomposition", "agent_orchestration", "priority_management"],
            AgentRole.RESEARCHER: ["data_gathering", "web_search", "information_synthesis"],
            AgentRole.ANALYZER: ["pattern_recognition", "data_analysis", "insight_generation"],
            AgentRole.EXECUTOR: ["action_execution", "result_validation", "output_formatting"],
        }
        return capabilities_map.get(self.role, [])

    def _init_tools(self) -> List[str]:
        """Return the tool names for ``self.role`` (empty if unknown)."""
        tools_map = {
            AgentRole.COORDINATOR: ["task_splitter", "agent_selector", "progress_tracker"],
            AgentRole.RESEARCHER: ["search_engine", "data_extractor", "source_validator"],
            AgentRole.ANALYZER: ["statistical_analyzer", "pattern_detector", "visualization_tool"],
            AgentRole.EXECUTOR: ["code_executor", "file_handler", "api_caller"],
        }
        return tools_map.get(self.role, [])

    def process_message(self, message: str, context: Optional[Dict] = None) -> Dict[str, Any]:
        """Answer ``message`` and record the exchange in ``self.memory``.

        Args:
            message: The incoming request text.
            context: Optional metadata stored on the recorded user message.

        Returns:
            On success, a dict with ``agent_id``, ``role``, ``response``,
            ``capabilities_used``, ``next_actions`` and ``timestamp``.
            On failure, a dict with ``error`` (the exception text) plus
            ``agent_id`` and ``timestamp``; the exception is logged, not raised.
        """
        # Build the prompt BEFORE recording the new message: otherwise the
        # "recent context" section would just repeat the current request.
        prompt = self._generate_contextual_prompt(message, context)
        self.memory.append(
            Message(
                role="user",
                content=message,
                timestamp=datetime.now(),
                metadata=context,
            )
        )
        try:
            if self.model is not None:
                response = self._generate_response_gemini(prompt, fallback_message=message)
            else:
                response = self._generate_demo_response(message)
            self.memory.append(
                Message(
                    role="assistant",
                    content=response,
                    timestamp=datetime.now(),
                    metadata={"agent_id": self.agent_id, "role": self.role.value},
                )
            )
            return {
                "agent_id": self.agent_id,
                "role": self.role.value,
                "response": response,
                "capabilities_used": self._analyze_capabilities_used(message),
                "next_actions": self._suggest_next_actions(response),
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            # Boundary handler: callers get an error payload instead of a crash.
            logger.exception("Error processing message: %s", e)
            return {
                "error": str(e),
                "agent_id": self.agent_id,
                "timestamp": datetime.now().isoformat(),
            }

    def _generate_response_gemini(self, prompt: str, fallback_message: Optional[str] = None) -> str:
        """Ask the Gemini model for a reply, falling back to a simulated one.

        Args:
            prompt: Full prompt sent to ``self.model.generate_content``.
            fallback_message: Text handed to the simulated responder if the
                Gemini call fails. Defaults to ``prompt`` when omitted.

        Returns:
            The model's ``text``, or a simulated reply if anything in the call
            (including reading ``text``) raises.
        """
        try:
            response = self.model.generate_content(prompt)
            return response.text
        except Exception as e:
            logger.error("Gemini API error: %s", e)
            # Quote the user's request in the simulated reply, not the prompt
            # boilerplate, whenever the caller told us what the request was.
            source = prompt if fallback_message is None else fallback_message
            return self._generate_demo_response(source)

    def _generate_demo_response(self, message: str) -> str:
        """Return a canned, role-flavoured reply quoting the start of ``message``.

        Sleeps for ``DEMO_LATENCY_SECONDS`` before returning.
        """
        snippet = message[:50]
        role_responses = {
            AgentRole.COORDINATOR: f"As coordinator, I'll break down the task: '{snippet}...' into manageable components and assign them to specialized agents.",
            AgentRole.RESEARCHER: f"I'll research information about: '{snippet}...' using my data gathering and synthesis capabilities.",
            AgentRole.ANALYZER: f"Analyzing the patterns and insights from: '{snippet}...' to provide data-driven recommendations.",
            AgentRole.EXECUTOR: f"I'll execute the necessary actions for: '{snippet}...' and validate the results.",
        }
        base_response = role_responses.get(self.role, f"Processing: {snippet}...")
        if self.DEMO_LATENCY_SECONDS > 0:
            time.sleep(self.DEMO_LATENCY_SECONDS)
        additional_context = {
            AgentRole.COORDINATOR: " I've identified 3 key subtasks and will coordinate their execution across the agent team.",
            AgentRole.RESEARCHER: " My research indicates several relevant sources and current trends in this area.",
            AgentRole.ANALYZER: " The data shows interesting correlations and actionable insights for decision making.",
            AgentRole.EXECUTOR: " I've completed the requested actions and verified the outputs meet quality standards.",
        }
        return base_response + additional_context.get(self.role, "")

    def _generate_contextual_prompt(self, message: str, context: Optional[Dict]) -> str:
        """Build the prompt: role, capabilities, tools, recent memory, request.

        Role-specific instructions are appended when the role is known.
        ``context`` is accepted for interface compatibility but is not
        currently used in the prompt text.
        """
        base_prompt = (
            "\n"
            f"You are an advanced AI agent with the role: {self.role.value}\n"
            f"Your capabilities: {', '.join(self.context.capabilities)}\n"
            f"Available tools: {', '.join(self.context.tools)}\n"
            "Recent conversation context:\n"
            f"{self._get_recent_context()}\n"
            f"Current request: {message}\n"
        )
        role_instructions = {
            AgentRole.COORDINATOR: (
                "\n"
                "Focus on breaking down complex tasks, coordinating with other agents,\n"
                "and maintaining overall project coherence. Consider dependencies and priorities.\n"
                "Provide clear task decomposition and agent assignments.\n"
            ),
            AgentRole.RESEARCHER: (
                "\n"
                "Prioritize accurate information gathering, source verification,\n"
                "and comprehensive data collection. Synthesize findings clearly.\n"
                "Focus on current trends and reliable sources.\n"
            ),
            AgentRole.ANALYZER: (
                "\n"
                "Focus on pattern recognition, data interpretation, and insight generation.\n"
                "Provide evidence-based conclusions and actionable recommendations.\n"
                "Highlight key correlations and implications.\n"
            ),
            AgentRole.EXECUTOR: (
                "\n"
                "Concentrate on practical implementation, result validation,\n"
                "and clear output delivery. Ensure actions are completed effectively.\n"
                "Focus on quality and completeness of execution.\n"
            ),
        }
        return base_prompt + role_instructions.get(self.role, "")

    def _get_recent_context(self, limit: int = 3) -> str:
        """Render the last ``limit`` memory entries, one ``role: text`` per line.

        Each entry's content is cut to 100 characters, with "..." appended only
        when something was actually cut. Returns "No previous context" when
        memory is empty or ``limit`` is not positive.
        """
        # Guard limit <= 0 explicitly: memory[-0:] would be the WHOLE history.
        if limit <= 0 or not self.memory:
            return "No previous context"
        max_chars = 100
        lines = []
        for msg in self.memory[-limit:]:
            content = msg.content
            snippet = content[:max_chars] + "..." if len(content) > max_chars else content
            lines.append(f"{msg.role}: {snippet}\n")
        return "".join(lines)

    def _analyze_capabilities_used(self, message: str) -> List[str]:
        """Guess which of this agent's capabilities ``message`` calls for.

        A capability is reported when the agent has it and any of its trigger
        phrases occurs (case-insensitively, as a substring) in ``message``.
        """
        message_lower = message.lower()
        capability_keywords = {
            "task_decomposition": ["break down", "divide", "split", "decompose"],
            "data_gathering": ["research", "find", "collect", "gather"],
            "pattern_recognition": ["analyze", "pattern", "trend", "correlation"],
            "action_execution": ["execute", "run", "implement", "perform"],
            "agent_orchestration": ["coordinate", "manage", "organize", "assign"],
            "information_synthesis": ["synthesize", "combine", "merge", "integrate"],
        }
        own_capabilities = self.context.capabilities
        return [
            capability
            for capability, keywords in capability_keywords.items()
            if capability in own_capabilities
            and any(keyword in message_lower for keyword in keywords)
        ]

    def _suggest_next_actions(self, response: str) -> List[str]:
        """Suggest follow-up actions from trigger phrases found in ``response``.

        Matching is case-insensitive substring search. Returns
        ``["continue_conversation"]`` when nothing matches.
        """
        response_lower = response.lower()
        # (trigger phrases, suggested action) - order fixes the output order.
        rules = (
            (("need more information", "research"), "delegate_to_researcher"),
            (("analyze", "pattern"), "delegate_to_analyzer"),
            (("implement", "execute"), "delegate_to_executor"),
            (("coordinate", "manage"), "initiate_multi_agent_collaboration"),
            (("subtask", "break down"), "task_decomposition_required"),
        )
        suggestions = [
            action
            for triggers, action in rules
            if any(trigger in response_lower for trigger in triggers)
        ]
        return suggestions or ["continue_conversation"]