"""
This module details the implementation of a ``Simulator`` object in charge of handling the connection between the traffic simulator and this interface. The connection with the traffic simulator is handled by an object called ``Connector`` which establishes a messaging protocol with the traffic simulator.
Example:
To use the ``Simulator`` declare in a string the ``path`` to the simulator ::
>>> from symupy.api import Simulator
>>> path_symuvia = "path/to/libSymuyVia.dylib"
>>> simulator = Simulator(library_path=path_symuvia)
Other parameters can also be sent to the simulator in order to provide other configurations:
Example:
To increase the *buffer size* to a specific size:
>>> simulator = Simulator(bufferSize = 1000000)
To change the flag that traces the flow:
>>> simulator = Simulator(trace_flow = True)
"""
# ============================================================================
# STANDARD IMPORTS
# ============================================================================
import os
from itertools import repeat
from ctypes import (
cdll,
c_int,
byref,
c_double,
c_char_p,
)
import click
import platform
import typing
from typing import Union
# ============================================================================
# INTERNAL IMPORTS
# ============================================================================
# Error Handling
from symupy.utils.exceptions import (
SymupyLoadLibraryError,
SymupyFileLoadError,
SymupyVehicleCreationError,
SymupyDriveVehicleError,
SymupyWarning,
)
#
from symupy.runtime.api.scenario import Simulation
from symupy.utils.parser import SimulatorRequest
from symupy.utils.configurator import Configurator
from symupy.runtime.logic import RuntimeDevice
from symupy.tsc.vehicles import Vehicle, VehicleList
from symupy.utils.tools import timer_func, printer_time
import symupy.utils.constants as CT
# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================
# V2X Connectivity
# NetworkType = Union[V2INetwork, V2VNetwork]
# Return type of the MFD getters below: a single float when one sensor name is
# given as a string, a tuple of values otherwise.
TupleFloat = Union[float, tuple]
[docs]class Simulator(Configurator, RuntimeDevice):
"""
Simulator class for containing object to connect and command a simulation in SymuFlow
Example:
Call of the default simulator ::
>>> from symupy.api import Simulator
>>> simulator = Simulator()
:return: Symuvia simulator object with simulation parameters
:rtype: Simulator
You may also pass supplementary parameters to the object by specifying keys in the call:
Example:
To use the ``Simulator`` declare in a string the ``path`` to the simulator ::
>>> from symupy.api import Simulator
>>> path_symuvia = "path/to/libSymuyVia.dylib"
>>> simulator = Simulator(library_path=path_symuvia)
This object is a configuration manager for the interface between the traffic simulator and the python interface. For more details on the optional keyword parameters please refer to :py:class:`~symupy.utils.configurator.Configurator` class.
:raises SymupyLoadLibraryError:
Error raised whenever the SymuFlow library is not found
:raises SymupyFileLoadError:
Error raised whenever the provided path for a scenario cannot be loaded into the Simulator
:raises SymupyVehicleCreationError:
Error raised when a vehicle cannot be created
:raises SymupyDriveVehicleError:
Error raised when a vehicle state cannot be imposed
:raises NotImplementedError:
Not implemented functionality
:return: Simulator manager object
:rtype: Simulator
"""
def __init__(self, **kwargs) -> None:
    """Initialize the simulator from keyword configuration options.

    :param kwargs: options forwarded unchanged to ``Configurator.__init__``
    """
    # Each base class is initialized explicitly; only ``Configurator``
    # receives the keyword options.
    Configurator.__init__(self, **kwargs)
    RuntimeDevice.__init__(self)
    # Starts empty; the only code referring to it in this module is the
    # commented-out ``register_network`` method.
    self._net = []
def __repr__(self):
    """Return ``ClassName(library_path)`` for debugging output."""
    class_name = self.__class__.__name__
    return f"{class_name}({self.library_path})"
