
Next-Gen Intelligent Traffic Signal Control System

1. Problem Statement

Congestion in urban areas is a growing issue, leading to wasted time, increased fuel consumption, and environmental pollution. Traditional traffic signal systems rely on fixed schedules that don't adapt to real-time traffic conditions. The goal of this project is to build an Intelligent Traffic Signal Control System using Deep Reinforcement Learning (DRL) to dynamically optimize traffic signals in real time, reducing delays, congestion, and emissions.


2. Overview of the Task and Its Significance

  • Task Description:
    Design and train a DRL agent to control traffic lights at intersections by optimizing vehicle flow based on real-time traffic data. The agent learns strategies to minimize the average waiting time, queue length, and fuel consumption of vehicles.

  • Purpose:
    A DRL-based traffic control system can replace static traffic signal timings with adaptive decision-making. This is particularly important in urban areas where traffic patterns change dynamically due to varying population density, peak hours, accidents, or weather.

  • Significance:
    Deployed across a smart city, the system could save millions of commuting hours, improve fuel efficiency, and reduce carbon emissions. The project demonstrates mastery of Deep Reinforcement Learning applied to a high-impact, real-world system.


3. Data and Preprocessing Requirements

  • Type of Data:
    Synthetic or real-world traffic data consisting of:

    • Number of vehicles arriving at each intersection over time.
    • Vehicle speeds, waiting times, and queue lengths.
    • Traffic light states (red, yellow, green).
  • Datasets and Sources:

    • Synthetic Traffic Data: Use traffic simulators like SUMO (Simulation of Urban MObility) to generate intersection-level traffic flow.
    • Real-world Traffic Data: Datasets like PeMS (Caltrans Performance Measurement System) for highway traffic or Open Traffic for urban roads.
  • Preprocessing Steps:

    1. Generate a virtual city with intersections in SUMO (or import real-world city maps via OpenStreetMap).
    2. Configure simulation parameters such as vehicle density, traffic signal states, and road network layout.
    3. Extract features like average waiting time, queue length, and fuel consumption per vehicle for training the DRL agent (see the TraCI sketch below).
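
A hedged sketch of this extraction step using SUMO's TraCI API; the config path "intersection.sumocfg" and the traffic-light ID "tl_0" are placeholders for whatever network you generate:

import traci
from sumolib import checkBinary

# Start a headless SUMO run for the generated network (placeholder config path)
traci.start([checkBinary("sumo"), "-c", "intersection.sumocfg", "--no-step-log", "true"])

records = []
for step in range(3600):  # one simulated hour at one second per step
    traci.simulationStep()
    lanes = traci.trafficlight.getControlledLanes("tl_0")  # placeholder traffic-light ID
    records.append({
        "step": step,
        "vehicle_count": sum(traci.lane.getLastStepVehicleNumber(l) for l in lanes),
        "queue_length": sum(traci.lane.getLastStepHaltingNumber(l) for l in lanes),
        "waiting_time": sum(traci.lane.getWaitingTime(l) for l in lanes),
        "fuel_consumption": sum(traci.lane.getFuelConsumption(l) for l in lanes),
    })
traci.close()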

4. Model Architecture and Optimization

  • Reinforcement Learning Approach:
    Use a Deep Q-Network (DQN) or Proximal Policy Optimization (PPO), two widely used DRL algorithms.

  • State Space:
    The current state of the intersection, including:

    • Traffic density: Number of vehicles on each road segment.
    • Traffic light state: Current phase (red, green, or yellow) for each direction.
    • Waiting time: Average waiting time of vehicles.
    • Queue length: Length of vehicle queues on each road.
  • Action Space:

    • Change the current phase of traffic lights. Example actions:
      • Extend the current green light.
      • Switch to the next phase.
      • Add a yellow light phase.
  • Reward Function:
    Design a reward function to encourage optimal traffic flow. Examples:

    • Negative reward for long waiting times or large queues.
    • Positive reward for reducing the average queue length or vehicle delay.
    • Penalty for unnecessary traffic light switches (to reduce confusion).
  • Neural Network Architecture (for DQN or PPO):

    • Input Layer: Accepts the state space (e.g., traffic density, light phases).
    • Hidden Layers: Two or three fully connected layers with ReLU activations.
    • Output Layer: Outputs Q-values (for DQN) or policy probabilities (for PPO) for each possible action.

Example for DQN:

Input -> Dense(128) -> ReLU -> Dense(128) -> ReLU -> Output(Q-values)
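
A minimal PyTorch sketch of that architecture; the state size of 12 and the 4 phase actions are placeholder values:

import torch
import torch.nn as nn

class DQNetwork(nn.Module):
    """Maps an intersection state vector to one Q-value per traffic-light action."""
    def __init__(self, state_size: int, num_actions: int):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_size, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
            nn.Linear(128, num_actions),
        )

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        return self.net(state)

q_net = DQNetwork(state_size=12, num_actions=4)
q_values = q_net(torch.zeros(1, 12))  # batch of one placeholder state
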
  • Optimization Techniques (a combined sketch follows this list):
    • Replay Buffer: Use experience replay for stable learning.
    • Target Network: Use a separate target network for DQN updates to reduce instability.
    • Exploration Strategy: Use epsilon-greedy exploration for DQN or entropy regularization for PPO.
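
A hedged sketch of these three pieces wired together for the DQN case; the buffer size, epsilon value, and layer sizes are assumed starting points, not tuned settings:

import random
from collections import deque

import numpy as np
import torch
import torch.nn as nn

def make_q_net(state_size=12, num_actions=4):
    return nn.Sequential(nn.Linear(state_size, 128), nn.ReLU(),
                         nn.Linear(128, 128), nn.ReLU(),
                         nn.Linear(128, num_actions))

q_net, target_net = make_q_net(), make_q_net()
target_net.load_state_dict(q_net.state_dict())  # target network starts as a frozen copy, re-synced periodically
optimizer = torch.optim.Adam(q_net.parameters(), lr=1e-3)
replay_buffer = deque(maxlen=50_000)            # experience replay buffer of (s, a, r, s', done) tuples
gamma, epsilon = 0.99, 0.1

def select_action(state):
    """Epsilon-greedy: pick a random phase with probability epsilon, otherwise the argmax-Q phase."""
    if random.random() < epsilon:
        return random.randrange(4)
    with torch.no_grad():
        return int(q_net(torch.tensor(state, dtype=torch.float32)).argmax())

def train_step(batch_size=64):
    """Sample a minibatch from the replay buffer and take one TD step against the target network."""
    if len(replay_buffer) < batch_size:
        return
    batch = random.sample(replay_buffer, batch_size)
    states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
    q = q_net(torch.tensor(states, dtype=torch.float32))
    q = q.gather(1, torch.tensor(actions).long().unsqueeze(1)).squeeze(1)
    with torch.no_grad():
        next_q = target_net(torch.tensor(next_states, dtype=torch.float32)).max(1).values
        target = torch.tensor(rewards, dtype=torch.float32) + gamma * next_q * (1 - torch.tensor(dones, dtype=torch.float32))
    loss = nn.functional.mse_loss(q, target)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()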

5. Evaluation and Validation Techniques

  • Metrics:
    Evaluate the performance of the trained DRL agent using the metrics below (a TraCI-based collection sketch follows this section):

    • Average Waiting Time: Average time vehicles spend at the intersection.
    • Queue Length: Total length of vehicle queues.
    • Fuel Consumption: Estimated fuel wasted due to idling vehicles.
    • Throughput: Total number of vehicles that pass through the intersection in a given time.
  • Validation Strategy:

    • Split simulation scenarios into training, validation, and test sets based on varying traffic densities (e.g., light traffic, rush hour).
    • Use cross-validation to test the model on unseen traffic patterns.
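
A hedged sketch of collecting these metrics from a running SUMO simulation via TraCI; it assumes a connection has already been opened with traci.start, and lane IDs come from your network:

import traci

def collect_metrics(lane_ids, num_steps=1000):
    """Accumulate waiting time, queue length, and throughput over one evaluation run."""
    total_wait, total_queue, throughput = 0.0, 0.0, 0
    for _ in range(num_steps):
        traci.simulationStep()
        total_wait += sum(traci.lane.getWaitingTime(lane) for lane in lane_ids)
        total_queue += sum(traci.lane.getLastStepHaltingNumber(lane) for lane in lane_ids)
        throughput += traci.simulation.getArrivedNumber()  # vehicles that completed their trip this step
    return {
        "avg_waiting_time_per_step": total_wait / num_steps,  # summed lane waiting time, averaged over steps
        "avg_queue_length": total_queue / num_steps,
        "throughput": throughput,
    }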

6. Suggested Implementation Roadmap

Week 1-2: Environment Setup

  • Install and configure the SUMO traffic simulator.
  • Design road intersections with varying levels of complexity (e.g., single 4-way intersection, multi-intersection grids).
  • Simulate and extract traffic data, including waiting times and vehicle counts.

Week 3-4: DRL Environment and Baseline Model

  • Build a custom OpenAI Gym environment to simulate the intersection.
  • Define state space, action space, and reward function for DRL.
  • Implement a rule-based (fixed-time) baseline controller for comparison (sketched below).
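
A hedged sketch of such a fixed-time baseline; the traffic-light ID "tl_0", phase indices, and durations are placeholders, and a SUMO connection is assumed to be open via traci.start:

import traci

CYCLE = [(0, 30), (1, 4), (2, 30), (3, 4)]  # (phase index, duration in steps), e.g. green NS, yellow, green EW, yellow

def run_fixed_time_baseline(tls_id="tl_0", num_cycles=40):
    """Cycle through the signal phases on a fixed schedule, ignoring live traffic conditions."""
    total_wait = 0.0
    for _ in range(num_cycles):
        for phase, duration in CYCLE:
            for _ in range(duration):
                traci.trafficlight.setPhase(tls_id, phase)  # re-assert the phase so SUMO's own program cannot advance it
                traci.simulationStep()
                lanes = traci.trafficlight.getControlledLanes(tls_id)
                total_wait += sum(traci.lane.getWaitingTime(l) for l in lanes)
    return total_wait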

Week 5-6: Train the DRL Agent

  • Train the DRL agent (e.g., DQN or PPO) using the Gym environment.
  • Tune hyperparameters such as learning rate, discount factor (γ), and batch size (a starting configuration is sketched after this list).
  • Monitor training using evaluation metrics like average waiting time.
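
A sketch of a plausible starting configuration; these values are assumptions to tune from, not reported results:

hyperparams = {
    "learning_rate": 3e-4,            # Adam step size
    "gamma": 0.99,                    # discount factor γ
    "batch_size": 64,                 # minibatch size sampled from the replay buffer
    "epsilon_start": 1.0,             # initial exploration rate (DQN)
    "epsilon_end": 0.05,              # final exploration rate after decay
    "epsilon_decay_steps": 50_000,
    "target_update_interval": 1_000,  # environment steps between target-network syncs
}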

Week 7-8: Testing and Evaluation

  • Test the trained model on unseen traffic patterns (e.g., peak hours).
  • Compare the DRL agent's performance to the baseline model.
  • Optimize the reward function for better results.

Week 9-10: Deployment and Visualization

  • Deploy the trained agent to SUMO for live simulations.
  • Visualize traffic flows, waiting times, and fuel consumption using libraries like Matplotlib or Seaborn (see the plotting sketch below).
  • Create a report or presentation summarizing the results.
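
A hedged sketch of one such visualization; the metric series are whatever you log during evaluation, and the function name is illustrative:

import matplotlib.pyplot as plt

def plot_waiting_times(avg_wait_drl, avg_wait_baseline):
    """Compare per-episode average waiting time for the DRL agent and the fixed-time baseline."""
    episodes = range(len(avg_wait_drl))
    plt.plot(episodes, avg_wait_drl, label="DRL agent")
    plt.plot(episodes, avg_wait_baseline, label="Fixed-time baseline")
    plt.xlabel("Evaluation episode")
    plt.ylabel("Average waiting time (s)")
    plt.legend()
    plt.title("DRL vs. fixed-time traffic control")
    plt.show()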

Key Upgrades and Next-Level Features

Multi-Agent Distributed Control:

  • Hierarchical PPO (HPPO) handles both high-level policy (citywide optimization) and low-level decisions (per-intersection phase control).
  • Intersections communicate with each other to coordinate adaptive traffic flow.

IoT & Real-Time Traffic Data Integration:

  • Uses live traffic sensor feeds from camera-based vehicle detection and GPS data.
  • Kafka and MQTT are used for real-time vehicle data streaming to enhance model adaptability (see the producer sketch below).
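
A hedged sketch of the publishing side; the topic naming and broker address are assumptions that match the consumer in the listing below:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

def publish_vehicle_count(intersection_id: str, vehicle_count: int):
    """Send one sensor reading; the environment's _get_iot_traffic_data() reads these topics."""
    producer.send(f"traffic_{intersection_id}", str(vehicle_count).encode())
    producer.flush()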

Cloud-Based Deployment & Edge Processing:

  • Model runs on Google Cloud / AWS Greengrass for edge-AI inference at each traffic intersection.
  • Uses 5G-connected traffic lights for near-instantaneous phase switching.

Hybrid Learning with Graph Neural Networks (GNNs):

  • Graph Attention Networks (GATs) model multi-intersection dependencies for efficient learning.
  • Combining GNNs with PPO yields coordinated, network-level control policies that can outperform isolated per-intersection controllers.

Smart City Integration (Deployment-Ready):

  • Uses FastAPI for a real-time inference API serving smart city dashboards (a minimal endpoint sketch follows this list).
  • A Streamlit-based analytics dashboard visualizes live intersection metrics.
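
A minimal FastAPI sketch of such an inference endpoint; the route name, payload schema, and placeholder policy are assumptions, and in deployment the trained GNN-PPO agent would be called instead:

from typing import List

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Smart City Traffic Control API")

class IntersectionState(BaseModel):
    intersection_id: str
    features: List[float]  # lane densities, queue lengths, current phase, IoT readings

@app.post("/recommend_phase")
def recommend_phase(state: IntersectionState):
    # Placeholder rule standing in for model inference with the trained agent
    recommended = int(sum(state.features)) % 4
    return {"intersection_id": state.intersection_id, "recommended_phase": recommended}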

"""
📌 AI-Driven Smart City Traffic Control System 🚦  
Hierarchical Multi-Agent PPO (HPPO) + Graph Neural Networks (GNNs) + IoT Integration  
Author: Jahaziel Titular
"""

# 🚀 Step 1: Import Libraries  
import os  
import torch  
import torch.nn as nn  
import torch.multiprocessing as mp  
import traci  # SUMO TraCI interface for controlling the simulation
from sumolib import checkBinary  # Locate the sumo / sumo-gui binary (used by the environment below)
import numpy as np  
import gym  
from gym import spaces  
import networkx as nx  # For GNN-based city-wide coordination  
from torch_geometric.nn import GATConv  # Graph Attention Network  
from collections import deque  
from torch.distributions import Categorical  
from kafka import KafkaConsumer  # Real-time vehicle data from IoT  
import requests  # Cloud API for traffic monitoring  
import fastapi  
import streamlit as st  # Smart City Dashboard  

# 🚦 Step 2: Define Smart City Traffic Environment  
class SmartCityTrafficEnv(gym.Env):  
    """ Multi-agent SUMO environment with IoT-enhanced real-time traffic sensing. """  
    def __init__(self, sumo_cfg_path, intersections=["A1", "B2", "C3", "D4"], max_steps=5000, use_gui=False):  
        super(SmartCityTrafficEnv, self).__init__()  
        self.sumo_cfg_path, self.max_steps, self.use_gui = sumo_cfg_path, max_steps, use_gui  
        self.intersections = intersections  
        self.num_intersections = len(intersections)  
        # 12 features per intersection: per-lane vehicle counts and queues (4 lanes), current phase, 3 IoT readings
        self.observation_space = spaces.Box(low=0, high=100, shape=(self.num_intersections * 12,), dtype=np.float32)
        self.action_space = spaces.MultiDiscrete([4] * self.num_intersections)  # 4 signal phases per intersection
        self.step_count = 0
        self.sumo_binary = checkBinary("sumo-gui" if self.use_gui else "sumo")
        self.sumo_cmd = [self.sumo_binary, "-c", self.sumo_cfg_path, "--no-step-log", "true", "--start"]

    def reset(self):
        """ (Re)start the SUMO simulation and return the initial state. """
        if traci.isLoaded():  # close any previous connection before restarting
            traci.close()
        self.step_count = 0
        traci.start(self.sumo_cmd)
        return self._get_state()

    def step(self, actions):
        """ Apply one phase action per intersection, advance SUMO, and collect the reward. """
        for i, action in enumerate(actions):
            self._set_traffic_light_phase(self.intersections[i], int(action))
        traci.simulationStep()
        self.step_count += 1
        done = self.step_count >= self.max_steps
        return self._get_state(), self._calculate_reward(), done, {}

    def _get_state(self):  
        """ Extract real-time IoT-enhanced traffic state. """  
        state = []  
        for intersection in self.intersections:  
            lanes = traci.trafficlight.getControlledLanes(intersection)  
            traffic_density = [traci.lane.getLastStepVehicleNumber(lane) for lane in lanes]  
            queue_length = [traci.lane.getLastStepHaltingNumber(lane) for lane in lanes]  
            traffic_light_phase = [traci.trafficlight.getPhase(intersection)]  
            iot_data = self._get_iot_traffic_data(intersection)  # IoT sensor data  
            state.extend(traffic_density + queue_length + traffic_light_phase + iot_data)  
        return np.array(state, dtype=np.float32)  

    def _calculate_reward(self):
        """ Reward: penalize queue length and IoT-detected anomalies, reward throughput. """
        reward = 0
        throughput = traci.simulation.getDepartedNumber()  # network-wide, so counted once per step rather than per intersection
        for intersection in self.intersections:
            lanes = traci.trafficlight.getControlledLanes(intersection)
            queue_lengths = [traci.lane.getLastStepHaltingNumber(lane) for lane in lanes]
            anomaly_penalty = self._detect_anomalies(intersection)  # IoT-based reward penalty
            reward += -0.6 * sum(queue_lengths) - anomaly_penalty
        return reward + 0.3 * throughput

    def _set_traffic_light_phase(self, intersection, phase):  
        """ Set traffic light phase in SUMO. """  
        traci.trafficlight.setPhase(intersection, phase)  

    def _get_iot_traffic_data(self, intersection):
        """ Fetch the most recent IoT sensor readings for this intersection from Kafka. """
        consumer = KafkaConsumer(f"traffic_{intersection}", bootstrap_servers="localhost:9092",
                                 auto_offset_reset="latest", consumer_timeout_ms=100)  # time out instead of blocking forever
        readings = [int(msg.value.decode()) for msg in consumer]
        consumer.close()
        return ([0, 0, 0] + readings)[-3:]  # pad so the state vector keeps a fixed length

    def _detect_anomalies(self, intersection):
        """ Use IoT sensor anomalies (e.g., accidents) to add a reward penalty. """
        try:
            response = requests.get(f"http://iot-traffic-api.com/{intersection}", timeout=1)
            data = response.json()
            return 10 if data.get("accident_detected") else 0
        except requests.RequestException:
            return 0  # no penalty if the IoT endpoint is unreachable

    def close(self):  
        """ Close SUMO simulation. """  
        traci.close()  

# 🔥 Step 3: Define GNN-Based Multi-Agent PPO 🚀  
class GNN_PPO(nn.Module):
    """ Graph Attention Network + actor-critic heads for multi-agent PPO control.
        Expects one feature vector per intersection (one graph node per intersection). """
    def __init__(self, state_size, action_size):
        super(GNN_PPO, self).__init__()
        self.gnn = GATConv(state_size, 256)  # attention-based message passing between neighbouring intersections
        self.actor = nn.Sequential(nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, action_size), nn.Softmax(dim=-1))
        self.critic = nn.Sequential(nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, 1))
        self.optimizer = torch.optim.Adam(self.parameters(), lr=0.0003)

    def forward(self, state, edge_index):
        """ state: [num_intersections, state_size] node features; edge_index: [2, num_edges]. """
        state = self.gnn(state, edge_index)
        return self.actor(state), self.critic(state)  # per-intersection action probabilities and state values

# 🚀 Step 4: Smart City Control System  
if __name__ == "__main__":
    env = SmartCityTrafficEnv(sumo_cfg_path="./city_config.sumocfg")
    # Per-intersection feature size; MultiDiscrete exposes per-agent action counts via .nvec (not .n)
    state_size = env.observation_space.shape[0] // env.num_intersections
    agent = GNN_PPO(state_size=state_size, action_size=int(env.action_space.nvec[0]))
    # Static road-network graph (assumed chain A1-B2, B2-C3, C3-D4) in PyG [2, num_edges] format
    edge_index = torch.tensor([[0, 1, 2], [1, 2, 3]], dtype=torch.long)

    for episode in range(5000):
        state = env.reset()
        total_reward = 0
        for step in range(env.max_steps):
            node_states = torch.tensor(state, dtype=torch.float32).view(env.num_intersections, -1)
            action_probs, _ = agent(node_states, edge_index)
            actions = Categorical(action_probs).sample().numpy()  # one phase action per intersection
            state, reward, done, _ = env.step(actions)
            total_reward += reward
            if done:
                break
        # NOTE: the PPO update (advantage estimation, clipped policy/value losses) is omitted in this skeleton
        print(f"Episode {episode}, Reward: {total_reward}")

    env.close()