How to Build an Asynchronous AI Agent Network Using Gemini for Research, Analysis, and Validation Tasks

By byrn


In this tutorial, we introduce the Gemini Agent Network Protocol, a framework for collaboration among specialized AI agents. Built on Google’s Gemini models, the protocol lets agents exchange messages, each in a distinct role: Analyzer, Researcher, Synthesizer, or Validator. We set up and configure an asynchronous agent network that distributes tasks, passes messages between agents, and keeps a short conversation memory per agent. The framework suits scenarios such as in-depth research, complex data analysis, and information validation.

import asyncio
import json
import random
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Any
from enum import Enum
import google.generativeai as genai

We use asyncio for concurrent execution, dataclasses for structured messages, and Google’s Generative AI SDK (google.generativeai) for model calls. The json and random modules serialize agent context into prompts and pick random peers for follow-up messages.

API_KEY = None


try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

We initialize API_KEY to None and detect whether the code is running in a Colab environment. If the google.colab module imports successfully, the IN_COLAB flag is set to True; otherwise, it is set to False, so the script can adjust its behavior accordingly.
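Outside Colab, one non-interactive option is to read the key from an environment variable before prompting. The sketch below is a minimal illustration, not part of the original script: the helper name resolve_api_key is hypothetical, and the variable name GEMINI_API_KEY is an assumption chosen to match the Colab secret name used later.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(env: Mapping[str, str], var_name: str = "GEMINI_API_KEY") -> Optional[str]:
    """Return the stripped key from `env`, or None if it is unset or blank.

    `resolve_api_key` and the default `var_name` are illustrative assumptions.
    """
    value = env.get(var_name, "").strip()
    return value or None


# Hypothetical usage: fall back to an interactive prompt when this is None.
key_from_env = resolve_api_key(os.environ)
```

Passing the mapping in as an argument keeps the helper easy to test with a plain dict.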

class AgentType(Enum):
    ANALYZER = "analyzer"
    RESEARCHER = "researcher"
    SYNTHESIZER = "synthesizer"
    VALIDATOR = "validator"


@dataclass
class Message:
    sender: str
    receiver: str
    content: str
    msg_type: str
    metadata: Optional[Dict[str, Any]] = None  # None means "no metadata attached"

Check out the Notebook

We define the core structures for agent interaction. The AgentType enum categorizes agents into four distinct roles, Analyzer, Researcher, Synthesizer, and Validator, each with a specific function in the collaborative network. The Message dataclass represents the format for inter-agent communication, encapsulating sender and receiver IDs, message content, type, and optional metadata.
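To make the message format concrete, the following sketch builds one message and converts it to a plain dict with asdict, which is the form the network stores in its log. The class name SketchMessage, the sample values, and the use of field(default_factory=dict) for metadata are illustrative assumptions, a variation on the dataclass above rather than its definition.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class SketchMessage:
    """Hypothetical copy of the message shape, used only for this example."""
    sender: str
    receiver: str
    content: str
    msg_type: str
    # default_factory gives every message its own empty dict (assumed variation).
    metadata: Dict[str, Any] = field(default_factory=dict)


msg = SketchMessage("system", "deep_analyzer", "Summarize the task", "task")
record = asdict(msg)  # plain dict, ready for logging or JSON serialization
print(json.dumps(record, indent=2))
```

Because asdict returns ordinary dicts and strings, the log can be dumped to JSON without a custom encoder.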

class GeminiAgent:
    def __init__(self, agent_id: str, agent_type: AgentType, network: 'AgentNetwork'):
        self.id = agent_id
        self.type = agent_type
        self.network = network
        self.model = genai.GenerativeModel('gemini-2.0-flash')
        self.inbox = asyncio.Queue()
        self.context_memory = []
       
        self.system_prompts = {
            AgentType.ANALYZER: "You are a data analyzer. Break down complex problems into components and identify key patterns.",
            AgentType.RESEARCHER: "You are a researcher. Gather information and provide detailed context on topics.",
            AgentType.SYNTHESIZER: "You are a synthesizer. Combine information from multiple sources into coherent insights.",
            AgentType.VALIDATOR: "You are a validator. Check accuracy and consistency of information and conclusions."
        }
   
    async def process_message(self, message: Message):
        """Process incoming message and generate response"""
        if not API_KEY:
            return "❌ API key not configured. Please set API_KEY variable."
           
        prompt = f"""
        {self.system_prompts[self.type]}
       
        Context from previous interactions: {json.dumps(self.context_memory[-3:], indent=2)}
       
        Message from {message.sender}: {message.content}
       
        Provide a focused response (max 100 words) that adds value to the network discussion.
        """
       
        try:
            response = await asyncio.to_thread(
                self.model.generate_content, prompt
            )
            return response.text.strip()
        except Exception as e:
            return f"Error processing: {str(e)}"
   
    async def send_message(self, receiver_id: str, content: str, msg_type: str = "task"):
        """Send message to another agent"""
        message = Message(self.id, receiver_id, content, msg_type)
        await self.network.route_message(message)
   
    async def broadcast(self, content: str, exclude_self: bool = True):
        """Broadcast message to all agents in network"""
        for agent_id in self.network.agents:
            if exclude_self and agent_id == self.id:
                continue
            await self.send_message(agent_id, content, "broadcast")
   
    async def run(self):
        """Main agent loop"""
        while True:
            try:
                message = await asyncio.wait_for(self.inbox.get(), timeout=1.0)
               
                response = await self.process_message(message)
               
                self.context_memory.append({
                    "from": message.sender,
                    "content": message.content,
                    "my_response": response
                })
               
                if len(self.context_memory) > 10:
                    self.context_memory = self.context_memory[-10:]
               
                print(f"🤖 {self.id} ({self.type.value}): {response}")
               
                if random.random() < 0.3:  
                    other_agents = [aid for aid in self.network.agents.keys() if aid != self.id]
                    if other_agents:
                        target = random.choice(other_agents)
                        await self.send_message(target, f"Building on that: {response[:50]}...")
               
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"❌ Error in {self.id}: {e}")


The GeminiAgent class defines each agent's behavior. On initialization, it stores a unique ID, a role type, and a reference to the agent network, then loads the Gemini 2.0 Flash model. process_message() builds a prompt from the role-specific system prompt, the last three context entries, and the incoming message, and runs the blocking generate_content call in a worker thread via asyncio.to_thread so the event loop is not blocked. Each agent keeps a context memory of its ten most recent interactions and can send targeted messages or broadcast to all other agents. The run() method polls the inbox with a one-second timeout, prints each response, and with 30% probability forwards the first 50 characters of that response to a randomly chosen peer, which keeps the discussion moving.
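Two mechanics of the agent can be tried without an API key: offloading a blocking call with asyncio.to_thread, and capping the context memory. The sketch below is a simplified illustration under stated assumptions: fake_generate is a hypothetical stand-in for the model call, and collections.deque with maxlen is an alternative to the list trimming used in the class, not the tutorial's own approach.

```python
import asyncio
import time
from collections import deque
from typing import Deque, Dict, List


def fake_generate(prompt: str) -> str:
    """Hypothetical stand-in for a blocking model call."""
    time.sleep(0.05)  # simulate network latency
    return f"echo: {prompt}"


async def respond_all(prompts: List[str]) -> List[str]:
    # Each blocking call runs in a worker thread, so the calls overlap
    # and the event loop stays free for other coroutines.
    results = await asyncio.gather(
        *(asyncio.to_thread(fake_generate, p) for p in prompts)
    )
    return list(results)


def recent(memory: Deque[Dict[str, str]], n: int = 3) -> List[Dict[str, str]]:
    # deque does not support slicing, so copy to a list first.
    return list(memory)[-n:]


# maxlen drops the oldest entry automatically once ten are stored.
memory: Deque[Dict[str, str]] = deque(maxlen=10)
for i in range(12):
    memory.append({"from": "system", "content": f"msg {i}", "my_response": f"reply {i}"})

replies = asyncio.run(respond_all(["alpha", "beta"]))
```

asyncio.to_thread requires Python 3.9 or later, the same requirement the agent class already has.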

class AgentNetwork:
    def __init__(self):
        self.agents: Dict[str, GeminiAgent] = {}
        self.message_log = []
        self.running = False
   
    def add_agent(self, agent_type: AgentType, agent_id: Optional[str] = None):
        """Add new agent to network"""
        if not agent_id:
            agent_id = f"{agent_type.value}_{len(self.agents)+1}"
       
        agent = GeminiAgent(agent_id, agent_type, self)
        self.agents[agent_id] = agent
        print(f"✅ Added {agent_id} to network")
        return agent_id
   
    async def route_message(self, message: Message):
        """Route message to target agent"""
        self.message_log.append(asdict(message))
       
        if message.receiver in self.agents:
            await self.agents[message.receiver].inbox.put(message)
        else:
            print(f"⚠️  Agent {message.receiver} not found")
   
    async def initiate_task(self, task: str):
        """Start a collaborative task"""
        print(f"🚀 Starting task: {task}")
       
        analyzer_agents = [aid for aid, agent in self.agents.items()
                          if agent.type == AgentType.ANALYZER]
       
        if analyzer_agents:
            initial_message = Message("system", analyzer_agents[0], task, "task")
            await self.route_message(initial_message)
   
    async def run_network(self, duration: int = 30):
        """Run the agent network for specified duration"""
        self.running = True
        print(f"🌐 Starting agent network for {duration} seconds...")
       
        agent_tasks = [agent.run() for agent in self.agents.values()]
       
        try:
            await asyncio.wait_for(asyncio.gather(*agent_tasks), timeout=duration)
        except asyncio.TimeoutError:
            print("⏰ Network session completed")
        finally:
            self.running = False


The AgentNetwork class coordinates communication between agents. It adds agents with unique IDs and specified roles (generating an ID from the role and the current agent count when none is given), logs every message it routes, and delivers each message to the recipient's inbox, printing a warning if the recipient is unknown. initiate_task() starts a collaborative task by sending the starting message to the first Analyzer agent, and run_network() runs all agent loops concurrently for a specified duration, cancelling them when the timeout expires.
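The timeout behavior is easier to see in isolation. The sketch below wraps never-ending agent loops in asyncio.wait_for, so the whole group is cancelled when the duration elapses. EchoAgent, run_for, and demo are hypothetical names for this example only, and no model is called.

```python
import asyncio
from typing import Dict, List, Tuple


class EchoAgent:
    """Hypothetical stand-in for an agent: it only records what it receives."""

    def __init__(self, agent_id: str):
        self.id = agent_id
        self.inbox: asyncio.Queue = asyncio.Queue()
        self.seen: List[str] = []

    async def run(self) -> None:
        while True:  # runs until cancelled by the timeout below
            content = await self.inbox.get()
            self.seen.append(content)


async def run_for(agents: Dict[str, EchoAgent], duration: float) -> bool:
    """Run all agent loops; return True if the session ended by timeout."""
    try:
        await asyncio.wait_for(
            asyncio.gather(*(agent.run() for agent in agents.values())),
            timeout=duration,
        )
    except asyncio.TimeoutError:
        return True  # wait_for has already cancelled the agent loops
    return False


async def demo() -> Tuple[bool, List[str], List[str]]:
    agents = {"a": EchoAgent("a"), "b": EchoAgent("b")}
    session = asyncio.create_task(run_for(agents, 0.2))
    await agents["a"].inbox.put("hello")  # route one message to agent "a"
    timed_out = await session
    return timed_out, agents["a"].seen, agents["b"].seen


timed_out, seen_a, seen_b = asyncio.run(demo())
```

Because the loops never return on their own, the timeout is the normal way the session ends, which is why the network reports it as completion rather than as an error.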

async def demo_agent_network():
    """Demonstrate the Gemini Agent Network Protocol"""
   
    network = AgentNetwork()
   
    network.add_agent(AgentType.ANALYZER, "deep_analyzer")
    network.add_agent(AgentType.RESEARCHER, "info_gatherer")
    network.add_agent(AgentType.SYNTHESIZER, "insight_maker")
    network.add_agent(AgentType.VALIDATOR, "fact_checker")
   
    task = "Analyze the potential impact of quantum computing on cybersecurity"
   
    network_task = asyncio.create_task(network.run_network(20))
    await asyncio.sleep(1)  
    await network.initiate_task(task)
    await network_task
   
    print(f"\n📊 Network completed with {len(network.message_log)} messages exchanged")
    agent_participation = {aid: sum(1 for msg in network.message_log if msg['sender'] == aid)
                          for aid in network.agents}
    print("Agent participation:", agent_participation)


def setup_api_key():
    """Interactive API key setup"""
    global API_KEY
   
    if IN_COLAB:
        from google.colab import userdata
        try:
            API_KEY = userdata.get('GEMINI_API_KEY')
            genai.configure(api_key=API_KEY)
            print("✅ API key loaded from Colab secrets")
            return True
        except Exception:  # e.g. the secret is missing or notebook access was not granted
            print("💡 To use Colab secrets: Add 'GEMINI_API_KEY' in the secrets panel")
   
    print("🔑 Please enter your Gemini API key:")
    print("   Get it from: https://makersuite.google.com/app/apikey")
   
    try:
        if IN_COLAB:
            # Note: input() shows the typed key in the cell output.
            API_KEY = input("Paste your API key here: ").strip()
        else:
            import getpass
            API_KEY = getpass.getpass("Paste your API key here: ").strip()
       
        if API_KEY and len(API_KEY) > 10:
            genai.configure(api_key=API_KEY)
            print("✅ API key configured successfully!")
            return True
        else:
            print("❌ Invalid API key")
            return False
    except KeyboardInterrupt:
        print("\n❌ Setup cancelled")
        return False


The demo_agent_network() function orchestrates the entire agent workflow: it initializes an agent network, adds four role-specific agents, launches a task on the impact of quantum computing on cybersecurity, and runs the network for 20 seconds before reporting the total number of messages exchanged and how many each agent sent. setup_api_key() configures the Gemini API key: in Colab it first tries the GEMINI_API_KEY secret, and otherwise falls back to an interactive prompt (getpass outside Colab, so the key is not echoed). It returns False if the key is empty or no longer than 10 characters, or if the user cancels, so the demo only starts once the agents can reach the Gemini backend.
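The participation summary can also be computed in a single pass over the log. The sketch below uses collections.Counter as an equivalent alternative to the per-agent sum in the demo; the function name participation and the sample log entries are made up for illustration.

```python
from collections import Counter
from typing import Dict, Iterable, List


def participation(message_log: List[Dict[str, str]], agent_ids: Iterable[str]) -> Dict[str, int]:
    """Count messages sent by each listed agent (hypothetical helper)."""
    counts = Counter(entry["sender"] for entry in message_log)
    # Counter returns 0 for senders that never appear in the log.
    return {agent_id: counts[agent_id] for agent_id in agent_ids}


# Made-up log entries; "system" is not an agent, so it is not reported.
sample_log = [
    {"sender": "system", "receiver": "deep_analyzer"},
    {"sender": "deep_analyzer", "receiver": "fact_checker"},
    {"sender": "deep_analyzer", "receiver": "info_gatherer"},
    {"sender": "fact_checker", "receiver": "insight_maker"},
]
stats = participation(
    sample_log, ["deep_analyzer", "info_gatherer", "insight_maker", "fact_checker"]
)
```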

if __name__ == "__main__":
    print("🧠 Gemini Agent Network Protocol")
    print("=" * 40)
   
    if not setup_api_key():
        print("❌ Cannot run without valid API key")
        raise SystemExit(1)  # exit() is meant for interactive sessions
   
    print("\n🚀 Starting demo...")
   
    if IN_COLAB:
        import nest_asyncio
        nest_asyncio.apply()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(demo_agent_network())
    else:
        asyncio.run(demo_agent_network())

Finally, the code above is the entry point for the Gemini Agent Network Protocol. It first sets up the Gemini API key and exits if none is provided. On success, it launches the demo. In Google Colab, where an event loop is already running, it applies nest_asyncio so the demo can run on that loop; otherwise, it uses Python’s native asyncio.run() to execute the asynchronous demo of agent collaboration.
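The Colab branch exists because asyncio.run() raises an error when called from a thread that already has a running event loop. One way to check for that condition directly, instead of inferring it from the environment, is sketched below; has_running_loop and probe are hypothetical helper names, not part of the original script.

```python
import asyncio


def has_running_loop() -> bool:
    """Return True if called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:  # raised when no loop is running in this thread
        return False
    return True


async def probe() -> bool:
    # Inside a coroutine, a loop is running by definition.
    return has_running_loop()


inside = asyncio.run(probe())
```

In a plain script the check returns False at module level, so asyncio.run() is safe; in a notebook cell it returns True, which is the case nest_asyncio addresses.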

By completing this tutorial, users gain practical experience implementing a collaborative network of Gemini agents. The demo shows how role-specific agents can break down a complex problem, build on each other's responses, and check information through a Validator agent.


Check out the Notebook. All credit for this research goes to the researchers of this project.


Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among audiences.