# =========================================================================
# LOADING METHODS
# =========================================================================
def load_symuvia(self):
    """Load the SymuFlow shared library found at ``self.library_path``.

    The handle returned by ``ctypes.cdll.LoadLibrary`` is stored in the
    private ``__library`` attribute used by every other method of the class.

    :raises SymupyLoadLibraryError:
        If ``ctypes`` cannot load the library (``OSError``)
    """
    try:
        lib_symuvia = cdll.LoadLibrary(self.library_path)
    except OSError as err:
        # Chain the loader error so the OS-level reason stays in the traceback
        raise SymupyLoadLibraryError(
            "Library not found", self.library_path
        ) from err
    self.__library = lib_symuvia
def load_network(self) -> int:
    """Load the registered simulation file into the SymuFlow library.

    :raises SymupyFileLoadError:
        If no simulation has been registered (no ``_sim`` attribute) or the
        library returns a falsy status
    :return: status returned by ``SymLoadNetworkEx``
    :rtype: int
    """
    simulation_registered = hasattr(self, "_sim")
    if not simulation_registered:
        raise SymupyFileLoadError("File not provided", "")
    load = self.__library.SymLoadNetworkEx
    status = load(self.scenarioFilename("UTF8"))
    if status:
        return status
    raise SymupyFileLoadError("Simulation could not be loaded", "")
def register_simulation(self, scenario_path: str):
    """Register the scenario file to be used by the simulator.

    :param scenario_path: path of the scenario file, handed to ``Simulation``
    :type scenario_path: str
    """
    scenario = Simulation(scenario_path)
    self._sim = scenario
# def register_network(self, network: NetworkType):
# # TODO: Implement this connection. This is for V2V
# self._net.append(network)
# =========================================================================
# RUNTIME METHODS
# =========================================================================
@timer_func
def run_simulation(self, scenario_path: str = ""):
    """Run a complete simulation in a single shot.

    Args:
        scenario_path (str): Path of the scenario file. When empty, the
            simulation previously registered with ``register_simulation``
            is used.

    Raises:
        SymupyFileLoadError: If no scenario path is given and no simulation
            has been registered.
        SymupyLoadLibraryError: If the SymuFlow library cannot be loaded.
    """
    if scenario_path:
        self.register_simulation(scenario_path)
    if not hasattr(self, "_sim"):
        # Fail before loading the library, with the same error ``load_network``
        # uses, instead of an AttributeError when the file name is resolved.
        raise SymupyFileLoadError("File not provided", "")
    self.load_symuvia()
    self.__library.SymRunEx(self.scenarioFilename("UTF8"))
def run(self, scenario_path: str = ""):
    """Alias of ``run_simulation``.

    Args:
        scenario_path (str): Path of the scenario file, forwarded unchanged
            to ``run_simulation``. Defaults to "".
    """
    # The result of ``run_simulation`` is intentionally not returned
    self.run_simulation(scenario_path)
def request_answer(self):
    """Advance the simulator one step and map its answer locally.

    In ``"lite"`` step launch mode only the continuation flag is refreshed.
    In any other mode the buffer content is also stored as the request query
    and the vehicle list is updated.
    """
    lite_mode = self.step_launch_mode == "lite"
    if not lite_mode:
        self._bContinue = self.__library.SymRunNextStepEx(
            self.buffer_string, self.write_xml, byref(self._b_end)
        )
        self.request.query = self.buffer_string.value
        self.vehicles.update_list()
        return
    # Lite step: no buffer is exchanged with the library
    self._bContinue = self.__library.SymRunNextStepLiteEx(
        self.write_xml, byref(self._b_end)
    )
@printer_time
def run_step(self) -> int:
    """Run the simulation one step further.

    :return: the new iteration step, or ``-1`` once the step iterator is
        exhausted (the continuation flag is then set to ``False``)
    :rtype: int
    """
    try:
        self.request_answer()
        self._c_iter = next(self._n_iter)
    except StopIteration:
        # No more steps available: flag the end of the simulation
        self._bContinue = False
        return -1
    else:
        return self._c_iter
def stop_step(self):
    """Stop the running simulation by clearing the continuation flag."""
    # ``do_next`` reads this flag
    self._bContinue = False
