"""
XML Parser
==========
A generic parser for XML files.
"""
# ============================================================================
# INTERNAL IMPORTS
# ============================================================================
import re
from functools import cache, cached_property
from typing import Pattern
from symupy.utils.constants import FIELD_FORMAT, FIELD_DATA
# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================
[docs]class XMLElement:
pattern_tag = re.compile("(?<=<)(\w+)(?=>|\s|\/)")
pattern_comment = re.compile("^<!(.*)>$")
pattern_args = re.compile('\s([a-zA-Z0-9_:]+)="(.*?)"')
pattern_childrens = re.compile("\/>$")
def __init__(self, line, pos, filename, linenum):
self._filename = filename
self._pos = pos
self.tag = XMLElement.pattern_tag.findall(line)[0]
self.attr = {key: val for key, val in XMLElement.pattern_args.findall(line)}
self.sourceline = linenum
if XMLElement.pattern_childrens.findall(line):
self._has_childrens = False
else:
self._has_childrens = True
[docs] def iterchildrens(self):
linenum = self.sourceline
with open(self._filename) as f:
f.seek(self._pos)
f.readline()
if self._has_childrens:
linenum += 1
line = f.readline().strip()
while not any(
bool(x)
for x in [
re.findall(self._end_tag(self.tag), line),
re.findall(self._startend_tag(self.tag), line),
]
):
# Checking if line is a comment or blank
if not XMLElement.pattern_comment.findall(line) and line != "":
new_tag = XMLElement.pattern_tag.findall(line)[0]
end_tag = re.compile(f"<\/{new_tag}>")
if re.findall(self._startend_tag(new_tag), line):
pos = f.tell() - (len(line) + 2)
yield XMLElement(line, pos, self._filename, linenum)
elif re.findall(self._start_tag(new_tag), line):
pos = f.tell() - (len(line) + 2)
keep_line = line
keepnum = linenum
while True:
linenum += 1
line = f.readline().strip()
if end_tag.findall(line):
break
yield XMLElement(keep_line, pos, self._filename, keepnum)
else:
break
linenum += 1
line = f.readline().strip()
else:
yield from ()
[docs] def getchildrens(self):
return list(self.iterchildrens())
[docs] def find_children_tag(self, tag):
for child in self.iterchildrens():
if child.tag == tag:
return child
[docs] def find_children_attr(self, attr, val):
for child in self.iterchildrens():
if attr in child.attr.keys():
if child.attr[attr] == val:
return child
def _start_tag(self, tag):
return f"<{tag}.*>"
def _end_tag(self, tag):
return f"<\/{tag}>"
def _startend_tag(self, tag):
return f"^<{tag}.*\/>"
def __repr__(self):
return f"XMLElement({self.tag}, {self.attr.__repr__()})"
def __hash__(self):
return hash((self._filename, self.sourceline))
def __eq__(self, another):
return (
self.sourceline == another.sourceline
and self._filename == another._filename
)
[docs]class XMLParser(object):
def __init__(self, filename):
self._filename = filename
[docs] def get_elem(self, elem):
linenum = 1
with open(self._filename, "r") as f:
line = f.readline()
while line:
if re.findall(f"(?<=<){elem}(?=>|\s|\/)", line):
pos = f.tell() - len(line)
return XMLElement(line.strip(), pos, self._filename, linenum)
linenum += 1
line = f.readline()
[docs] def xpath(self, path):
tags = path.split("/")
elem = self.get_elem(tags[0])
for t in tags[1:]:
if t[0] == "@":
return elem.attr[t[1:]]
elif elem is None:
return
else:
elem = elem.find_children_tag(t)
return elem
[docs] def get_root(self):
linenum = 1
with open(self._filename, "r") as f:
line = f.readline()
# Check first elem in XML but ignore header and comment
while not re.findall("^<[^\?|!](.*)>$", line):
linenum += 1
line = f.readline()
pos = f.tell() - len(line)
return XMLElement(line.strip(), pos, self._filename, linenum)
PATTERN = {
"abs": re.compile(r'abs="(.*?)"'),
"acc": re.compile(r'acc="(.*?)"'),
"dst": re.compile(r'dst="(.*?)"'),
"etat_pilotage": re.compile(r'dst="([\d\.]*?)"( etat_pilotage=".*?")? id="(.*?)"'),
"id": re.compile(r'id="(\d+)"'),
"ord": re.compile(r'ord="(.*?)"'),
"tron": re.compile(r'tron="(.*?)"'),
"type": re.compile(r'tron="(.*?)" type="(.*?)" vit="(.*?)"'),
"vit": re.compile(r'vit="(.*?)"'),
"voie": re.compile(r'voie="(.*?)"'),
"z": re.compile(r'z="(.*?)"'),
"traj": re.compile(
r'abs="(.*?)" acc="(.*?)" dst="([\d\.]*?)"( etat_pilotage=".*?")? id="(.*?)" ord="(.*?)" tron="(.*?)" type="(.*?)" vit="(.*?)" voie="(.*?)" z="(.*?)"'
),
"inst": re.compile(r'val="(.*?)"'),
"nbveh": re.compile(r'nbVeh="(.*?)"'),
"extract_traj_balise": re.compile('<TRAJS>(.*?)<\/TRAJS>')
}
CAV_TYPE = tuple(value for key, value in FIELD_FORMAT.items())
[docs]class XMLTrajectory:
"""Model object for a trajectory, it can be created from a xml and contains trajectories for a set of vehicles."""
aliases = {
"abscissa": "abs",
"acceleration": "acc",
"distance": "dst",
"elevation": "z",
"lane": "voie",
"link": "tron",
"ordinate": "ord",
"speed": "vit",
"vehid": "id",
"vehtype": "type",
}
def __init__(self, xml: bytes):
self._xml = xml.decode("UTF8")
self._trajs = ''
if len(self._xml)>0:
pattern = PATTERN['extract_traj_balise'].findall(self._xml)
if len(pattern)>0:
self._trajs = pattern[0]
def __getattr__(self, name):
if name == "aliases":
raise AttributeError # http://nedbatchelder.com/blog/201010/surprising_getattr_recursion.html
name = self.aliases.get(name, name)
return object.__getattribute__(self, name)
@cached_property
def abs(self) -> tuple:
"""`abs` cached values for all vehicles in network
Returns:
tuple: cached `abs` values
"""
abs = tuple(
map(
lambda x: FIELD_FORMAT["abs"](x.replace(',', '.')),
PATTERN.get("abs").findall(self._trajs),
)
)
return dict(zip(self.id, abs))
@cached_property
def acc(self):
"""`acceleration` cached values for all vehicles in network
Returns:
tuple: cached `acc` values
"""
acc = tuple(map(FIELD_FORMAT["acc"], PATTERN.get("acc").findall(self._trajs)))
return dict(zip(self.id, acc))
@cached_property
def dst(self):
"""`distance` cached values for all vehicles in network
Returns:
tuple: cached `dst` values
"""
dst = tuple(map(FIELD_FORMAT["dst"], PATTERN.get("dst").findall(self._trajs)))
return dict(zip(self.id, dst))
@cached_property
def driven(self):
"""alias for `etat_pilotage`"""
return self.etat_pilotage
@cached_property
def etat_pilotage(self):
"""`etat_pilotage` cached values for all vehicles in network
Returns:
tuple: cached `etat_pilotage` values
"""
drv = PATTERN.get("etat_pilotage").findall(self._trajs)
if drv:
drv = tuple(map(FIELD_FORMAT["etat_pilotage"], [x[1] for x in drv]))
return dict(zip(self.id, drv))
@cached_property
def id(self):
"""Vehicle `id` cached values for all vehicles in network
Returns:
tuple: cached `id` values
"""
return tuple(
map(
FIELD_FORMAT["id"],
[x for x in PATTERN.get("id").findall(self._trajs)],
)
)
@cached_property
def ord(self):
"""`ordinate` cached values for all vehicles in network
Returns:
tuple: cached `ord` values
"""
ord = tuple(map(lambda x: FIELD_FORMAT["ord"](x.replace(',', '.')), PATTERN.get("ord").findall(self._trajs)))
return dict(zip(self.id, ord))
@cached_property
def tron(self):
"""`link` cached values for all vehicles in network
Returns:
tuple: cached `tron` values
"""
tron = tuple(PATTERN.get("tron").findall(self._trajs))
return dict(zip(self.id, tron))
@cached_property
def type(self):
"""Vehicle `type` cached values for all vehicles in network
Returns:
tuple: cached `type` values
"""
types = tuple(x[1] for x in PATTERN.get("type").findall(self._trajs))
return dict(zip(self.id, types))
@cached_property
def vit(self):
"""`speed` cached values for all vehicles in network
Returns:
tuple: cached `vit` values
"""
vit = [FIELD_FORMAT["vit"](vit.replace(',', '.')) for vit in PATTERN.get("vit").findall(self._trajs)]
return dict(zip(self.id, vit))
@cached_property
def voie(self):
"""`lane` cached values for all vehicles in network
Returns:
tuple: cached `voie` values
"""
voie = tuple(map(FIELD_FORMAT["voie"], PATTERN.get("voie").findall(self._trajs)))
return dict(zip(self.id, voie))
@cached_property
def z(self):
"""`elevation` cached values for all vehicles in network
Returns:
tuple: cached `z` values
"""
z = tuple(map(float, PATTERN.get("z").findall(self._trajs)))
return dict(zip(self.id, z))
@cached_property
def traj(self):
"""Trajectory cached values for all vehicles in network
Returns:
tuple: cached `traj` values
"""
return tuple(
XMLTrajectory._typeconvert(x)
for x in PATTERN.get("traj").findall(self._trajs)
)
@cached_property
def inst(self):
"""`val` simulation time instant for current trajectory
Returns:
float: simulation time
"""
return float(PATTERN.get("inst").findall(self._xml)[0])
@cached_property
def nbveh(self):
"""`nbveh` simulation time instant for current trajectory
Returns:
int: number of vehicles
"""
return int(PATTERN.get("nbveh").findall(self._xml)[0])
@cached_property
def todict(self):
"""Converts to dictionary any of the data in the """
# Relies on order
return tuple(dict(zip(FIELD_DATA.values(), x)) for x in self.traj)
@classmethod
def _typeconvert(cls, data: tuple):
return tuple(a(b) for a, b in zip(CAV_TYPE, data))
if __name__ == "__main__":
import symupy
import os
file = (
os.path.dirname(symupy.__file__)
+ "/../tests/mocks/bottlenecks/bottleneck_001.xml"
)
parser = XMLParser(file)
root = parser.get_root()