Source code for symupy.runtime.monitor.monitors

import re

import numpy as np

from symupy.runtime.api import Simulator
from symupy.runtime.monitor.manager import ScatterMonitorView, LineMonitorView
from symupy.utils.constants import DEFAULT_PATH_SYMUFLOW


[docs]def launch_simuflow(file) -> tuple: sim_instance = Simulator.from_path(file, DEFAULT_PATH_SYMUFLOW) sim_instance.trace_flow = True sim_instance.step_launch_mode = "full" with sim_instance as s: while s.do_next: s.run_step() yield sim_instance.simulationstep, sim_instance.request.datatraj
[docs]class SymuFlowMonitorMFD(ScatterMonitorView): def __init__(self): super(SymuFlowMonitorMFD, self).__init__('MFD', 'Accumulation', 'Production')
[docs] def update(self, step, instants, ind): nbveh = instants.nbveh if instants.vit: mean_speed = np.mean(list(instants.vit.values())) return nbveh, mean_speed*nbveh else: return None, None
[docs]class SymuFlowMonitorAccumulation(LineMonitorView): def __init__(self): super(SymuFlowMonitorAccumulation, self).__init__('Accumulation', 'Instant', 'VEH number')
[docs] def update(self, step, instants, ind): return step, instants.nbveh
[docs]class SymuFlowMonitorVEH(LineMonitorView): def __init__(self, ids, indicator): self._indicators = {"speed": self._update_speed, "acceleration": self._update_acceleration, "distance": self._update_distance} assert indicator in self._indicators super(SymuFlowMonitorVEH, self).__init__(f'VEH {indicator}', 'Instant', indicator, nb_plots=len(ids)) self.ids = ids self.indicator = indicator self._dst = [0]*len(ids) self._lastpos = [None]*len(ids) self.line_labels = [str(id) for id in ids] def _update_speed(self, instants, ind): return instants.vit[self.ids[ind]] def _update_acceleration(self, instants, ind): return instants.acc[self.ids[ind]] def _update_distance(self, instants, ind): curr_pos = np.array([instants.abs[self.ids[ind]], instants.ord[self.ids[ind]]]) if self._lastpos[ind] is None: self._lastpos[ind] = curr_pos else: curr_dst = np.linalg.norm(curr_pos-self._lastpos[ind]) self._dst[ind] += curr_dst self._lastpos[ind] = curr_pos return self._dst[ind]
[docs] def update(self, step, instants, ind): veh_ids = instants.id veh_state = None if self.ids[ind] in veh_ids: y = self._indicators[self.indicator](instants, ind) return step, y else: return None, None
[docs]class SymuFlowMonitorTTT(LineMonitorView): def __init__(self, zone, aggregation_period=1): super(SymuFlowMonitorTTT, self).__init__('Total Travel Time', 'Instant', 'Time') self.zone = set(zone) self.aggregation_period = aggregation_period self.vehs= {} self.ttt = 0
[docs] def update(self, step, instants, ind): if step%self.aggregation_period==0: time = instants.inst curr_t = set() for vehid, tron in instants.tron.items(): if tron in self.zone and vehid in self.vehs: self.ttt += time - self.vehs[vehid] self.vehs[vehid] = time if tron in self.zone: curr_t.add(vehid) self.vehs[vehid] = time [self.vehs.pop(t) for t in list(self.vehs.keys()) if t not in curr_t] return step, self.ttt else: return None, None
[docs]class SymuFlowMonitorTTD(LineMonitorView): def __init__(self, zone, aggregation_period=1): super(SymuFlowMonitorTTD, self).__init__('Total Travel Distance', 'Instant', 'Distance') self.zone = set(zone) self.aggregation_period = aggregation_period self.vehs= {} self.ttd = 0
[docs] def update(self, step, instants, ind): if step%self.aggregation_period==0: curr_t = set() for vehid, tron, abs, ord in zip(instants.id, instants.tron.values(), instants.abs.values(), instants.ord.values()): pos = np.array([abs, ord]) if tron in self.zone and vehid in self.vehs: self.ttd += np.linalg.norm(pos - self.vehs[vehid]) self.vehs[vehid] = pos if tron in self.zone: curr_t.add(vehid) self.vehs[vehid] = pos [self.vehs.pop(t) for t in list(self.vehs.keys()) if t not in curr_t] return step, self.ttd else: return None, None
[docs]class SymuFlowMonitorFlux(LineMonitorView): def __init__(self, zone, aggregation_period=1): super(SymuFlowMonitorFlux, self).__init__('Flux', 'Instant', 'Number', nb_plots=2, line_labels=["InFlux", "OutFlux"]) self.zone = set(zone) self.aggregation_period = aggregation_period self.influx = 0 self.outflux = 0 self.invehs = set()
[docs] def update(self, step, instants, ind): if ind == 0: for vehid, tron in instants.tron.items(): if vehid not in self.invehs and tron in self.zone: self.influx += 1 self.invehs.add(vehid) val = self.influx if step % self.aggregation_period == 0: self.influx= 0 else: curr_vehs = list(instants.tron.keys()) to_del = set() for t in self.invehs: if t not in curr_vehs: self.outflux += 1 to_del.add(t) self.invehs = self.invehs.difference(to_del) val = self.outflux if step % self.aggregation_period == 0: self.outflux= 0 if step % self.aggregation_period == 0: return step, val else: return None, None
[docs]class SymuFlowMonitorFlow(ScatterMonitorView): def __init__(self, xrange=None, yrange=None): super(SymuFlowMonitorFlow, self).__init__('Flow', 'X', 'Y', stack_value=False, symbol="o", xrange=xrange, yrange=yrange)
[docs] def update(self, step, instants, ind): ord = [float(v) for v in instants.ord.values()] abs = [float(v) for v in instants.abs.values()] if ord: return abs, ord else: return None, None