def create_vehicle(
    self,
    vehtype: str,
    origin: str,
    destination: str,
    lane: int = 1,
    simid: int = 0,
) -> int:
    """Create a vehicle within the network at the current simulation step.

    :param vehtype: vehicle type according to simulation definitions
    :type vehtype: str
    :param origin: network endpoint node according to simulation
    :type origin: str
    :param destination: network endpoint node according to simulation
    :type destination: str
    :param lane: vehicle lane number, defaults to 1
    :type lane: int
    :param simid: simulation id, defaults to 0 (not used by this method)
    :type simid: int
    :raises SymupyVehicleCreationError:
        If the vehicle type or one of the endpoints is not declared in the
        simulation file
    :return: value returned by ``SymCreateVehicleEx`` (vehicle id, >0)
    :rtype: int

    Example:
        One example to create a vehicle is as follows ::

            >>> with symuflow as s:
            >>>     while s.do_next:
            >>>         s.request_answer()  # Initialize
            >>>         s.request_answer()  # Vehicle 0
            >>>         # Vehicle instantiation
            >>>         veh_id = s.create_vehicle("VL", "Ext_In", "Ext_Out")
            >>>         force_driven = s.request.is_vehicle_driven("1")
            >>>         s.request_answer()
    """
    known_endpoints = self._sim.get_network_endpoints()
    type_records = self._sim.get_vehicletype_information()
    known_types = tuple(record["id"] for record in type_records)

    # Consistency checks
    if vehtype not in known_types:
        raise SymupyVehicleCreationError(
            "Unexisting Vehicle Class in File: ", self.scenarioFilename()
        )
    endpoints_ok = origin in known_endpoints and destination in known_endpoints
    if not endpoints_ok:
        raise SymupyVehicleCreationError(
            "Unexisting Network Endpoint File: ", self.scenarioFilename()
        )

    # Vehicle creation
    created_id = self.__library.SymCreateVehicleEx(
        vehtype.encode("UTF8"),
        origin.encode("UTF8"),
        destination.encode("UTF8"),
        c_int(lane),
        c_double(self.simulationstep),
    )
    return created_id
def create_vehicle_with_route(
    self,
    vehtype: str,
    origin: str,
    destination: str,
    lane: int = 1,
    creation_time: float = 0,
    route: str = "",
) -> int:
    """Create a vehicle that follows a specific route.

    :param vehtype: vehicle type according to simulation definitions
    :type vehtype: str
    :param origin: network endpoint node according to simulation
    :type origin: str
    :param destination: network endpoint node according to simulation
    :type destination: str
    :param lane: vehicle lane number, defaults to 1
    :type lane: int
    :param creation_time: creation time; the current simulation step is
        subtracted from it before it is sent to the library, defaults to 0
    :type creation_time: float
    :param route: route followed by the vehicle, defaults to ""
    :type route: str
    :raises SymupyVehicleCreationError:
        If the vehicle type or one of the endpoints is not declared in the
        simulation file
    :return: value returned by ``SymCreateVehicleWithRouteEx`` (vehicle id,
        >0), or ``-1`` when origin and destination are the same
    :rtype: int
    """
    if origin == destination:
        return -1

    known_endpoints = self._sim.get_network_endpoints()
    type_records = self._sim.get_vehicletype_information()
    known_types = tuple(record["id"] for record in type_records)

    # Consistency checks
    if vehtype not in known_types:
        raise SymupyVehicleCreationError(
            "Unexisting Vehicle Class in File: ", self.scenarioFilename()
        )
    endpoints_ok = origin in known_endpoints and destination in known_endpoints
    if not endpoints_ok:
        raise SymupyVehicleCreationError(
            "Unexisting Network Endpoint File: ", self.scenarioFilename()
        )

    # Vehicle creation (this library call takes the endpoints before the type)
    created_id = self.__library.SymCreateVehicleWithRouteEx(
        origin.encode("UTF8"),
        destination.encode("UTF8"),
        vehtype.encode("UTF8"),
        c_int(lane),
        c_double(creation_time - self.simulationstep),
        route.encode("UTF8"),
    )
    return created_id
