Agents and Multi-Agent Frameworks in Python

A practical guide to building agents and multi-agent systems in Python, including environments, goal-oriented behavior, communication, cooperation, BDI agents, learning agents, and real-world automation patterns.

Introduction to Agents and Multi-Agent Frameworks

Agents are software entities that perceive an environment, make decisions, and take actions based on goals, rules, policies, or learned behavior. Multi-agent systems extend this idea by allowing multiple agents to interact, coordinate, compete, or collaborate within a shared environment.

For production AI and automation systems, agents should not be treated as magic wrappers around prompts. A reliable agent design needs clear state, bounded actions, observability, escalation paths, and testable decision logic.

The following example shows a minimal agent with perception, decision, and action steps.

import random

class SimpleAgent:
    def __init__(self, name):
        self.name = name
    
    def perceive(self, environment):
        return environment
    
    def decide(self, perception):
        return random.choice(["move", "stay"])
    
    def act(self, decision):
        print(f"{self.name} decided to {decision}")

# Usage
agent = SimpleAgent("Agent1")
environment = {"obstacle": False, "goal": True}
perception = agent.perceive(environment)
decision = agent.decide(perception)
agent.act(decision)
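
The production concerns mentioned in the introduction (explicit state, bounded actions, observability, an escalation path) can be layered onto the same loop. The sketch below is only an illustration under assumptions: `BoundedAgent`, `ALLOWED_ACTIONS`, the two policy functions, and the `"escalate"` result are hypothetical names introduced here, not part of any framework.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("agent")

# Hypothetical whitelist: the only actions this agent may ever execute
ALLOWED_ACTIONS = {"move", "stay"}

class BoundedAgent:
    def __init__(self, name, policy):
        self.name = name
        self.policy = policy      # callable: perception -> proposed action
        self.history = []         # explicit state: every (perception, result) pair

    def step(self, perception):
        proposed = self.policy(perception)
        # Anything outside the whitelist is escalated instead of executed
        result = proposed if proposed in ALLOWED_ACTIONS else "escalate"
        self.history.append((perception, result))
        logger.info("%s: proposed=%r result=%r", self.name, proposed, result)
        return result

def cautious_policy(perception):
    return "stay" if perception["obstacle"] else "move"

def rogue_policy(perception):
    # Stand-in for a policy that proposes something it should not
    return "delete_everything"

safe = BoundedAgent("Agent1", cautious_policy)
rogue = BoundedAgent("Agent2", rogue_policy)
safe_result = safe.step({"obstacle": False, "goal": True})
rogue_result = rogue.step({"obstacle": False, "goal": True})
```

Because the decision logic is a plain function and every step is recorded in `history`, each piece can be unit-tested and audited without running the whole system.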

Setting Up the Environment

The environment defines the state space in which agents operate. It exposes what agents can perceive, which actions are valid, and how the world changes after each action. In production systems, the environment may be a workflow engine, API surface, database, document store, simulator, or external application.

The following example creates a simple grid-based environment.

import numpy as np

class GridEnvironment:
    def __init__(self, width, height):
        self.grid = np.zeros((height, width))
        self.width = width
        self.height = height
    
    def add_obstacle(self, x, y):
        self.grid[y, x] = 1
    
    def is_valid_position(self, x, y):
        return 0 <= x < self.width and 0 <= y < self.height and self.grid[y, x] == 0

    def display(self):
        for row in self.grid:
            print(" ".join(["#" if cell == 1 else "." for cell in row]))

# Create and display a 5x5 grid environment
env = GridEnvironment(5, 5)
env.add_obstacle(2, 2)
env.display()
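
The three responsibilities described above (what can be perceived, which actions are valid, how the world changes) can also be written as an explicit interface. The following is a minimal sketch on a toy problem; `CounterEnvironment` and its method names `observe`, `valid_actions`, and `apply` are assumptions made for illustration.

```python
class CounterEnvironment:
    """Toy environment: the state is a single integer kept within [0, limit]."""

    def __init__(self, limit=3):
        self.limit = limit
        self.value = 0

    def observe(self):
        # What an agent can perceive
        return {"value": self.value, "limit": self.limit}

    def valid_actions(self):
        # Which actions are allowed in the current state
        actions = []
        if self.value < self.limit:
            actions.append("increment")
        if self.value > 0:
            actions.append("decrement")
        return actions

    def apply(self, action):
        # How the world changes; invalid actions are rejected, not silently ignored
        if action not in self.valid_actions():
            raise ValueError(f"invalid action: {action}")
        self.value += 1 if action == "increment" else -1
        return self.observe()

env_sketch = CounterEnvironment(limit=2)
first_actions = env_sketch.valid_actions()
env_sketch.apply("increment")
env_sketch.apply("increment")
last_actions = env_sketch.valid_actions()
```

Raising on an invalid action keeps the environment, rather than each agent, responsible for enforcing the rules of the world.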

Implementing a Basic Agent

After the environment is defined, the agent can use perception to inspect available moves, decision logic to select an action, and an action method to update its position. This pattern is simple, but it mirrors the control loop used in more advanced agent systems.

The following example implements a grid-navigation agent.

import random

class GridAgent:
    def __init__(self, x, y, environment):
        self.x = x
        self.y = y
        self.environment = environment
    
    def perceive(self):
        return {
            "up": self.environment.is_valid_position(self.x, self.y - 1),
            "down": self.environment.is_valid_position(self.x, self.y + 1),
            "left": self.environment.is_valid_position(self.x - 1, self.y),
            "right": self.environment.is_valid_position(self.x + 1, self.y)
        }
    
    def decide(self, perception):
        valid_moves = [move for move, is_valid in perception.items() if is_valid]
        return random.choice(valid_moves) if valid_moves else None
    
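    def act(self, decision):
        # No move selected: stay put rather than report a move that never happened
        if decision is None:
            print(f"Agent stays at position ({self.x}, {self.y})")
            return
        if decision == "up":
            self.y -= 1
        elif decision == "down":
            self.y += 1
        elif decision == "left":
            self.x -= 1
        elif decision == "right":
            self.x += 1
        print(f"Agent moved {decision} to position ({self.x}, {self.y})")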

# Usage
env = GridEnvironment(5, 5)
env.add_obstacle(2, 2)
agent = GridAgent(0, 0, env)

for _ in range(5):
    perception = agent.perceive()
    decision = agent.decide(perception)
    agent.act(decision)
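
The perceive, decide, act cycle used above can be factored into a reusable driver that works with any object exposing those three methods. This is a sketch under assumptions: `run_control_loop` and the stand-in `CountingAgent` are hypothetical names, and stopping when `decide` returns `None` is one possible convention.

```python
def run_control_loop(agent, max_steps):
    """Drive any object with perceive(), decide(), and act() methods."""
    trace = []
    for _ in range(max_steps):
        perception = agent.perceive()
        decision = agent.decide(perception)
        if decision is None:      # nothing left to do: stop early
            break
        agent.act(decision)
        trace.append(decision)
    return trace

class CountingAgent:
    """Stand-in agent that walks right until it reaches position 3."""

    def __init__(self):
        self.x = 0

    def perceive(self):
        return {"right": self.x < 3}

    def decide(self, perception):
        return "right" if perception["right"] else None

    def act(self, decision):
        self.x += 1

trace = run_control_loop(CountingAgent(), max_steps=10)
```

Returning the trace of decisions makes the loop easy to assert against in tests, which is harder when the only output is printed text.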

Implementing a Goal-Oriented Agent

A goal-oriented agent selects actions based on progress toward a target state. Instead of moving randomly, it evaluates valid actions and chooses the one that moves it closer to the goal.

The following example adds goal-directed movement to the grid agent.

import math

class GoalOrientedAgent(GridAgent):
    def __init__(self, x, y, environment, goal_x, goal_y):
        super().__init__(x, y, environment)
        self.goal_x = goal_x
        self.goal_y = goal_y
    
    def decide(self, perception):
        valid_moves = [move for move, is_valid in perception.items() if is_valid]
        if not valid_moves:
            return None
        
        # Calculate distances to the goal for each valid move
        distances = {
            "up": math.sqrt((self.x - self.goal_x)**2 + (self.y - 1 - self.goal_y)**2),
            "down": math.sqrt((self.x - self.goal_x)**2 + (self.y + 1 - self.goal_y)**2),
            "left": math.sqrt((self.x - 1 - self.goal_x)**2 + (self.y - self.goal_y)**2),
            "right": math.sqrt((self.x + 1 - self.goal_x)**2 + (self.y - self.goal_y)**2)
        }
        
        # Choose the valid move that brings the agent closest to the goal
        return min(valid_moves, key=lambda m: distances[m])

# Usage
env = GridEnvironment(5, 5)
env.add_obstacle(2, 2)
agent = GoalOrientedAgent(0, 0, env, 4, 4)

for _ in range(10):
    perception = agent.perceive()
    decision = agent.decide(perception)
    agent.act(decision)
    if agent.x == agent.goal_x and agent.y == agent.goal_y:
        print("Goal reached!")
        break
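
Minimizing distance one step at a time is a local strategy, so it can stall behind obstacles that require a detour. The sketch below shows this on a hypothetical layout (a vertical wall with a gap at the bottom); `WALLS`, `greedy_step`, and the grid size are assumptions chosen for the illustration, and `math.dist` requires Python 3.8 or later.

```python
import math

WALLS = {(2, 0), (2, 1)}          # vertical wall with a gap at y = 2
WIDTH, HEIGHT = 5, 3
MOVES = {"up": (0, -1), "down": (0, 1), "left": (-1, 0), "right": (1, 0)}

def greedy_step(pos, goal):
    """Return the valid neighbor closest to the goal, or None if boxed in."""
    candidates = []
    for dx, dy in MOVES.values():
        nxt = (pos[0] + dx, pos[1] + dy)
        in_bounds = 0 <= nxt[0] < WIDTH and 0 <= nxt[1] < HEIGHT
        if in_bounds and nxt not in WALLS:
            candidates.append(nxt)
    return min(candidates, key=lambda p: math.dist(p, goal)) if candidates else None

pos, goal = (0, 0), (4, 0)
visited = [pos]
for _ in range(6):
    pos = greedy_step(pos, goal)
    visited.append(pos)
```

In this layout the agent reaches (1, 0), steps down to (1, 1), and then bounces between those two cells, because moving toward the gap at y = 2 would temporarily increase its distance to the goal. Planning methods such as A* avoid this by evaluating whole paths instead of single steps.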

Introducing Multi-Agent Systems

Multi-agent systems involve multiple agents operating in the same environment. They may share goals, compete for resources, coordinate tasks, or exchange information to improve outcomes.

The following example creates a shared environment with two goal-oriented agents.

class MultiAgentEnvironment(GridEnvironment):
    def __init__(self, width, height):
        super().__init__(width, height)
        self.agents = []
    
    def add_agent(self, agent):
        self.agents.append(agent)
    
    def step(self):
        for agent in self.agents:
            perception = agent.perceive()
            decision = agent.decide(perception)
            agent.act(decision)
    
    def display(self):
        grid_ = self.grid.copy()  # Copy so agent markers never overwrite the obstacle grid
        for i, agent in enumerate(self.agents):
            grid_[agent.y, agent.x] = i + 2
        for row in grid_:
            print(" ".join(["#" if cell == 1 else f"A{int(cell-1)}" if cell > 1 else "." for cell in row]))

# Usage
env = MultiAgentEnvironment(7, 7)
env.add_obstacle(3, 3)
agent1 = GoalOrientedAgent(0, 0, env, 6, 6)
agent2 = GoalOrientedAgent(6, 0, env, 0, 6)
env.add_agent(agent1)
env.add_agent(agent2)

for _ in range(15):
    env.step()
    env.display()
    print("\n")
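
In the example above, nothing prevents two agents from stepping into the same cell, because `is_valid_position` only checks obstacles and bounds. One possible remedy is to let the environment arbitrate proposed moves. The sketch below assumes a hypothetical helper, `resolve_moves`, that applies proposals in order and makes later agents wait when their target is occupied; the first-come priority is an arbitrary choice.

```python
def resolve_moves(positions, proposals):
    """Apply proposed moves in order, rejecting any move into an occupied cell.

    positions: dict agent_name -> (x, y) current cell
    proposals: dict agent_name -> (x, y) requested cell
    """
    occupied = set(positions.values())
    result = dict(positions)
    for name, target in proposals.items():
        if target in occupied:
            continue                      # cell taken: this agent waits a turn
        occupied.discard(result[name])    # free the cell being left
        occupied.add(target)
        result[name] = target
    return result

positions = {"A1": (0, 0), "A2": (2, 0)}
proposals = {"A1": (1, 0), "A2": (1, 0)}   # both want the same cell
after = resolve_moves(positions, proposals)
```

Here `A1` gets the contested cell and `A2` stays where it is. Other policies, such as priorities or randomized tie-breaking, fit the same structure.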

Implementing Agent Communication

Communication allows agents to share observations, status, constraints, or task results. In enterprise agent systems, communication should be structured, logged, and validated to avoid inconsistent state or uncontrolled behavior.

The following example implements a simple message-passing mechanism.

class CommunicatingAgent(GoalOrientedAgent):
    def __init__(self, x, y, environment, goal_x, goal_y, name):
        super().__init__(x, y, environment, goal_x, goal_y)
        self.name = name
        self.messages = []
    
    def send_message(self, recipient, content):
        recipient.receive_message(self.name, content)
    
    def receive_message(self, sender, content):
        self.messages.append((sender, content))
    
    def process_messages(self):
        for sender, content in self.messages:
            print(f"{self.name} received message from {sender}: {content}")
        self.messages.clear()

# Modify MultiAgentEnvironment to include communication
class CommunicatingEnvironment(MultiAgentEnvironment):
    def step(self):
        for agent in self.agents:
            agent.process_messages()
        super().step()

# Usage
env = CommunicatingEnvironment(7, 7)
agent1 = CommunicatingAgent(0, 0, env, 6, 6, "Agent1")
agent2 = CommunicatingAgent(6, 0, env, 0, 6, "Agent2")
env.add_agent(agent1)
env.add_agent(agent2)

agent1.send_message(agent2, "Hello from Agent1!")
agent2.send_message(agent1, "Greetings, Agent1!")

env.step()
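
The free-form strings above are fine for a demo, but the advice about structured, logged, and validated communication can be sketched as follows. All names here (`Message`, `MessageBus`, `ALLOWED_KINDS`, the inbox dictionary) are hypothetical and only illustrate one way to do it.

```python
from dataclasses import dataclass, field
import time

ALLOWED_KINDS = {"status", "obstacles", "task_result"}   # hypothetical schema

@dataclass(frozen=True)
class Message:
    sender: str
    recipient: str
    kind: str
    payload: dict
    timestamp: float = field(default_factory=time.time)

class MessageBus:
    def __init__(self):
        self.log = []          # every accepted message, for auditing
        self.rejected = []     # (message, reason) pairs

    def send(self, message, inboxes):
        # Validate before delivery so malformed messages never reach an agent
        if message.kind not in ALLOWED_KINDS:
            self.rejected.append((message, "unknown kind"))
            return False
        if message.recipient not in inboxes:
            self.rejected.append((message, "unknown recipient"))
            return False
        inboxes[message.recipient].append(message)
        self.log.append(message)
        return True

inboxes = {"Agent1": [], "Agent2": []}
bus = MessageBus()
ok = bus.send(Message("Agent1", "Agent2", "status", {"pos": (0, 0)}), inboxes)
bad = bus.send(Message("Agent1", "Agent2", "shutdown", {}), inboxes)
```

Routing every message through one component gives a single place to validate, log, and later replay traffic between agents.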

Implementing Cooperative Behavior

Cooperative behavior allows agents to share partial knowledge and improve decisions across the system. This is useful for distributed search, routing, scheduling, monitoring, and workflow coordination.

The following example uses shared obstacle knowledge with A* pathfinding.

import heapq

class CooperativeAgent(CommunicatingAgent):
    def __init__(self, x, y, environment, goal_x, goal_y, name):
        super().__init__(x, y, environment, goal_x, goal_y, name)
        self.known_obstacles = set()
    
    def perceive(self):
        perception = super().perceive()
        for move, is_valid in perception.items():
            if not is_valid:
                if move == "up":
                    self.known_obstacles.add((self.x, self.y - 1))
                elif move == "down":
                    self.known_obstacles.add((self.x, self.y + 1))
                elif move == "left":
                    self.known_obstacles.add((self.x - 1, self.y))
                elif move == "right":
                    self.known_obstacles.add((self.x + 1, self.y))
        return perception
    
    def decide(self, perception):
        path = self.a_star((self.x, self.y), (self.goal_x, self.goal_y))
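        # A one-node path means the agent is already at the goal, so there is no next step
        if path and len(path) > 1:
            next_pos = path[1]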
            if next_pos[0] > self.x:
                return "right"
            elif next_pos[0] < self.x:
                return "left"
            elif next_pos[1] > self.y:
                return "down"
            else:
                return "up"
        return None
    
    def a_star(self, start, goal):
        def heuristic(a, b):
            return abs(b[0] - a[0]) + abs(b[1] - a[1])
        
        neighbors = [(0,1), (0,-1), (1,0), (-1,0)]
        close_set = set()
        came_from = {}
        gscore = {start:0}
        fscore = {start:heuristic(start, goal)}
        oheap = []
        heapq.heappush(oheap, (fscore[start], start))
        
        while oheap:
            current = heapq.heappop(oheap)[1]
            if current == goal:
                path = []
                while current in came_from:
                    path.append(current)
                    current = came_from[current]
                path.append(start)
                path.reverse()
                return path
            
            close_set.add(current)
            for i, j in neighbors:
                neighbor = current[0] + i, current[1] + j
                tentative_g_score = gscore[current] + 1
                if 0 <= neighbor[0] < self.environment.width and 0 <= neighbor[1] < self.environment.height:
                    if neighbor in self.known_obstacles:
                        continue
                    # Skip neighbors already reached by an equal or cheaper route
                    if tentative_g_score >= gscore.get(neighbor, float("inf")):
                        continue
                    came_from[neighbor] = current
                    gscore[neighbor] = tentative_g_score
                    fscore[neighbor] = tentative_g_score + heuristic(neighbor, goal)
                    heapq.heappush(oheap, (fscore[neighbor], neighbor))
        return None

    def process_messages(self):
        for sender, content in self.messages:
            # Only obstacle sets are merged; any other payload is ignored
            if isinstance(content, set):
                self.known_obstacles.update(content)
                print(f"{self.name} received obstacles from {sender}")
        self.messages.clear()
    
    def act(self, decision):
        super().act(decision)
        for other_agent in self.environment.agents:
            if other_agent != self:
                self.send_message(other_agent, self.known_obstacles)

# Usage
env = CommunicatingEnvironment(10, 10)
env.add_obstacle(5, 5)
env.add_obstacle(5, 6)
env.add_obstacle(6, 5)
agent1 = CooperativeAgent(0, 0, env, 9, 9, "Agent1")
agent2 = CooperativeAgent(9, 0, env, 0, 9, "Agent2")
env.add_agent(agent1)
env.add_agent(agent2)

for _ in range(20):
    env.step()
    env.display()
    print("\n")
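
In the example above, each agent sends its full obstacle set to every peer after every move. A possible refinement is to share only what is new since the last broadcast. The sketch below assumes a hypothetical helper class, `ObstacleKnowledge`, that tracks unsent discoveries separately.

```python
class ObstacleKnowledge:
    """Tracks known obstacles and which ones have not been shared yet."""

    def __init__(self):
        self.known = set()
        self.unsent = set()

    def observe(self, cell):
        if cell not in self.known:
            self.known.add(cell)
            self.unsent.add(cell)

    def take_delta(self):
        # Return only the obstacles discovered since the last broadcast
        delta, self.unsent = self.unsent, set()
        return delta

    def merge(self, cells):
        # Knowledge received from a peer is stored but not re-broadcast
        self.known |= cells

a, b = ObstacleKnowledge(), ObstacleKnowledge()
a.observe((5, 5))
a.observe((5, 6))
b.merge(a.take_delta())
a.observe((5, 5))             # already known: nothing new to send
second_delta = a.take_delta()
```

With two agents the saving is small, but message volume grows with both the number of agents and the size of the shared knowledge, so sending deltas matters more as the system scales.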

Implementing a Belief-Desire-Intention Agent

The Belief-Desire-Intention architecture separates what the agent believes about the world, what it wants to achieve, and what it intends to execute. This separation is useful when agent behavior needs to be explainable and structured.

The following example implements a simple BDI loop.

class BDIAgent:
    def __init__(self, name, environment):
        self.name = name
        self.environment = environment
        self.beliefs = set()
        self.desires = set()
        self.intentions = []
    
    def update_beliefs(self, perception):
        # Update beliefs based on current perception
        self.beliefs = set(perception.items())
    
    def generate_options(self):
        # Generate possible desires based on current beliefs
        self.desires = set()
        for belief, value in self.beliefs:
            if value:
                self.desires.add(belief)
    
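    def filter_intentions(self):
        # Choose intentions from desires. Sets are unordered, so sort first to make
        # the selection deterministic; alphabetical order is not a real priority.
        self.intentions = sorted(self.desires)[:2]  # Limit to 2 intentions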
    
    def execute(self):
        # Execute the current intentions
        for intention in self.intentions:
            print(f"{self.name} is executing intention: {intention}")
    
    def bdi_loop(self):
        perception = self.environment.get_perception()
        self.update_beliefs(perception)
        self.generate_options()
        self.filter_intentions()
        self.execute()

class SimpleEnvironment:
    def __init__(self):
        self.state = {"move": True, "eat": False, "sleep": True}
    
    def get_perception(self):
        return self.state

# Usage
env = SimpleEnvironment()
agent = BDIAgent("BDIAgent", env)

for _ in range(3):
    agent.bdi_loop()
    # Change environment state
    env.state["eat"] = not env.state["eat"]
    print("\n")
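
The filtering step is where a BDI agent commits to some desires over others, so it usually needs an explicit ranking. The following sketch assumes a hypothetical `select_intentions` function and made-up priority values. It keeps only desires that current beliefs mark as achievable, then takes the highest-priority ones.

```python
def select_intentions(beliefs, desire_priorities, limit=2):
    """Pick the highest-priority desires that current beliefs make achievable.

    beliefs: dict option -> bool (is the option currently possible?)
    desire_priorities: dict option -> number (higher means more important)
    """
    achievable = [d for d in desire_priorities if beliefs.get(d, False)]
    ranked = sorted(achievable, key=lambda d: desire_priorities[d], reverse=True)
    return ranked[:limit]

beliefs = {"move": True, "eat": False, "sleep": True, "explore": True}
priorities = {"eat": 3, "sleep": 2, "explore": 1, "move": 0}
intentions = select_intentions(beliefs, priorities)
```

Even though `eat` has the highest priority, it is dropped because the agent believes it is not currently possible, which leaves `sleep` and `explore` as the committed intentions.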

Implementing a Learning Agent

A learning agent improves its behavior based on rewards and observed outcomes. Q-learning is a basic reinforcement learning method that estimates the value of actions in different states.

The following example implements a simple Q-learning agent.

import random

class LearningAgent:
    def __init__(self, states, actions, learning_rate=0.1, discount_factor=0.9, epsilon=0.1):
        self.q_table = {state: {action: 0 for action in actions} for state in states}
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.actions = actions
    
    def choose_action(self, state):
        if random.uniform(0, 1) < self.epsilon:
            return random.choice(self.actions)
        else:
            return max(self.q_table[state], key=self.q_table[state].get)
    
    def learn(self, state, action, reward, next_state):
        current_q = self.q_table[state][action]
        max_next_q = max(self.q_table[next_state].values())
        new_q = (1 - self.learning_rate) * current_q + self.learning_rate * (reward + self.discount_factor * max_next_q)
        self.q_table[state][action] = new_q

# Example usage
states = ['A', 'B', 'C']
actions = ['left', 'right']
agent = LearningAgent(states, actions)

# Simulate learning
for _ in range(100):
    state = random.choice(states)
    action = agent.choose_action(state)
    next_state = random.choice(states)
    reward = 1 if next_state == 'C' else 0
    agent.learn(state, action, reward, next_state)

print("Q-table after learning:")
for state in states:
    print(f"{state}: {agent.q_table[state]}")
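
To make the update in `learn` concrete, the sketch below applies the same formula to two hand-picked transitions. The function name `q_update` and the input values are assumptions chosen for the illustration; the defaults match the `LearningAgent` constructor.

```python
def q_update(current_q, reward, max_next_q, learning_rate=0.1, discount_factor=0.9):
    """One Q-learning update, written the same way as LearningAgent.learn."""
    target = reward + discount_factor * max_next_q
    return (1 - learning_rate) * current_q + learning_rate * target

# First visit: Q starts at 0, the reward is 1, the next state has no value yet
q1 = q_update(current_q=0.0, reward=1, max_next_q=0.0)

# Later visit: same reward, but the best action in the next state is now worth 0.5
q2 = q_update(current_q=q1, reward=1, max_next_q=0.5)

print(round(q1, 3), round(q2, 3))
```

The first update moves the estimate a tenth of the way toward the target of 1, giving 0.1. The second blends the old estimate (0.9 × 0.1) with a tenth of the new target (1 + 0.9 × 0.5 = 1.45), giving about 0.235.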

Implementing a Rule-Based Agent

Rule-based agents make decisions using predefined rules. They are useful when the decision policy is deterministic, auditable, and easier to encode as explicit logic than learned behavior.

The following example implements a traffic-light control agent.

class TrafficLightAgent:
    def __init__(self):
        self.state = "red"
        self.timer = 0
    
    def update(self):
        self.timer += 1
        if self.state == "red" and self.timer >= 30:
            self.state = "green"
            self.timer = 0
        elif self.state == "green" and self.timer >= 20:
            self.state = "yellow"
            self.timer = 0
        elif self.state == "yellow" and self.timer >= 5:
            self.state = "red"
            self.timer = 0
    
    def get_state(self):
        return self.state

# Simulation
agent = TrafficLightAgent()
for _ in range(100):
    agent.update()
    print(f"Current state: {agent.get_state()}, Timer: {agent.timer}")
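
Because the rules are fixed, they can also be expressed as data instead of an if/elif chain, which makes them easier to audit and change. The sketch below uses a hypothetical `RULES` table and `advance` function, with the same timings as the class above.

```python
# state -> (duration in ticks, next state); same timings as the class above
RULES = {
    "red": (30, "green"),
    "green": (20, "yellow"),
    "yellow": (5, "red"),
}

def advance(state, timer):
    """Apply one tick and return the new (state, timer) pair."""
    timer += 1
    duration, next_state = RULES[state]
    if timer >= duration:
        return next_state, 0
    return state, timer

state, timer = "red", 0
seen = []
for _ in range(55):               # one full cycle: 30 + 20 + 5 ticks
    state, timer = advance(state, timer)
    seen.append(state)
```

Adding a new state or changing a duration becomes a one-line edit to the table, and the transition function itself stays untouched.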

Implementing a Reactive Agent

Reactive agents respond directly to current perception without maintaining long-term internal state. They are simple and fast, but limited when tasks require planning, memory, or long-horizon reasoning.

The following example implements a basic reactive vacuum agent.

import random

class ReactiveVacuumAgent:
    def sense(self, environment):
        return environment.is_dirty()
    
    def act(self, perception):
        if perception:
            return "clean"
        else:
            return random.choice(["move_left", "move_right"])

class Environment:
    def __init__(self):
        self.locations = [True, False]  # True means dirty
    
    def is_dirty(self):
        return self.locations[0]  # Check only the current location
    
    def clean(self):
        self.locations[0] = False
    
    def move_dirt(self):
        self.locations[0] = random.choice([True, False])

# Simulation
env = Environment()
agent = ReactiveVacuumAgent()

for _ in range(10):
    perception = agent.sense(env)
    action = agent.act(perception)
    print(f"Perception: {perception}, Action: {action}")
    
    if action == "clean":
        env.clean()
    env.move_dirt()  # Simulate changing environment
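
In the simulation above, the move actions are chosen but never change which location the agent senses. A two-location variant in which movement matters can be sketched as follows; `reactive_vacuum` and `run_vacuum` are hypothetical names, and the move rule is made deterministic here so the run is reproducible.

```python
def reactive_vacuum(location, is_dirty):
    """Condition-action rules: the decision depends only on the current percept."""
    if is_dirty:
        return "clean"
    return "move_right" if location == 0 else "move_left"

def run_vacuum(dirt, location=0, steps=4):
    dirt = list(dirt)                 # copy so the caller's list is untouched
    actions = []
    for _ in range(steps):
        action = reactive_vacuum(location, dirt[location])
        actions.append(action)
        if action == "clean":
            dirt[location] = False
        elif action == "move_right":
            location = 1
        else:
            location = 0
    return actions, dirt

actions, final_dirt = run_vacuum([True, True])
```

The agent keeps no memory of which cells it has already cleaned, so after both locations are clean it would keep shuttling between them indefinitely. That is the limitation of purely reactive designs noted above.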

Implementing a Goal-Based Agent

Goal-based agents select actions that move the system toward a target condition. Pathfinding is a useful example because the agent must evaluate possible future states rather than only react to the current one.

The following example uses A* search to find a path through a maze.

import heapq

class MazeAgent:
    def __init__(self, maze, start, goal):
        self.maze = maze
        self.start = start
        self.goal = goal
    
    def heuristic(self, a, b):
        return abs(b[0] - a[0]) + abs(b[1] - a[1])
    
    def get_neighbors(self, pos):
        neighbors = [(0, 1), (0, -1), (1, 0), (-1, 0)]
        return [(pos[0] + dx, pos[1] + dy) for dx, dy in neighbors
                if 0 <= pos[0] + dx < len(self.maze) and
                   0 <= pos[1] + dy < len(self.maze[0]) and
                   self.maze[pos[0] + dx][pos[1] + dy] != '#']
    
    def find_path(self):
        queue = [(0, self.start)]
        came_from = {}
        cost_so_far = {self.start: 0}
        
        while queue:
            _, current = heapq.heappop(queue)
            
            if current == self.goal:
                path = []
                while current in came_from:
                    path.append(current)
                    current = came_from[current]
                path.append(self.start)
                return path[::-1]
            
            # "neighbor" avoids shadowing the built-in next()
            for neighbor in self.get_neighbors(current):
                new_cost = cost_so_far[current] + 1
                if neighbor not in cost_so_far or new_cost < cost_so_far[neighbor]:
                    cost_so_far[neighbor] = new_cost
                    priority = new_cost + self.heuristic(self.goal, neighbor)
                    heapq.heappush(queue, (priority, neighbor))
                    came_from[neighbor] = current
        
        return None  # No path found

# Example usage
maze = [
    "S...#",
    ".##..",
    "...##",
    ".#..G"
]
start = (0, 0)
goal = (3, 4)

agent = MazeAgent(maze, start, goal)
path = agent.find_path()
print("Path found:", path)
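
A path is a list of cells, but an agent usually needs a sequence of actions. The sketch below converts consecutive (row, column) cells into direction names. `path_to_moves` is a hypothetical helper, and the hard-coded path is one valid route through the maze above, written out by hand.

```python
def path_to_moves(path):
    """Translate consecutive (row, col) cells into direction names."""
    names = {(1, 0): "down", (-1, 0): "up", (0, 1): "right", (0, -1): "left"}
    moves = []
    for (r1, c1), (r2, c2) in zip(path, path[1:]):
        moves.append(names[(r2 - r1, c2 - c1)])
    return moves

example_path = [(0, 0), (1, 0), (2, 0), (2, 1), (2, 2), (3, 2), (3, 3), (3, 4)]
moves = path_to_moves(example_path)
```

Separating planning (finding the path) from execution (issuing moves) also makes it possible to re-plan when a move fails, without changing the search code.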

Use Case: Smart Home Automation

Smart home automation is a practical multi-agent example because separate agents can manage temperature, lighting, motion, and energy usage while operating within a shared environment.

The following example coordinates temperature and lighting agents.

import random

class TemperatureAgent:
    def __init__(self, name, ideal_temp):
        self.name = name
        self.ideal_temp = ideal_temp
    
    def sense(self, current_temp):
        return current_temp
    
    def act(self, sensed_temp):
        if sensed_temp < self.ideal_temp:
            return "increase"
        elif sensed_temp > self.ideal_temp:
            return "decrease"
        else:
            return "maintain"

class LightingAgent:
    def __init__(self, name):
        self.name = name
    
    def sense(self, is_daytime, motion_detected):
        return (is_daytime, motion_detected)
    
    def act(self, sensed_data):
        is_daytime, motion_detected = sensed_data
        # Lights are needed only when it is dark and someone is present
        if not is_daytime and motion_detected:
            return "turn_on"
        return "turn_off"

class SmartHome:
    def __init__(self):
        self.temperature = 22
        self.is_daytime = True
        self.motion_detected = False
    
    def update_environment(self):
        self.temperature += random.uniform(-0.5, 0.5)
        self.is_daytime = random.choice([True, False])
        self.motion_detected = random.choice([True, False])
    
    def adjust_temperature(self, action):
        if action == "increase":
            self.temperature += 0.5
        elif action == "decrease":
            self.temperature -= 0.5
    
    def adjust_lighting(self, action):
        print(f"Lighting: {action}")

# Simulation
home = SmartHome()
temp_agent = TemperatureAgent("TempAgent", 23)
light_agent = LightingAgent("LightAgent")

for _ in range(10):
    home.update_environment()
    
    temp_action = temp_agent.act(temp_agent.sense(home.temperature))
    home.adjust_temperature(temp_action)
    
    light_action = light_agent.act(light_agent.sense(home.is_daytime, home.motion_detected))
    home.adjust_lighting(light_action)
    
    print(f"Temperature: {home.temperature:.1f}°C, Daytime: {home.is_daytime}, Motion: {home.motion_detected}")
    print(f"Temperature Action: {temp_action}")
    print("---")
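
Because the simulated temperature is a float that drifts randomly, it almost never equals the ideal value exactly, so the `"maintain"` branch above is rarely taken and the agent keeps toggling between heating and cooling. A common remedy is a tolerance band around the target. The sketch below assumes a hypothetical `thermostat_action` function and a tolerance of 0.5 degrees.

```python
def thermostat_action(sensed_temp, ideal_temp, tolerance=0.5):
    """Return a heating action, treating readings within the tolerance as on target."""
    if sensed_temp < ideal_temp - tolerance:
        return "increase"
    if sensed_temp > ideal_temp + tolerance:
        return "decrease"
    return "maintain"

readings = [21.9, 22.6, 23.0, 23.4, 23.8]
actions = [thermostat_action(t, ideal_temp=23) for t in readings]
```

Only the first and last readings fall outside the band, so the agent acts on those and leaves the three readings in between alone.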

Use Case: Traffic Management System

Traffic management is a classic multi-agent problem because multiple controllers must coordinate based on dynamic conditions, safety constraints, and throughput goals.

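The following example simulates two traffic-light agents that hold their green phase longer when traffic density is high. Each light runs its own timer and both start red, so nothing in this version prevents both directions from being green at once; a real controller would need an interlock between them.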

import random

class TrafficLightAgent:
    def __init__(self, name):
        self.name = name
        self.state = "red"
        self.timer = 0
    
    def update(self, traffic_density):
        self.timer += 1
        if self.state == "red" and self.timer >= 30:
            self.state = "green"
            self.timer = 0
        elif self.state == "green":
            if traffic_density > 0.7 and self.timer >= 45:
                self.state = "yellow"
                self.timer = 0
            elif traffic_density <= 0.7 and self.timer >= 30:
                self.state = "yellow"
                self.timer = 0
        elif self.state == "yellow" and self.timer >= 5:
            self.state = "red"
            self.timer = 0
    
    def get_state(self):
        return self.state

class Intersection:
    def __init__(self):
        self.north_south = TrafficLightAgent("North-South")
        self.east_west = TrafficLightAgent("East-West")
        self.traffic_density = {"north_south": 0.5, "east_west": 0.5}
    
    def update_traffic_density(self):
        self.traffic_density["north_south"] = random.uniform(0, 1)
        self.traffic_density["east_west"] = random.uniform(0, 1)
    
    def update(self):
        self.update_traffic_density()
        self.north_south.update(self.traffic_density["north_south"])
        self.east_west.update(self.traffic_density["east_west"])
    
    def display_state(self):
        print(f"North-South: {self.north_south.get_state()}, Traffic Density: {self.traffic_density['north_south']:.2f}")
        print(f"East-West: {self.east_west.get_state()}, Traffic Density: {self.traffic_density['east_west']:.2f}")
        print("---")

# Simulation: red lasts 30 ticks, so run long enough to see the lights change
intersection = Intersection()

for _ in range(70):
    intersection.update()
    intersection.display_state()
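
One way to guarantee that the two directions are never green together is to drive both lights from a single shared phase sequence instead of two independent timers. The sketch below is an assumption-laden simplification: `PHASES` and `CoordinatedIntersection` are hypothetical names, the durations are fixed, and the density-based green extension is left out for brevity.

```python
# Each phase: (north_south color, east_west color, duration in ticks)
PHASES = [
    ("green", "red", 30),
    ("yellow", "red", 5),
    ("red", "green", 30),
    ("red", "yellow", 5),
]

class CoordinatedIntersection:
    def __init__(self):
        self.phase = 0
        self.timer = 0

    def update(self):
        self.timer += 1
        if self.timer >= PHASES[self.phase][2]:
            self.phase = (self.phase + 1) % len(PHASES)
            self.timer = 0

    def lights(self):
        north_south, east_west, _ = PHASES[self.phase]
        return north_south, east_west

junction = CoordinatedIntersection()
observed = set()
for _ in range(140):              # two full 70-tick cycles
    junction.update()
    observed.add(junction.lights())
```

Since every phase in the table has at least one direction on red, the safety property holds by construction and can be checked by inspecting the table, not by exhaustively simulating timers.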

Additional Resources

The following resources cover agents and multi-agent systems in more depth:

  1. “An Introduction to MultiAgent Systems” by Michael Wooldridge ArXiv: https://arxiv.org/abs/1909.12201
  2. “Artificial Intelligence: A Modern Approach” by Stuart Russell and Peter Norvig (Contains chapters on agents and multi-agent systems)
  3. “Multiagent Systems: Algorithmic, Game-Theoretic, and Logical Foundations” by Yoav Shoham and Kevin Leyton-Brown ArXiv: https://arxiv.org/abs/0812.2041
  4. “The Foundation of Multi-Agent Learning: An Introduction” by Peter Stone ArXiv: https://arxiv.org/abs/2103.02373

These resources provide deeper background on agent architectures, multi-agent coordination, game-theoretic reasoning, and multi-agent learning. For high-visibility content, verify that each citation and link is current before publishing.

Closing Thoughts

Agents and multi-agent systems are useful when a problem requires perception, decision-making, action, coordination, or learning inside a changing environment. The same concepts apply across simulations, workflow automation, robotics, traffic systems, smart homes, and agentic LLM applications.

The production lesson is straightforward: start with simple rule-based or goal-oriented agents when behavior must be predictable, introduce learning only when reward signals are reliable, and use multi-agent coordination only when the problem genuinely requires distributed decision-making. More agents do not automatically create a better architecture; they create more coordination, testing, and governance requirements.
