This repository was archived by the owner on Oct 18, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
81 lines (65 loc) · 2.4 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from dataclasses import dataclass
from datetime import datetime, timezone
import json
import os
from fastapi import FastAPI, Response
from redis import Redis
app = FastAPI()
redis = Redis.from_url(os.environ.get("REDIS_URL"))
@app.get("/api/environment/solar")
async def solar_now():
solar_estimates = json.loads(redis.get("solar-estimates"))
time = datetime.now().time()
now = str(datetime(2022, 7, 11, time.hour, (time.minute // 30) * 30))
return {
"pv_estimate": solar_estimates[now],
}
def load_profiles() -> dict[str, str]:
profiles = {}
path = os.environ.get("PROFILES_DIRECTORY") or "data/profiles"
for file in os.listdir(path):
if file.endswith(".yaml"):
with open(os.path.join(path, file), "r", encoding="utf-8") as f:
profiles[file] = f.read()
return profiles
def load_solar():
import csv
from dateutil import parser
estimates: dict[datetime, float] = {}
with open("data/environment/solar/estimated_actuals.csv") as file:
reader = csv.reader(file)
next(reader) # ignore the header
for row in reader:
pv_estimate = float(row[0])
period_end = parser.parse(row[1]).replace(tzinfo=None)
estimates[str(period_end)] = pv_estimate
redis.set("solar-estimates", json.dumps(estimates))
@app.get("/api/retrieve")
async def retrieve_profile():
hydration_states: dict[str, bool] = json.loads(redis.get("hydration-states"))
for key in [key for (key, hydrated) in hydration_states.items() if hydrated]:
profile = redis.get(key)
hydration_states[key] = False
redis.set("hydration-states", json.dumps(hydration_states))
return Response(profile, media_type="text/plain")
return Response(
content=json.dumps({"message": "Requires rehydration. Profiles exhausted."}),
media_type="application/json",
status_code=429,
)
@app.get("/api/inspect")
async def inspect():
return json.loads(redis.get("hydration-states"))
@app.get("/api/rehydrate")
async def rehydrate():
profiles = load_profiles()
hydration_states = {}
for key, profile in profiles.items():
hydration_states[key] = True
redis.set(key, profile)
redis.set("hydration-states", json.dumps(hydration_states))
return hydration_states
@app.on_event("startup")
async def startup_event():
load_solar()
await rehydrate()