def drive_vehicle(
    self,
    vehid: int,
    new_pos: float,
    destination: typing.Optional[str] = None,
    lane: int = 1,
):
    """Drive a vehicle to a specific position.

    :param vehid: identifier of the vehicle to drive
    :type vehid: int
    :param new_pos: position to place the vehicle
    :type new_pos: float
    :param destination: link of destination; when empty, the first ``link``
        value the current request reports for the vehicle is used,
        defaults to None
    :type destination: str, optional
    :param lane: lane of destination, defaults to 1
    :type lane: int
    :raises SymupyDriveVehicleError:
        If the destination is not one of the network links
    :return: value returned by ``SymDriveVehicleEx``

    Example:
        One example to drive a vehicle as follows ::

            >>> with symuflow as s:
            >>>     while s.do_next:
            >>>         s.run_step()
            >>>         if s.request.is_vehicle_in_network("0"):
            >>>             drive_status = s.drive_vehicle(0, 1.0)
            >>>             force_driven = s.request.is_vehicle_driven("0")
    """
    links = self._sim.get_network_links()

    if not destination:
        # Fall back to the link reported for this vehicle in the current request
        destination = self.request.filter_vehicle_property("link", vehid)[0]

    if destination not in links:
        raise SymupyDriveVehicleError(
            "Unexisting Network Endpoint File: ", self.scenarioFilename()
        )

    # TODO: Validate that position do not overpass the max pos
    dr_state = self.__library.SymDriveVehicleEx(
        c_int(vehid),
        destination.encode("UTF8"),
        c_int(lane),
        c_double(new_pos),
        1,  # NOTE(review): meaning of this last flag is not visible here
    )
    # Query the simulator again once the vehicle has been driven
    self.request_answer()
    return dr_state
def drive_vehicle_new_route(self, vehid: int, new_route: str) -> int:
    """Replace the current path of a vehicle with a new route.

    Args:
        vehid (int): vehicle id
        new_route (str): links separated by spaces describing the path to
            be taken by the vehicle

    Returns:
        int: Value returned by ``SymAlterRouteEx``, one of the following

        =========== =================================================
        **Value**   **Description**
        ----------- -------------------------------------------------
        0           The function is successfully executed
        -1          No network loaded
        -2          The vehicle doesn't exist
        -3          The new route is empty
        -4          A link of the new route not in network
        -5          New route is unattainable links are not connected
        -6          New route destination is different from original
        -7          New route cannot be reached by the vehicle
        =========== =================================================
    """
    alter_route = self.__library.SymAlterRouteEx
    encoded_route = new_route.encode("UTF8")
    return alter_route(vehid, encoded_route)
def drive_vehicle_with_control(
    self,
    vehcontrol,
    vehid: int,
    destination: typing.Optional[str] = None,
    lane: int = 1,
):
    """Drive a vehicle to the position computed by a control object.

    The controller is fed the current request through
    ``set_current_state`` and its ``new_position`` is handed to
    ``drive_vehicle``.

    :param vehcontrol: object exposing ``set_current_state`` and
        ``new_position``
    :param vehid: identifier of the vehicle to drive
    :type vehid: int
    :param destination: link of destination, defaults to None
    :type destination: str, optional
    :param lane: lane of destination, defaults to 1
    :type lane: int
    :return: value returned by ``drive_vehicle``
    """
    # TODO: Basic prototyping
    vehcontrol.set_current_state(self.request)
    new_pos = vehcontrol.new_position
    return self.drive_vehicle(vehid, new_pos, destination, lane)
def init_symbol_states(self):
    """Declare the return type of the library functions read at runtime.

    Each listed symbol of the loaded library gets its ``restype`` set so
    ``ctypes`` converts the returned value to the matching Python type.
    """
    return_types = (
        # Total network information
        ("SymGetListofVehicleIdsEx", c_char_p),
        ("SymGetTotalTravelTimeEx", c_double),
        ("SymGetTotalTravelDistanceEx", c_double),
        # Vehicle information
        ("SymGetVehicleAcc", c_double),
        ("SymGetVehicleSpeed", c_double),
        ("SymGetVehicleLink", c_char_p),
        ("SymGetVehicleAbscissa", c_double),
        ("SymGetVehicleOrdinate", c_double),
        ("SymGetVehicleLane", c_int),
        ("SymGetVehicleRelativePositionOnLink", c_double),
        ("SymGetVehicleTravelDistance", c_double),
        ("SymGetVehicleTravelTime", c_double),
    )
    for symbol, restype in return_types:
        getattr(self.__library, symbol).restype = restype
