-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpath-to-nodes.py
77 lines (56 loc) · 1.72 KB
/
path-to-nodes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import numpy as np
import osmnx as ox
import json
from pathlib import Path
from db.api import EVedDb
from geo.trajectory import GraphRoute
from valhalla.utils import decode_polyline
def get_geometry(traj_id: int) -> str:
    """Look up the ``geometry`` value stored in ``traj_match`` for a trajectory.

    Args:
        traj_id: Identifier used to filter the ``traj_match`` table.

    Returns:
        The scalar query result converted with ``str``.  Note that, despite
        the annotation, ``None`` is returned when the query yields ``None``.
    """
    found = EVedDb().query_scalar(
        "select geometry from traj_match where traj_id = ?;",
        [traj_id],
    )
    return None if found is None else str(found)
def get_max_nodes() -> int:
    """Return the largest ``traj_id`` recorded in ``traj_nodes``.

    Returns:
        The maximum as an ``int``, or 0 when the query yields ``None``.
    """
    highest = EVedDb().query_scalar("select max(traj_id) from traj_nodes;")
    return 0 if highest is None else int(highest)
def get_max_traj_id() -> int:
    """Return the largest ``traj_id`` found in the ``trajectory`` table.

    Returns:
        The maximum as an ``int``, or 0 when the query yields ``None``
        (mirroring how ``get_max_nodes`` treats a missing value).
    """
    db = EVedDb()
    sql = "select max(traj_id) from trajectory;"
    n = db.query_scalar(sql)
    # An aggregate over an empty table is expected to come back as None;
    # int(None) would raise TypeError, so fall back to 0 instead.
    if n is None:
        return 0
    return int(n)
def insert_nodes(traj_id: int,
                 nodes: str) -> None:
    """Insert one ``(traj_id, nodes)`` row into the ``traj_nodes`` table.

    Args:
        traj_id: Value written to the ``traj_id`` column.
        nodes: Value written to the ``nodes`` column (``main`` passes a
            JSON-encoded string).
    """
    statement = "insert into traj_nodes (traj_id, nodes) values (?, ?)"
    parameters = [traj_id, nodes]
    EVedDb().execute_sql(statement, parameters)
def load_graph():
    """Return a ``GraphRoute``, preferring the copy cached on disk.

    If ``./db/ann-arbor.graphml`` exists it is loaded with
    ``GraphRoute.from_file``.  Otherwise the graph is built with
    ``GraphRoute.from_place`` and saved to that path before being returned.
    """
    file_name = "./db/ann-arbor.graphml"

    # Fast path: reuse the previously saved graph file.
    if Path(file_name).is_file():
        return GraphRoute.from_file(file_name)

    # No cached file yet: build the graph and persist it for the next run.
    graph_route = GraphRoute.from_place('Ann Arbor, Michigan')
    graph_route.save(file_name)
    return graph_route
def main():
    """Compute and store the nearest graph nodes for unprocessed trajectories.

    Iterates over trajectory ids from one past the highest id already in
    ``traj_nodes`` up to the highest id in ``trajectory``.  For each id with
    a stored geometry, the polyline is decoded, every point is matched to
    its nearest graph node, and the node ids are inserted as a JSON list.
    Trajectories with no geometry, or whose geometry decodes to no points,
    are skipped.
    """
    print("Loading graph...")
    gr = load_graph()

    max_traj_id = get_max_traj_id()
    max_nodes = get_max_nodes()

    # Start after the last id already stored so an interrupted run resumes.
    for traj_id in range(max_nodes + 1, max_traj_id + 1):
        print(traj_id)

        geometry = get_geometry(traj_id)
        if geometry is None:
            continue

        line = np.array(decode_polyline(geometry))
        # A polyline that decodes to zero points yields a 1-D empty array,
        # which would make the column slicing below raise IndexError.
        if line.ndim != 2 or line.shape[0] == 0:
            continue

        # NOTE(review): assumes column 0 is X (longitude) and column 1 is
        # Y (latitude), as nearest_nodes expects - confirm against
        # decode_polyline.
        nodes = ox.distance.nearest_nodes(gr.graph, line[:, 0], line[:, 1])

        # nearest_nodes may return a NumPy array / NumPy integers, which
        # json.dumps cannot serialise; convert to plain Python ints first.
        node_ids = np.asarray(nodes).tolist()
        insert_nodes(traj_id, json.dumps(node_ids))
# Run the node-matching job only when this file is executed as a script.
if __name__ == "__main__":
    main()