Congestion in urban areas is a growing issue, leading to wasted time, increased fuel consumption, and environmental pollution. Traditional traffic signal systems rely on fixed schedules that don't adapt to real-time traffic conditions. The goal of this project is to build an Intelligent Traffic Signal Control System using Deep Reinforcement Learning (DRL) to dynamically optimize traffic signals in real time, reducing delays, congestion, and emissions.
Task Description:
Design and train a DRL agent that controls the traffic lights at intersections, optimizing vehicle flow based on real-time traffic data. The agent learns a policy that minimizes the average waiting time, queue length, and fuel consumption of vehicles.
Purpose:
A DRL-based traffic control system can replace static traffic signal timings with adaptive decision-making. This is particularly important in urban areas where traffic patterns change dynamically due to varying population density, peak hours, accidents, or weather.
Significance:
Deployed across a smart city, such a system could save commuters substantial time, improve fuel efficiency, and reduce carbon emissions. The project demonstrates how Deep Reinforcement Learning can be applied to a high-impact, real-world control problem.
Type of Data:
Synthetic or real-world traffic data, e.g. per-lane vehicle counts, queue lengths, current signal phases, and IoT sensor streams (vehicle detectors, accident reports).
Datasets and Sources:
Preprocessing Steps:
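As a minimal, illustrative sketch (the exact steps depend on the data source), raw per-lane counts can be clipped to the observation bounds and scaled before being fed to the network:

import numpy as np

def preprocess_state(raw_state, max_value=100.0):
    """Illustrative preprocessing: clip per-lane counts/queues to the observation bounds and scale to [0, 1]."""
    clipped = np.clip(np.asarray(raw_state, dtype=np.float32), 0.0, max_value)
    return clipped / max_value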
Reinforcement Learning Approach:
Use a Deep Q-Network (DQN) or Proximal Policy Optimization (PPO), two widely used DRL algorithms.
State Space:
The current state of the intersection, including per-lane vehicle density, per-lane queue length, the current signal phase, and recent IoT sensor readings (the same features the environment code below extracts).
Action Space:
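A minimal sketch of one possible action space, assuming each intersection selects one of four signal phases per decision step (the same convention used in the environment code below):

from gym import spaces

# A single intersection chooses one of 4 signal phases each decision step.
single_intersection_actions = spaces.Discrete(4)

# Several intersections controlled jointly: one phase choice per intersection.
multi_intersection_actions = spaces.MultiDiscrete([4, 4, 4, 4])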
Reward Function:
Design a reward function to encourage optimal traffic flow. Examples:
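One illustrative option, mirroring the weighted combination used in the environment code below (the weights and the waiting-time term are tunable assumptions, not prescribed values):

def example_reward(queue_lengths, waiting_times, throughput):
    """Illustrative reward: penalize queues and waiting time, reward vehicles that clear the network."""
    return -0.6 * sum(queue_lengths) - 0.1 * sum(waiting_times) + 0.3 * throughput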
Neural Network Architecture (for DQN or PPO):
Example for DQN:
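A minimal sketch of a DQN Q-network for a single intersection (the layer sizes are illustrative assumptions):

import torch.nn as nn

class DQNetwork(nn.Module):
    """Maps an intersection's state vector to one Q-value per candidate signal phase."""
    def __init__(self, state_size, action_size):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(state_size, 128), nn.ReLU(),
            nn.Linear(128, 64), nn.ReLU(),
            nn.Linear(64, action_size),  # one Q-value per signal phase
        )

    def forward(self, state):
        return self.layers(state)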
Metrics:
Evaluate the performance of the trained DRL agent using:
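A sketch of how such metrics could be pulled from a running SUMO simulation through TraCI (fuel consumption is reported per vehicle by SUMO; units depend on the SUMO version):

import numpy as np
import traci

def evaluation_metrics():
    """Average waiting time, total queue length, and total fuel consumption at the current SUMO step."""
    vehicle_ids = traci.vehicle.getIDList()
    avg_wait = np.mean([traci.vehicle.getWaitingTime(v) for v in vehicle_ids]) if vehicle_ids else 0.0
    total_queue = sum(traci.lane.getLastStepHaltingNumber(l) for l in traci.lane.getIDList())
    total_fuel = sum(traci.vehicle.getFuelConsumption(v) for v in vehicle_ids)
    return avg_wait, total_queue, total_fuel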
Validation Strategy:
Week 1-2: Environment Setup
Week 3-4: DRL Environment and Baseline Model
Week 5-6: Train the DRL Agent
Week 7-8: Testing and Evaluation
Week 9-10: Deployment and Visualization
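A minimal sketch of what the Streamlit dashboard could look like in this phase, assuming episode rewards and queue lengths are logged to a CSV file named "training_log.csv" (a hypothetical file, not produced by the code below):

import pandas as pd
import streamlit as st

st.title("🚦 Smart City Traffic Control Dashboard")
log = pd.read_csv("training_log.csv")  # assumed columns: episode, reward, avg_queue_length
st.line_chart(log.set_index("episode")[["reward", "avg_queue_length"]])
st.metric("Latest average queue length", f"{log['avg_queue_length'].iloc[-1]:.1f} vehicles")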
Multi-Agent Distributed Control:
IoT & Real-Time Traffic Data Integration:
Cloud-Based Deployment & Edge Processing:
Hybrid Learning with Graph Neural Networks (GNNs):
Smart City Integration (Deployment-Ready):
"""
📌 AI-Driven Smart City Traffic Control System 🚦
Hierarchical Multi-Agent PPO (HPPO) + Graph Neural Networks (GNNs) + IoT Integration
Author: Jahaziel Titular
"""
# 🚀 Step 1: Import Libraries
import os
import torch
import torch.nn as nn
import torch.multiprocessing as mp
import traci # SUMO Traffic Simulation (TraCI control API)
from sumolib import checkBinary # Locates the "sumo" / "sumo-gui" binaries used below
import numpy as np
import gym
from gym import spaces
import networkx as nx # For GNN-based city-wide coordination
from torch_geometric.nn import GATConv # Graph Attention Network
from collections import deque
from torch.distributions import Categorical
from kafka import KafkaConsumer # Real-time vehicle data from IoT
import requests # Cloud API for traffic monitoring
import fastapi
import streamlit as st # Smart City Dashboard
# 🚦 Step 2: Define Smart City Traffic Environment
class SmartCityTrafficEnv(gym.Env):
    """ Multi-agent SUMO environment with IoT-enhanced real-time traffic sensing. """
    def __init__(self, sumo_cfg_path, intersections=("A1", "B2", "C3", "D4"), max_steps=5000, use_gui=False):
        super(SmartCityTrafficEnv, self).__init__()
        self.sumo_cfg_path, self.max_steps, self.use_gui = sumo_cfg_path, max_steps, use_gui
        self.intersections = list(intersections)
        self.num_intersections = len(self.intersections)
        self.step_count = 0
        # 12 features per intersection: per-lane densities, per-lane queue lengths, current phase, IoT readings.
        self.observation_space = spaces.Box(low=0, high=100, shape=(self.num_intersections * 12,), dtype=np.float32)
        self.action_space = spaces.MultiDiscrete([4] * self.num_intersections)  # 4 signal phases per intersection
        self.sumo_binary = checkBinary("sumo-gui" if self.use_gui else "sumo")
        self.sumo_cmd = [self.sumo_binary, "-c", self.sumo_cfg_path, "--no-step-log", "true", "--start"]
    def reset(self):
        """ Reset the SUMO simulation, closing any previous TraCI connection first. """
        if traci.isLoaded():
            traci.close()
        traci.start(self.sumo_cmd)
        self.step_count = 0
        return self._get_state()

    def step(self, actions):
        """ Apply one traffic-light action per intersection, advance SUMO, and collect rewards. """
        for i, action in enumerate(actions):
            self._set_traffic_light_phase(self.intersections[i], action)
        traci.simulationStep()
        self.step_count += 1
        return self._get_state(), self._calculate_reward(), self.step_count >= self.max_steps, {}
    def _get_state(self):
        """ Extract the real-time, IoT-enhanced traffic state (assumes a fixed number of controlled lanes per intersection). """
        state = []
        for intersection in self.intersections:
            lanes = traci.trafficlight.getControlledLanes(intersection)
            traffic_density = [traci.lane.getLastStepVehicleNumber(lane) for lane in lanes]
            queue_length = [traci.lane.getLastStepHaltingNumber(lane) for lane in lanes]
            traffic_light_phase = [traci.trafficlight.getPhase(intersection)]
            iot_data = self._get_iot_traffic_data(intersection)  # latest IoT sensor readings
            state.extend(traffic_density + queue_length + traffic_light_phase + iot_data)
        return np.array(state, dtype=np.float32)
    def _calculate_reward(self):
        """ Reward: - queue length, + throughput, - IoT-based anomaly penalty. """
        reward = 0
        for intersection in self.intersections:
            lanes = traci.trafficlight.getControlledLanes(intersection)
            queue_lengths = [traci.lane.getLastStepHaltingNumber(lane) for lane in lanes]
            throughput = traci.simulation.getDepartedNumber()  # network-wide departures this step
            anomaly_penalty = self._detect_anomalies(intersection)  # IoT-based reward penalty
            reward += -0.6 * sum(queue_lengths) + 0.3 * throughput - anomaly_penalty
        return reward
    def _set_traffic_light_phase(self, intersection, phase):
        """ Set the traffic light phase in SUMO. """
        traci.trafficlight.setPhase(intersection, int(phase))

    def _get_iot_traffic_data(self, intersection):
        """ Fetch the most recent readings from the intersection's Kafka topic (non-blocking). """
        consumer = KafkaConsumer(f"traffic_{intersection}", bootstrap_servers="localhost:9092",
                                 auto_offset_reset="latest", consumer_timeout_ms=100)
        readings = [int(msg.value.decode()) for msg in consumer]
        consumer.close()
        return (readings[-3:] + [0, 0, 0])[:3]  # always return exactly 3 values to keep the state width fixed

    def _detect_anomalies(self, intersection):
        """ Use IoT sensor anomalies to adjust rewards (e.g., accidents, congestion). """
        try:
            response = requests.get(f"http://iot-traffic-api.com/{intersection}", timeout=1)  # placeholder monitoring endpoint
            return 10 if response.json().get("accident_detected") else 0
        except requests.RequestException:
            return 0  # no penalty if the monitoring API is unreachable

    def close(self):
        """ Close the SUMO simulation. """
        if traci.isLoaded():
            traci.close()
# 🔥 Step 3: Define GNN-Based Multi-Agent PPO 🚀
class GNN_PPO(nn.Module):
    """ Hierarchical Graph Neural Network + PPO actor-critic for multi-agent control. """
    def __init__(self, state_size, action_size):
        super(GNN_PPO, self).__init__()
        self.gnn = GATConv(state_size, 256)  # Graph Attention Network over intersection nodes
        self.actor = nn.Sequential(nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, action_size), nn.Softmax(dim=-1))
        self.critic = nn.Sequential(nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, 1))
        self.optimizer = torch.optim.Adam(self.parameters(), lr=0.0003)

    def forward(self, state, edge_index):
        # state: [num_intersections, state_size] node features; edge_index: [2, num_edges] road-network connectivity.
        state = self.gnn(state, edge_index)
        return self.actor(state), self.critic(state)
# 🚀 Step 4: Smart City Control System
if __name__ == "__main__":
    env = SmartCityTrafficEnv(sumo_cfg_path="./city_config.sumocfg")
    # One graph node per intersection; each node carries a fixed-size feature vector.
    features_per_node = env.observation_space.shape[0] // env.num_intersections
    agent = GNN_PPO(state_size=features_per_node, action_size=int(env.action_space.nvec[0]))
    # Static road-network connectivity A1->B2->C3->D4, shaped [2, num_edges] as expected by GATConv.
    edge_index = torch.tensor([[0, 1, 2], [1, 2, 3]], dtype=torch.long)
    for episode in range(5000):
        state = env.reset()
        total_reward = 0
        for step in range(env.max_steps):
            node_features = torch.tensor(state, dtype=torch.float32).view(env.num_intersections, -1)
            probs, values = agent(node_features, edge_index)
            dist = Categorical(probs)
            actions = dist.sample()  # one phase choice per intersection
            next_state, reward, done, _ = env.step(actions.tolist())
            # Simplified advantage-weighted policy-gradient update (stands in for the full PPO clipped objective).
            advantage = reward - values.mean().detach()
            loss = -(dist.log_prob(actions).sum() * advantage) + (reward - values.mean()).pow(2)
            agent.optimizer.zero_grad()
            loss.backward()
            agent.optimizer.step()
            total_reward += reward
            state = next_state
            if done:
                break
        print(f"Episode {episode}, Reward: {total_reward}")
    env.close()