def get_vehicle_acceleration(self, vehid: int) -> float:
    """Query the library for the acceleration of a vehicle.

    Args:
        vehid (int): vehicle identifier

    Returns:
        float: vehicle acceleration [m/s²], as returned by ``SymGetVehicleAcc``
    """
    vehicle = c_int(vehid)
    return self.__library.SymGetVehicleAcc(vehicle)
def get_vehicle_speed(self, vehid: int) -> float:
    """Query the library for the speed of a vehicle.

    Args:
        vehid (int): vehicle identifier

    Returns:
        float: vehicle speed [m/s], as returned by ``SymGetVehicleSpeed``
    """
    vehicle = c_int(vehid)
    return self.__library.SymGetVehicleSpeed(vehicle)
def get_vehicle_link(self, vehid: int) -> str:
    """Query the library for the link a vehicle is on.

    Args:
        vehid (int): vehicle identifier

    Returns:
        str: decoded link name, or an empty string when the library
        returns ``None``
    """
    raw_link = self.__library.SymGetVehicleLink(c_int(vehid))
    if raw_link is None:
        return ""
    return raw_link.decode("UTF8")
def get_vehicle_abscissa(self, vehid: int) -> float:
    """Query the library for the abscissa of a vehicle.

    Args:
        vehid (int): vehicle identifier

    Returns:
        float: vehicle abscissa (x) position [m], converted with ``float``
    """
    abscissa = self.__library.SymGetVehicleAbscissa(c_int(vehid))
    return float(abscissa)
def get_vehicle_ordinate(self, vehid: int) -> float:
    """Query the library for the ordinate of a vehicle.

    Args:
        vehid (int): vehicle identifier

    Returns:
        float: vehicle ordinate (y) position [m], as returned by
        ``SymGetVehicleOrdinate``
    """
    vehicle = c_int(vehid)
    return self.__library.SymGetVehicleOrdinate(vehicle)
def get_vehicle_lane(self, vehid: int) -> int:
    """Query the library for the lane of a vehicle.

    Args:
        vehid (int): vehicle identifier

    Returns:
        int: vehicle lane position (0) right most lane, as returned by
        ``SymGetVehicleLane``
    """
    vehicle = c_int(vehid)
    return self.__library.SymGetVehicleLane(vehicle)
def get_vehicle_distance(self, vehid: int) -> float:
    """Query the library for the position of a vehicle on its link.

    Args:
        vehid (int): vehicle identifier

    Returns:
        float: vehicle distance in link position [m], as returned by
        ``SymGetVehicleRelativePositionOnLink``
    """
    vehicle = c_int(vehid)
    return self.__library.SymGetVehicleRelativePositionOnLink(vehicle)
def get_vehicle_total_travel_distance(self, vehid: int) -> float:
    """Query the library for the total distance traveled by a vehicle.

    Args:
        vehid (int): vehicle identifier

    Returns:
        float: vehicle total traveled distance [m], as returned by
        ``SymGetVehicleTravelDistance``
    """
    vehicle = c_int(vehid)
    return self.__library.SymGetVehicleTravelDistance(vehicle)
def get_vehicle_total_travel_time(self, vehid: int) -> float:
    """Query the library for the total time traveled by a vehicle.

    Args:
        vehid (int): vehicle identifier

    Returns:
        float: vehicle total traveled time [s], as returned by
        ``SymGetVehicleTravelTime``
    """
    vehicle = c_int(vehid)
    return self.__library.SymGetVehicleTravelTime(vehicle)
