Source code for symupy.tsc.vehicles

"""
Vehicle Model
=============
This module implements a vehicle model.

Vehicle model acts as an instance to trace individual vehicle data and modify vehicle behavior according to given dynamics
"""

# ============================================================================
# STANDARD  IMPORTS
# ============================================================================

from typing import Dict, List
import itertools
import numpy as np
import pandas as pd
from dataclasses import dataclass, asdict


# ============================================================================
# INTERNAL IMPORTS
# ============================================================================

from symupy.runtime.logic.subscriber import Subscriber
from symupy.utils import constants as ct
from .dynamics import VehicleDynamic
from symupy.runtime.logic.sorted_frozen_set import SortedFrozenSet

# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================


[docs]@dataclass class Vehicle(Subscriber): """Vehicle class defined for storing data on a single vehicle: You need a Publisher from where the vehicle is going to take data: Args: request (Publisher): Parser or object publishing data Retunrns: vehicle (Vehicle): A Dataclass with vehicle parameters ============================ ================================= **Variable** **Description** ---------------------------- --------------------------------- ``abscissa`` Current coordinate on y axis ``acceleration`` Current acceleration ``distance`` Current distance traveled on link ``elevation`` Current elevation ``lane`` Current lane ``link`` Current road vehicle is traveling ``ordinate`` Current coordinate x axis ``speed`` Current speed ``vehid`` Vehicle id ``vehtype`` Vehicle class ============================ ================================= Example: This is one example on how to register a new vehicle :: >>> req = SimulatorRequest() >>> veh = Vehicle(req) >>> req.dispatch() # This will update vehicle data When having multiple vehicles please indicate the `vehid` before launching the dispatch method. This is because the vehicle object is looks for a vehicle id within the data. Example: This is one example on how to register two vehicles :: >>> req = SimulatorRequest() >>> veh1 = Vehicle(req, vehid=0) >>> veh2 = Vehicle(req, vehid=1) >>> req.dispatch() # This will update vehicle data on both vehicles """ counter = itertools.count() abscissa: float = 0.0 acceleration: float = 0.0 distance: float = 0.0 driven: bool = False elevation: float = 0.0 lane: int = 1 link: str = "Zone_001" ordinate: float = 0.0 speed: float = 25.0 vehid: int = 0 vehtype: str = "" def __init__(self, request, **kwargs): """This initializer creates a Vehicle""" # Undefined properties self.count = next(self.__class__.counter) self.dynamic = VehicleDynamic() self.itinerary = [] # Optional properties for key, value in kwargs.items(): setattr(self, key, value) super().__init__(request) def __hash__(self): return hash((type(self), self.vehid)) def __eq__(self, veh): if not isinstance(veh, type(self)): return NotImplemented return self.vehid == veh.vehid
[docs] def update(self): """Updates data from publisher""" dataveh = self._publisher.get_vehicle_properties(self.vehid) self.__dict__.update(**dataveh) link = getattr(self, "link") if link not in getattr(self, "itinerary"): self.itinerary.append(link)
@property def x(self): """Vehicle state vector (x,v,a)""" return np.array((self.distance, self.speed, self.acceleration))
[docs]class VehicleList(SortedFrozenSet): """Class defining a set of vehicles. This class is based on a sorted frozen set and supports multiple operations in between sets. You can define a list based on a simluator request and the list will update automatically via a single method. Args: request (Publisher): Publisher of information Example: Define a list of vehicles to trace the requests :: >>> simrequest = SimulatorRequest() >>> simrequest.query = one_vehicle_xml >>> vl = VehicleList(simrequest) >>> simrequest.query = second_vehicle_xml >>> vl.update_list() # This updates manually The list could be eventually updated as an observer but for simplicity reasons it is kept like this. """ _cumul = set() def __init__(self, request): self._request = request data = [Vehicle(request, **v) for v in request.get_vehicle_data()] self._free = [] self.__class__._cumul = self.__class__._cumul.union( request.get_vehicles_property("vehid") ) SortedFrozenSet.__init__(self, tuple(data))
[docs] def update_list(self): """Update vehicle data according to an update in the request.""" newveh = [] # Create only new vehicles for v in self._request.get_vehicle_data(): if v.get("vehid") not in self.__class__._cumul: newveh.append(Vehicle(self._request, **v)) self.__class__._cumul.add(v.get("vehid")) data = SortedFrozenSet(self._items).union(newveh) # Put vehicles on list self._items = data._items # Take out exciting vehicles for veh in self._items: if veh.vehid not in self._request.get_vehicles_property("vehid"): self.release(veh)
[docs] def release(self, veh: Vehicle): """Moves a vehicle to a free list so that it is not considered in the Args: r (VehType): Vehicle object """ self._items.remove(veh) self.__class__._cumul.remove(veh.vehid) self._free.append(veh)
def _get_vehicles_attribute(self, attribute: str) -> pd.Series: """Retrieve list of parameters Args: attribute (str): One of the vehicles attribute e.g. 'distance' Returns dataframe (series): Returns values for a set of vehicles """ return self._to_pandas()[attribute] @property def acceleration(self) -> pd.Series: """ Returns all vehicle's accelerations """ return self._get_vehicles_attribute("acceleration") @property def speed(self) -> pd.Series: """ Returns all vehicle's accelerations """ return self._get_vehicles_attribute("speed") @property def distance(self) -> pd.Series: """ Returns all vehicle's accelerations """ return self._get_vehicles_attribute("distance") def _to_pandas(self) -> pd.DataFrame: """Transforms vehicle list into a pandas for rendering purposes Returns: df (DataFrame): Returns a table with pandas data. """ return pd.DataFrame([asdict(v) for v in self._items]) def __str__(self): if not self._items: return "No vehicles have been registered" return str(self._to_pandas()) def __repr__(self): if not self._items: return "No vehicles have been registered" return repr(self._to_pandas())