In this tutorial, we explore how to build a fully offline, multi-step reasoning agent that uses the Instructor library to generate structured outputs and reliably orchestrate complex tool calls. We design an agent capable of choosing the right tool, validating inputs, planning multi-stage workflows, and recovering from errors. We bring together Instructor, Transformers, and carefully crafted Pydantic schemas to create an adaptive system that mirrors real-world agentic AI behavior.
import subprocess
import sys

def install_dependencies():
    packages = [
        "instructor",
        "transformers>=4.35.0",
        "torch",
        "accelerate",
        "pydantic>=2.0.0",
        "numpy",
        "pandas"
    ]
    # torch may not be installed yet in a fresh environment, so guard the
    # import rather than assuming it is available before the install loop runs.
    try:
        import torch
        has_gpu = torch.cuda.is_available()
    except ImportError:
        has_gpu = False
    if has_gpu:
        packages.append("bitsandbytes")
        print("✅ GPU detected - installing quantization support")
    else:
        print("⚠️ No GPU detected - will use CPU (slower but works)")
    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])

try:
    import instructor
except ImportError:
    print("📦 Installing dependencies...")
    install_dependencies()
    print("✅ Installation complete!")
from typing import Literal, Optional, List, Union, Dict, Any
from pydantic import BaseModel, Field, field_validator
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import instructor
import json
from datetime import datetime
import re
We set up our environment by installing all required dependencies and importing the core libraries. As we lay the foundation for the system, we ensure that everything, from Instructor to Transformers, is ready for offline execution. This gives us a clean and reliable base for building the agent.
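Before moving on, an optional sanity check can confirm the environment is usable. This short snippet is not part of the pipeline itself; it only reports library versions and whether a GPU is visible:

import torch
import transformers
import pydantic

# Report versions and device availability before loading any model.
print(f"torch {torch.__version__} | transformers {transformers.__version__} | pydantic {pydantic.VERSION}")
print("CUDA available:", torch.cuda.is_available())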
class SQLQuery(BaseModel):
    """Complex SQL generation with validation"""
    table: str
    columns: List[str]
    where_conditions: Optional[Dict[str, Any]] = None
    joins: Optional[List[Dict[str, str]]] = None
    aggregations: Optional[Dict[str, str]] = None
    order_by: Optional[List[str]] = None

    @field_validator('columns')
    @classmethod
    def validate_columns(cls, v):
        if not v:
            raise ValueError("Must specify at least one column")
        return v
class DataTransformation(BaseModel):
    """Schema for complex data pipeline operations"""
    operation: Literal["filter", "aggregate", "join", "pivot", "normalize"]
    source_data: str = Field(description="Reference to data source")
    parameters: Dict[str, Any]
    output_format: Literal["json", "csv", "dataframe"]

class APIRequest(BaseModel):
    """Multi-endpoint API orchestration"""
    endpoints: List[Dict[str, str]] = Field(description="List of endpoints to call")
    authentication: Dict[str, str]
    request_order: Literal["sequential", "parallel", "conditional"]
    error_handling: Literal["stop", "continue", "retry"]
    max_retries: int = Field(default=3, ge=0, le=10)
class CodeGeneration(BaseModel):
    """Generate and validate code snippets"""
    language: Literal["python", "javascript", "sql", "bash"]
    purpose: str
    code: str = Field(description="The generated code")
    dependencies: List[str] = Field(default_factory=list)
    test_cases: List[Dict[str, Any]] = Field(default_factory=list)

    @field_validator('code')
    @classmethod
    def validate_code_safety(cls, v, info):
        # info.data holds previously validated fields; 'language' is available
        # here because it is declared before 'code' on the model.
        dangerous = ['eval(', 'exec(', '__import__', 'os.system']
        if info.data.get('language') == 'python':
            if any(d in v for d in dangerous):
                raise ValueError("Code contains potentially dangerous operations")
        return v
class MultiToolPlan(BaseModel):
    """Plan for multi-step tool execution"""
    goal: str
    steps: List[Dict[str, Any]] = Field(description="Ordered list of tool calls")
    dependencies: Dict[str, List[str]] = Field(description="Step dependencies")
    fallback_strategy: Optional[str] = None
    estimated_duration: float = Field(description="Seconds")

class ToolCall(BaseModel):
    """Enhanced tool selection with context"""
    reasoning: str
    confidence: float = Field(ge=0.0, le=1.0)
    tool_name: Literal["sql_engine", "data_transformer", "api_orchestrator",
                       "code_generator", "planner", "none"]
    tool_input: Optional[Union[SQLQuery, DataTransformation, APIRequest,
                               CodeGeneration, MultiToolPlan]] = None
    requires_human_approval: bool = False

class ExecutionResult(BaseModel):
    """Rich result with metadata"""
    success: bool
    data: Any
    execution_time: float
    warnings: List[str] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)
We define the advanced Pydantic schemas that structure how our agent understands SQL queries, data pipelines, API calls, code generation, and multi-step plans. As we build these models, we give our agent strong validation, safety checks, and clarity when interpreting complex instructions. This becomes the backbone of the agent's reasoning process.
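To see the validation in action, here is a small smoke test of the schemas defined above (hypothetical example values); both failure cases raise pydantic.ValidationError because of the custom validators:

from pydantic import ValidationError

# A valid query passes validation.
query = SQLQuery(table="users", columns=["id", "name"], where_conditions={"country": "USA"})

# An empty column list is rejected by validate_columns.
try:
    SQLQuery(table="users", columns=[])
except ValidationError as e:
    print("Rejected:", e.errors()[0]["msg"])

# Python code containing eval() is rejected by validate_code_safety.
try:
    CodeGeneration(language="python", purpose="demo", code="eval(input())")
except ValidationError as e:
    print("Rejected:", e.errors()[0]["msg"])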
def sql_engine_tool(params: SQLQuery) -> ExecutionResult:
    import time
    start = time.time()
    mock_tables = {
        "users": [
            {"id": 1, "name": "Alice", "age": 30, "country": "USA"},
            {"id": 2, "name": "Bob", "age": 25, "country": "UK"},
            {"id": 3, "name": "Charlie", "age": 35, "country": "USA"},
        ],
        "orders": [
            {"id": 1, "user_id": 1, "amount": 100, "status": "completed"},
            {"id": 2, "user_id": 1, "amount": 200, "status": "pending"},
            {"id": 3, "user_id": 2, "amount": 150, "status": "completed"},
        ]
    }
    data = mock_tables.get(params.table, [])
    if params.where_conditions:
        data = [row for row in data if all(
            row.get(k) == v for k, v in params.where_conditions.items()
        )]
    data = [{col: row.get(col) for col in params.columns} for row in data]
    warnings = []
    if params.aggregations:
        warnings.append("Aggregation simplified in mock mode")
    return ExecutionResult(
        success=True,
        data=data,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"rows_affected": len(data), "query_type": "SELECT"}
    )
def data_transformer_tool(params: DataTransformation) -> ExecutionResult:
    import time
    start = time.time()
    # The normalize operation only divides numeric values; non-numeric fields
    # (e.g. "category") pass through unchanged instead of raising a TypeError.
    operations = {
        "filter": lambda d, p: [x for x in d if x.get(p['field']) == p['value']],
        "aggregate": lambda d, p: {"count": len(d), "operation": p.get('function', 'count')},
        "normalize": lambda d, p: [
            {k: v / p.get('factor', 1) if isinstance(v, (int, float)) else v
             for k, v in x.items()} for x in d
        ]
    }
    mock_data = [{"value": i, "category": "A" if i % 2 else "B"} for i in range(10)]
    op_func = operations.get(params.operation)
    if op_func:
        result_data = op_func(mock_data, params.parameters)
    else:
        result_data = mock_data
    return ExecutionResult(
        success=True,
        data=result_data,
        execution_time=time.time() - start,
        warnings=[],
        metadata={"operation": params.operation, "input_rows": len(mock_data)}
    )
def api_orchestrator_tool(params: APIRequest) -> ExecutionResult:
    import time
    start = time.time()
    results = []
    warnings = []
    for i, endpoint in enumerate(params.endpoints):
        # Simulate a transient failure on the second endpoint when retries are enabled.
        if params.error_handling == "retry" and i == 1:
            warnings.append(f"Endpoint {endpoint.get('url')} failed, retrying...")
        results.append({
            "endpoint": endpoint.get('url'),
            "status": 200,
            "data": f"Mock response from {endpoint.get('url')}"
        })
    return ExecutionResult(
        success=True,
        data=results,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"endpoints_called": len(params.endpoints), "order": params.request_order}
    )
def code_generator_tool(params: CodeGeneration) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.code) > 1000:
        warnings.append("Generated code is quite long, consider refactoring")
    if not params.test_cases:
        warnings.append("No test cases provided for generated code")
    return ExecutionResult(
        success=True,
        data={"code": params.code, "language": params.language, "dependencies": params.dependencies},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"lines_of_code": len(params.code.split('\n'))}
    )
def planner_tool(params: MultiToolPlan) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.steps) > 10:
        warnings.append("Plan has many steps, consider breaking into sub-plans")
    for step_id, deps in params.dependencies.items():
        if step_id in deps:
            warnings.append(f"Circular dependency detected in step {step_id}")
    return ExecutionResult(
        success=True,
        data={"plan": params.steps, "estimated_time": params.estimated_duration},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"total_steps": len(params.steps)}
    )
TOOLS = {
    "sql_engine": sql_engine_tool,
    "data_transformer": data_transformer_tool,
    "api_orchestrator": api_orchestrator_tool,
    "code_generator": code_generator_tool,
    "planner": planner_tool
}
We implement the actual tools: SQL execution, data transformation, API orchestration, code validation, and planning. As we write these tool functions, we simulate realistic workflows with controlled outputs and error handling, which lets us test the agent's decision-making in an environment that mirrors real-world tasks.
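Because every tool accepts a validated schema and returns an ExecutionResult, each one can be exercised in isolation before any model is involved. A minimal example against the mock sql_engine (values chosen to match the mock tables above):

# Call the mock SQL engine directly, bypassing the LLM router.
params = SQLQuery(
    table="users",
    columns=["name", "age"],
    where_conditions={"country": "USA"},
)
result = TOOLS["sql_engine"](params)
print(result.success, result.metadata)  # True {'rows_affected': 2, 'query_type': 'SELECT'}
print(result.data)                      # [{'name': 'Alice', 'age': 30}, {'name': 'Charlie', 'age': 35}]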
class AdvancedToolAgent:
    """Agent with complex reasoning, error recovery, and multi-step planning"""

    def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
        import torch
        print(f"🤖 Loading model: {model_name}")
        model_kwargs = {"device_map": "auto"}
        if torch.cuda.is_available():
            print("💫 GPU detected - using 8-bit quantization")
            from transformers import BitsAndBytesConfig
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
                llm_int8_threshold=6.0
            )
            model_kwargs["quantization_config"] = quantization_config
        else:
            print("💻 CPU mode - using smaller model for better performance")
            # flan-t5 is an encoder-decoder model and cannot be loaded via
            # AutoModelForCausalLM, so we fall back to a small causal LM instead.
            model_name = "distilgpt2"
            model_kwargs["torch_dtype"] = "auto"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            **model_kwargs
        )
        self.pipe = pipeline(
            "text-generation", model=self.model, tokenizer=self.tokenizer,
            max_new_tokens=768, temperature=0.7, do_sample=True
        )
        self.client = instructor.from_pipe(self.pipe)
        self.execution_history = []
        print("✅ Agent initialized!")
    def route_to_tool(self, user_query: str, context: Optional[str] = None) -> ToolCall:
        tool_descriptions = """
        Advanced Tools:
        - sql_engine: Execute complex SQL queries with joins, aggregations, filtering
        - data_transformer: Multi-step data pipelines (filter→aggregate→normalize)
        - api_orchestrator: Call multiple APIs with dependencies, retries, error handling
        - code_generator: Generate safe, validated code with tests in multiple languages
        - planner: Create multi-step execution plans with dependency management
        - none: Answer directly using reasoning
        """
        prompt = f"""{tool_descriptions}
User query: {user_query}
{f'Context from previous steps: {context}' if context else ''}
Analyze the complexity and choose the appropriate tool. For multi-step tasks, use the planner."""
        return self.client(prompt, response_model=ToolCall)
    def execute_with_recovery(self, tool_call: ToolCall, max_retries: int = 2) -> ExecutionResult:
        for attempt in range(max_retries + 1):
            try:
                if tool_call.tool_name == "none":
                    return ExecutionResult(
                        success=True, data="Direct response", execution_time=0.0,
                        warnings=[], metadata={}
                    )
                tool_func = TOOLS.get(tool_call.tool_name)
                if not tool_func:
                    return ExecutionResult(
                        success=False, data=None, execution_time=0.0,
                        warnings=[f"Tool {tool_call.tool_name} not found"], metadata={}
                    )
                result = tool_func(tool_call.tool_input)
                self.execution_history.append({
                    "tool": tool_call.tool_name,
                    "success": result.success,
                    "timestamp": datetime.now().isoformat()
                })
                return result
            except Exception as e:
                if attempt < max_retries:
                    print(f" ⚠️ Attempt {attempt + 1} failed, retrying...")
                    continue
                return ExecutionResult(
                    success=False, data=None, execution_time=0.0,
                    warnings=[f"Failed after {max_retries + 1} attempts: {str(e)}"],
                    metadata={"error": str(e)}
                )
We construct the agent itself: loading the model, building the routing pipeline, and implementing recovery logic. As we define methods for tool selection and execution, we give the agent the ability to understand queries, choose strategies, and gracefully handle failures.
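The retry logic is independent of any particular model, so the same pattern can be sketched in isolation. The helper below is a hypothetical, simplified stand-in for execute_with_recovery, useful for testing the recovery path without loading a model:

def run_with_retries(func, max_retries: int = 2) -> ExecutionResult:
    """Simplified version of the agent's retry loop, for illustration only."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as e:
            if attempt < max_retries:
                continue  # retry until attempts are exhausted
            return ExecutionResult(
                success=False, data=None, execution_time=0.0,
                warnings=[f"Failed after {max_retries + 1} attempts: {e}"],
                metadata={"error": str(e)}
            )

def flaky_tool():
    raise RuntimeError("boom")

print(run_with_retries(flaky_tool).warnings)
# ['Failed after 3 attempts: boom']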
    def run(self, user_query: str, verbose: bool = True) -> Dict[str, Any]:
        if verbose:
            print(f"\n{'='*70}")
            print(f"🎯 Complex Query: {user_query}")
            print(f"{'='*70}")
        if verbose:
            print("\n🧠 Step 1: Analyzing query complexity & routing...")
        tool_call = self.route_to_tool(user_query)
        if verbose:
            print(f" → Tool: {tool_call.tool_name}")
            print(f" → Confidence: {tool_call.confidence:.2%}")
            print(f" → Reasoning: {tool_call.reasoning}")
            if tool_call.requires_human_approval:
                print(f" ⚠️ Requires human approval!")
        if verbose:
            print("\n⚙️ Step 2: Executing tool with error recovery...")
        result = self.execute_with_recovery(tool_call)
        if verbose:
            print(f" → Success: {result.success}")
            print(f" → Execution time: {result.execution_time:.3f}s")
            if result.warnings:
                print(f" → Warnings: {', '.join(result.warnings)}")
            print(f" → Data preview: {str(result.data)[:200]}...")
        if verbose and result.metadata:
            print(f"\n📊 Metadata:")
            for key, value in result.metadata.items():
                print(f" • {key}: {value}")
        if verbose:
            print(f"\n{'='*70}\n")
        return {
            "query": user_query,
            "tool_used": tool_call.tool_name,
            "result": result,
            "history_length": len(self.execution_history)
        }
def main():
    agent = AdvancedToolAgent()
    hard_queries = [
        "Generate a SQL query to find all users from USA who have completed orders worth more than $150, and join with their order details",
        "Create a data pipeline that filters records where category='A', then aggregates by count, and normalizes the results by a factor of 100",
        "I need to call 3 APIs sequentially: first authenticate at /auth, then fetch user data at /users/{id}, and finally update preferences at /preferences. If any step fails, retry up to 3 times",
        "Write a Python function that validates email addresses using regex, includes error handling, and has at least 2 test cases. Make sure it doesn't use any dangerous operations",
        "Create a multi-step plan to: 1) Extract data from a database, 2) Transform it using pandas, 3) Generate a report, 4) Send via email. Show dependencies between steps"
    ]
    print("\n" + "🔥 HARD MODE: COMPLEX QUERIES ".center(70, "=") + "\n")
    for i, query in enumerate(hard_queries, 1):
        print(f"\n{'#'*70}")
        print(f"# CHALLENGE {i}/{len(hard_queries)}")
        print(f"{'#'*70}")
        try:
            agent.run(query, verbose=True)
        except Exception as e:
            print(f"❌ Critical error: {e}\n")
    print("\n" + f"✅ COMPLETED {len(agent.execution_history)} TOOL EXECUTIONS ".center(70, "=") + "\n")
    # Guard against division by zero when no tool executions were recorded.
    if agent.execution_history:
        success_rate = sum(1 for h in agent.execution_history if h['success']) / len(agent.execution_history) * 100
        print(f"📊 Success rate: {success_rate:.1f}%")

if __name__ == "__main__":
    main()
We tie everything together with a run() method and a demo main() function that executes multiple hard-mode queries. As we watch the agent analyze, route, execute, and report results, we see the full power of the architecture in action. This final step lets us experience how the system performs under complex, realistic scenarios.
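To try a single query outside of main(), the pattern looks like this (note that the first call downloads model weights, so it assumes network access once before running offline; the routed tool varies with the model's output):

agent = AdvancedToolAgent()  # loads zephyr-7b-beta on GPU, or a small causal model on CPU
summary = agent.run("Generate a SQL query listing all users from the UK", verbose=False)
print(summary["tool_used"])       # e.g. "sql_engine"
print(summary["result"].success)  # True if the tool executed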
In conclusion, we have built a powerful agent capable of understanding intricate instructions, routing execution across multiple tools, and gracefully recovering from errors, all within a compact, offline system. As we test it on challenging queries, we watch it plan, reason, and execute with clarity and structure. We now appreciate how modular schemas, validated tool calls, and layered execution logic allow us to create agents that behave reliably in complex environments.