def get_total_travel_time(
    self, sensors_mfd: Union[str, list, None] = None
) -> "TupleFloat":
    """Extract the total travel time of vehicles in MFD regions.

    Args:
        sensors_mfd (str or list, optional): A single MFD sensor id, or a
            list of ids. When empty or omitted, the sensor names provided
            by the simulation (``get_mfd_sensor_names``) are used.

    Returns:
        TupleFloat: A single value when ``sensors_mfd`` is a string,
        otherwise a tuple with one value per sensor.
    """
    query = self.__library.SymGetTotalTravelTimeEx
    if isinstance(sensors_mfd, str):
        return query(sensors_mfd.encode("UTF8"))
    if not sensors_mfd:
        # ``None`` replaces the former mutable ``[]`` default
        sensors_mfd = self.simulation.get_mfd_sensor_names()
    return tuple(query(sensor.encode("UTF8")) for sensor in sensors_mfd)
def get_total_travel_distance(
    self, sensors_mfd: Union[str, list, None] = None
) -> "TupleFloat":
    """Extract the total travel distance of vehicles in MFD regions.

    Args:
        sensors_mfd (str or list, optional): A single MFD sensor id, or a
            list of ids. When empty or omitted, the sensor names provided
            by the simulation (``get_mfd_sensor_names``) are used.

    Returns:
        TupleFloat: A single value when ``sensors_mfd`` is a string,
        otherwise a tuple with one value per sensor.
    """
    query = self.__library.SymGetTotalTravelDistanceEx
    if isinstance(sensors_mfd, str):
        return query(sensors_mfd.encode("UTF8"))
    if not sensors_mfd:
        # ``None`` replaces the former mutable ``[]`` default
        sensors_mfd = self.simulation.get_mfd_sensor_names()
    return tuple(query(sensor.encode("UTF8")) for sensor in sensors_mfd)
def get_mfd_speed(
    self,
    sensors_mfd: Union[str, list, None] = None,
    *,
    fallback_speed: float = 10,
) -> "TupleFloat":
    """Estimate the spatial speed of vehicles in MFD regions.

    The speed is the total travel distance divided by the total travel
    time of the region.

    Args:
        sensors_mfd (str or list, optional): A single MFD sensor id, or a
            list of ids. It is forwarded as is to
            ``get_total_travel_distance`` and ``get_total_travel_time``.
        fallback_speed (float, optional): Value reported for a region whose
            total travel time is 0. Defaults to 10.

    Returns:
        TupleFloat: A single speed when ``sensors_mfd`` is a string,
        otherwise a tuple with one speed per sensor.
    """

    def _speed(distance, time):
        # A null travel time cannot be divided by: report the fallback
        return distance / time if time != 0 else fallback_speed

    distances = self.get_total_travel_distance(sensors_mfd)
    times = self.get_total_travel_time(sensors_mfd)
    if isinstance(sensors_mfd, str):
        return _speed(distances, times)
    return tuple(_speed(d, t) for d, t in zip(distances, times))
def get_vehicle_inside_area(
    self, sensors_mfd: Union[str, list, None] = None
) -> tuple:
    """Obtain the ids of the vehicles inside the area of one MFD sensor.

    Args:
        sensors_mfd (str, optional): Sensor name. Only a single name given
            as a string is handled; any other input yields an empty tuple.

    Returns:
        tuple: Vehicle ids (as strings) reported by
        ``SymGetListofVehicleIdsEx``. Empty when no sensor name is given or
        when the library returns no data.
    """
    if not isinstance(sensors_mfd, str):
        return tuple()
    response = self.__library.SymGetListofVehicleIdsEx(
        sensors_mfd.encode("UTF8")
    )
    if response is None:
        # The library returned no string: report no vehicles instead of
        # failing on ``None.decode``
        return tuple()
    # The last element of the split is dropped
    # (presumably the answer ends with a separator - TODO confirm)
    return tuple(response.decode("UTF8").split(" ")[:-1])
