-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsystem_agent.py
255 lines (211 loc) · 9.15 KB
/
system_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#!/usr/bin/env python3
"""
AION SYSTEM AGENT v8.0
Strategy Marketplace Implementation with Cryptographic Verification
"""
import os
import sys
import json
import hashlib
import asyncio
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa, utils
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.exceptions import InvalidSignature
# Configuration
MARKETPLACE_CONFIG = {
"strategy_pool": "/opt/aion/strategies",
"registry_url": "https://marketplace.aion.net/registry.json",
"trusted_keys": "/etc/aion/trusted_keys",
"audit_log": "/var/log/aion/strategy_audit.log",
"verification_hash": "blake2b",
"signature_algorithm": hashes.SHA384,
"max_strategy_size": 1024 * 1024, # 1MB
}
@dataclass
class StrategyPackage:
code: str
metadata: Dict[str, str]
signature: bytes
author_key: str
class StrategyMarketplace:
def __init__(self):
self.strategies: Dict[str, StrategyPackage] = {}
self.registry: Dict[str, Dict] = {}
self._setup_verification()
self._init_marketplace()
def _setup_verification(self):
"""Initialize cryptographic verification resources"""
self.verification_padding = padding.PSS(
mgf=padding.MGF1(MARKETPLACE_CONFIG['signature_algorithm']()),
salt_length=padding.PSS.MAX_LENGTH
)
self.trusted_keys = self._load_trusted_keys()
def _load_trusted_keys(self) -> Dict[str, rsa.RSAPublicKey]:
"""Load authorized developer public keys"""
keys = {}
key_dir = Path(MARKETPLACE_CONFIG['trusted_keys'])
for key_file in key_dir.glob("*.pub"):
try:
with open(key_file, "rb") as f:
keys[key_file.stem] = load_pem_public_key(f.read())
except Exception as e:
logging.error(f"Invalid key file {key_file}: {str(e)}")
return keys
def _init_marketplace(self):
"""Initialize local strategy pool"""
strategy_dir = Path(MARKETPLACE_CONFIG['strategy_pool'])
strategy_dir.mkdir(exist_ok=True, parents=True)
for strategy_file in strategy_dir.glob("*.strategy"):
self._load_local_strategy(strategy_file)
def _load_local_strategy(self, path: Path):
"""Load and verify a local strategy package"""
try:
with open(path, "r") as f:
package_data = json.load(f)
package = StrategyPackage(
code=package_data['code'],
metadata=package_data['metadata'],
signature=bytes.fromhex(package_data['signature']),
author_key=package_data['author_key']
)
if self.verify(package):
self.strategies[package.metadata['id']] = package
logging.info(f"Loaded strategy: {package.metadata['name']}")
else:
logging.warning(f"Invalid signature for strategy: {package.metadata['id']}")
except Exception as e:
logging.error(f"Failed to load strategy {path}: {str(e)}")
def verify(self, package: StrategyPackage) -> bool:
"""Verify strategy authenticity and integrity"""
# Check author trust
if package.author_key not in self.trusted_keys:
logging.error(f"Untrusted author: {package.author_key}")
return False
# Validate package structure
required_fields = {'name', 'id', 'version', 'author'}
if not required_fields.issubset(package.metadata.keys()):
logging.error("Missing required metadata fields")
return False
# Verify cryptographic signature
try:
public_key = self.trusted_keys[package.author_key]
hasher = hashlib.blake2b()
hasher.update(package.code.encode())
digest = hasher.digest()
public_key.verify(
package.signature,
digest,
self.verification_padding,
utils.Prehashed(hashes.BLAKE2b(64))
)
return True
except InvalidSignature:
logging.error("Invalid strategy signature")
return False
except Exception as e:
logging.error(f"Verification error: {str(e)}")
return False
async def refresh_registry(self):
"""Fetch updated strategy registry from marketplace"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(MARKETPLACE_CONFIG['registry_url']) as response:
self.registry = await response.json()
logging.info("Updated strategy registry")
except Exception as e:
logging.error(f"Failed to update registry: {str(e)}")
async def install_strategy(self, strategy_id: str):
"""Install a strategy from the marketplace"""
if strategy_id not in self.registry:
raise ValueError("Unknown strategy ID")
strategy_info = self.registry[strategy_id]
try:
async with aiohttp.ClientSession() as session:
async with session.get(strategy_info['download_url']) as response:
content = await response.text()
package = StrategyPackage(
code=content,
metadata=strategy_info['metadata'],
signature=bytes.fromhex(strategy_info['signature']),
author_key=strategy_info['author_key']
)
if self.verify(package):
self._save_strategy(package)
self.strategies[strategy_id] = package
logging.info(f"Installed strategy: {strategy_info['name']}")
else:
logging.error("Failed to verify downloaded strategy")
except Exception as e:
logging.error(f"Installation failed: {str(e)}")
def _save_strategy(self, package: StrategyPackage):
"""Persist strategy to local pool"""
strategy_path = Path(MARKETPLACE_CONFIG['strategy_pool']) / f"{package.metadata['id']}.strategy"
with open(strategy_path, "w") as f:
json.dump({
"code": package.code,
"metadata": package.metadata,
"signature": package.signature.hex(),
"author_key": package.author_key
}, f)
def list_strategies(self) -> List[Dict]:
"""Return available strategies with verification status"""
return [{
'id': s.metadata['id'],
'name': s.metadata['name'],
'version': s.metadata['version'],
'author': s.metadata['author'],
'verified': self.verify(s)
} for s in self.strategies.values()]
class VerifiedStrategyRunner:
def __init__(self, marketplace: StrategyMarketplace):
self.marketplace = marketplace
self.strategy_cache: Dict[str, Any] = {}
async def execute(self, strategy_id: str, *args, **kwargs):
"""Execute a verified strategy"""
if strategy_id not in self.marketplace.strategies:
raise ValueError("Strategy not available")
strategy = self.marketplace.strategies[strategy_id]
if not self.marketplace.verify(strategy):
raise SecurityError("Strategy verification failed")
return await self._run_strategy(strategy.code, args, kwargs)
async def _run_strategy(self, code: str, args, kwargs):
"""Safely execute strategy code"""
try:
# Execute in restricted environment
restricted_globals = {
'__builtins__': {
'print': logging.info,
'len': len,
'str': str,
'int': int,
# Add other safe builtins as needed
}
}
exec(code, restricted_globals, {
'args': args,
'kwargs': kwargs,
'result': None
})
return restricted_globals.get('result')
except Exception as e:
logging.error(f"Strategy execution failed: {str(e)}")
raise
class SecurityError(Exception):
"""Security verification failure exception"""
pass
if __name__ == "__main__":
# Example usage
marketplace = StrategyMarketplace()
runner = VerifiedStrategyRunner(marketplace)
async def main():
await marketplace.refresh_registry()
await marketplace.install_strategy("cpu_optimizer_v2")
result = await runner.execute("cpu_optimizer_v2",
{"cpu_load": 85})
print(f"Optimization result: {result}")
asyncio.run(main())