def add_control_probability_zone_mfd(
    self, access_probability: dict, minimum_distance: dict
) -> dict:
    """
    Add a probability to control the access to a specific zone within the network

    Both dictionaries are keyed by zone name and each zone is paired with
    its own distance by key, whatever the ordering of the dictionaries.

    :param access_probability: Key (zone name) Value (probability of access)
    :type access_probability: dict
    :param minimum_distance: Key (zone name) Value (distance before entering the zone to activate policy)
    :type minimum_distance: dict
    :raises KeyError: If a zone of ``access_probability`` has no entry in
        ``minimum_distance``; raised before any zone is sent to the library
    :return: Key (zone name) Value (value returned by ``SymAddControlZoneEx``)
    :rtype: dict
    """
    missing = [zone for zone in access_probability if zone not in minimum_distance]
    if missing:
        raise KeyError(f"minimum_distance has no entry for zone(s): {missing}")

    self.dctidzone = {}
    for sensor, accrate in access_probability.items():
        min_dst = minimum_distance[sensor]
        links = self.simulation.get_links_in_mfd_sensor(sensor)
        # NOTE(review): the meaning of the leading -1 and of the constant
        # 1.0 argument is not visible here
        self.dctidzone[sensor] = self.__library.SymAddControlZoneEx(
            -1,
            c_double(accrate),
            c_double(min_dst),
            c_double(1),
            " ".join(links).encode("UTF8"),
        )

    # Apply set control
    self.__library.SymApplyControlZonesEx(-1)
    return self.dctidzone
def modify_control_probability_zone_mfd(self, access_probability: dict):
    """
    Modify the access probability of zones registered in ``self.dctidzone``

    :param access_probability: Key (zone name) Value (probability of access)
    :type access_probability: dict
    :return: the ``self.dctidzone`` mapping, Key (zone name) Value (zone id)
    :rtype: dict
    """
    for zone_name, new_rate in access_probability.items():
        modify_zone = self.__library.SymModifyControlZoneEx
        zone_id = self.dctidzone[zone_name]
        modify_zone(-1, zone_id, c_double(new_rate))

    # Apply set control
    self.__library.SymApplyControlZonesEx(-1)
    return self.dctidzone
def __enter__(self):
    """
    Initialize the usage of the ``Simulator`` class as a context manager.

    The protocol followed to perform the full connection is:

    1. reset the state (``reset_state``)
    2. compliance check
    3. connection to the platform
    4. variable initialization

    :return: the simulator itself
    """
    self.reset_state()

    self.__performCompliance()  # Compliance situation
    self.__performConnect()  # Connect to platform
    self.__performInitialize()  # Variable initialization

    return self
def __exit__(self, type, value, traceback) -> bool:
    """Unload the current network from the library and report the end.

    :return: always ``False``, so an exception raised inside the ``with``
        block is not suppressed
    :rtype: bool
    """
    unload = self.__library.SymUnloadCurrentNetworkEx
    unload()
    click.echo("Runtime: End")
    return False
def build_dynamic_param(self):
    """Construct the parameters for vehicle dynamics.

    Stores a dictionary with the simulation ``time_step`` and the
    ``ENGINE_CONSTANT`` value in the private ``__dct_par`` attribute.
    """
    parameters = {}
    parameters["time_step"] = self.simulation.time_step
    parameters["engine_tau"] = CT.ENGINE_CONSTANT
    self.__dct_par = parameters
# =========================================================================
# STATE MACHINE
# =========================================================================
def __performCompliance(self) -> None:
    """
    Perform the compliance stage: no check is run here, the state machine is
    simply advanced with ``True``
    """
    compliant = True
    self.next_state(compliant)
def __performConnect(self) -> None:
    """
    Perform the simulation connection: load the library, load the network
    and advance the state machine with ``True``
    """
    self.load_symuvia()
    self.load_network()
    # The list of vehicle ids is returned by the library as a C string
    list_vehicle_ids = self.__library.SymGetListofVehicleIdsEx
    list_vehicle_ids.restype = c_char_p
    self.next_state(True)
def __performInitialize(self) -> None:
    """
    Perform the simulation initialization: create the runtime variables
    (end flag, request, step iterator, vehicle list), declare the library
    return types and build the dynamic parameters
    """
    self._b_end = c_int()
    self.request = SimulatorRequest()

    # Step iterator: the first step is consumed right away
    steps = iter(self._sim.get_simulation_steps())
    self._n_iter = steps
    self._c_iter = next(steps)
    self._bContinue = True

    self.vehicles = VehicleList(self.request)
    self.init_symbol_states()
    self.build_dynamic_param()
    self.next_state(self.do_next)
def __performPreRoutine(self) -> None:
    """
    Perform the simulator preroutine: the state machine is advanced with the
    current ``do_next`` flag
    """
    keep_running = self.do_next
    self.next_state(keep_running)
def __performQuery(self) -> None:
    """
    Perform the simulator query: the state machine is advanced with the
    current ``do_next`` flag
    """
    keep_running = self.do_next
    self.next_state(keep_running)
def __performControl(self) -> None:
    """
    Perform the simulator control: the state machine is advanced with the
    current ``do_next`` flag
    """
    keep_running = self.do_next
    self.next_state(keep_running)
def _set_manual_initialization(self) -> None:
    """
    Manually run the initialization stages of the simulator (compliance,
    connection, initialization) for testing purposes. (Internal use)

    Unlike ``__enter__``, the state is not reset beforehand.
    """
    self.__performCompliance()  # stage 1
    self.__performConnect()  # stage 2
    self.__performInitialize()  # stage 3
# =========================================================================
# ATTRIBUTES
# =========================================================================
def scenarioFilename(self, encoding=None) -> str:
    """
    Scenario filename

    :param encoding: forwarded unchanged to the ``filename`` method of the
        registered simulation, defaults to None
    :return: Absolute path towards the XML input for SymuFlow
    :rtype: str
    """
    scenario = self.simulation
    return scenario.filename(encoding)
@property
def s_response_dec(self):
    """
    Instantaneous data from the simulator

    :return: content of the buffer string decoded as UTF8
    :rtype: str
    """
    raw_answer = self.buffer_string.value
    return raw_answer.decode("UTF8")
@property
def do_next(self) -> bool:
    """
    Tells whether the simulation should continue

    :return: value of the continuation flag, True if next step continues
    :rtype: bool
    """
    keep_running = self._bContinue
    return keep_running
@property
def get_request(self) -> dict:
    """
    Query received from the simulator

    :return: ``data_query`` of the current request
    :rtype: dict
    """
    current_request = self.request
    return current_request.data_query
@property
def simulation(self) -> Simulation:
    """
    Simulation scenario

    :return: Object describing the scenario under simulation, as set by
        ``register_simulation``
    :rtype: Simulation
    """
    registered = self._sim
    return registered
@property
def simulationstep(self) -> float:
    """
    Current simulation step.

    Example:
        You can use the time step to control actions

        >>> with simulator as s:
        ...     while s.do_next:
        ...         if s.simulationstep > 0:
        ...             print(s.simulationstep)

    :return: current simulation iteration
    :rtype: float
    """
    current_step = self._c_iter
    return current_step
@property
def sampling_time(self) -> float:
    """
    Simulation sampling time

    :return: ``sampling_time`` of the registered simulation (from XML file)
    :rtype: float
    """
    scenario = self.simulation
    return scenario.sampling_time
@property
def library(self):
    """
    Loaded simulator library

    :return: handle stored by ``load_symuvia``
    """
    handle = self.__library
    return handle
# =========================================================================
# CONSTRUCTORS
# =========================================================================
@classmethod
def from_path(cls, filename_path: str, symuvia_path: str):
    """Alternative constructor for the Simulator

    :param filename_path: path of the scenario file to register
    :type filename_path: str
    :param symuvia_path: path of the simulator library
    :type symuvia_path: str
    :return: simulator with the scenario already registered

    Example:
        To use this alternative constructor, declare the scenario and the
        simulator paths as strings (the scenario comes first) ::

            >>> path = "path/to/simulator.so"
            >>> scenario = "path/to/scenario.xml"
            >>> simulator = Simulator.from_path(scenario, path)
    """
    simulator = cls(library_path=symuvia_path)
    simulator.register_simulation(filename_path)
    return simulator