-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzigpy_app.py
executable file
·308 lines (256 loc) · 9.93 KB
/
zigpy_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
#!/usr/bin/python3
from zigpy.config import CONF_DEVICE
import zigpy.config as conf
from zigpy.types.named import EUI64
import zigpy.device
import asyncio
from asyncio import create_task
import shella
import json
"""
Replace 'zigpy_xbee' with the radio library you are using; you can install it with pip.
https://github.com/zigpy/zigpy-xbee
https://github.com/zigpy/zigpy-deconz
https://github.com/zigpy/zigpy-zigate
"""
from zigpy_xbee.zigbee.application import ControllerApplication
from zigpy.zcl.clusters.general import OnOff
# NOTE(review): module-level lookup of OnOff server command 0x0. `s` is not read
# at module level anywhere in the visible source -- looks like leftover debugging.
s=OnOff.server_commands.get(0x0)
def print_cluster_info(cluster):
    """Print the name, commands and attributes of a cluster.

    Args:
        cluster: Object exposing ``name``, ``server_commands``,
            ``client_commands`` (mappings of command id to a command
            definition with ``name`` and ``schema``) and
            ``attributes_by_name``. ``None`` is reported and ignored.
    """
    if cluster is None:
        print("Unknown cluster")
        return

    print(cluster.name)
    # Look the definitions up on the cluster that was passed in; each table
    # (server and client) is resolved against itself.
    for commands in (cluster.server_commands, cluster.client_commands):
        for command_id in commands:
            command = commands.get(command_id)
            if command:
                print(f"Command '{command.name}', arguments= {command.schema.__dict__.get('__annotations__')}")
    for k, v in enumerate(cluster.attributes_by_name):
        print(f"Attribute {k}: {v} ")
# Disabled debug aid: change the guard to dump every cluster in the registry.
if 0:
    for c in OnOff._registry:
        cluster = OnOff._registry.get(c)
        print(cluster)
        print(cluster.attributes_by_name.keys())
        # The helper defined in this file is print_cluster_info; the previous
        # name (print_cluster_commands) does not exist.
        print_cluster_info(cluster)

# Disabled debug aid: change the guard to exit before the application starts.
if 0:
    import sys
    sys.exit(1)
# Radio adapter settings; used as the CONF_DEVICE section of zigpy_config below.
device_config = {
    # Change to the serial port of your device
    conf.CONF_DEVICE_PATH: "/dev/ttyDigi",
}
# Configuration passed through ControllerApplication.SCHEMA() when the
# application is started.
# NOTE(review): the channel key is given at the top level here; zigpy may expect
# it inside a network (CONF_NWK) section -- confirm that this value is honored.
zigpy_config = {
    conf.CONF_DATABASE: "zigpy.db",
    conf.CONF_DEVICE: device_config,
    conf.CONF_NWK_CHANNEL: 20
}
# Usage text and argument template shared by the shell commands that address
# a cluster (device, endpoint, cluster id, direction).
cluster_help="[device_id:int|ieee] [endpoint:int] [cluster:int] [in_out:str]"
cluster_template="%s %d %d %s"
"""
argv=[command_name, device_id:int|ieee, endpoint:int, cluster:int, in_out:str]
"""
def get_cluster_from_args(argv:list):
    """Resolve shell arguments to a cluster on a known device.

    Args:
        argv: ``[command, device, endpoint, cluster, direction]``. ``device``
            is an IEEE address (contains ``:``) or an index into the list of
            known devices; ``endpoint`` and ``cluster`` are integer ids;
            ``direction`` is ``i|in|input`` or ``o|out|output``.

    Returns:
        The matching cluster object, or ``None`` after printing a hint when
        any part of the address cannot be resolved.
    """
    usage = "ERROR, Please use [device_id:int|ieee, endpoint:int, cluster:int, in_out:str]"
    if len(argv) < 5:
        print(usage)
        return None

    # The global application object only exists once start-up has run.
    try:
        app = za
    except NameError:
        app = None
    if app is None:
        print("ERROR, Zigbee application is not running")
        return None

    try:
        # The IEEE address is being used as keys in a dict
        device_ids = list(app.devices.keys())
        selector = str(argv[1])
        try:
            if ":" in selector:
                device = app.get_device(EUI64.convert(selector))
            else:
                device = app.get_device(device_ids[int(selector)])
        except (ValueError, IndexError, KeyError):
            # Report the text the user typed; a parsed id may not exist when
            # the conversion itself is what failed.
            print(f"Could not find device <{selector}>, available devices are:")
            print("<id>\t<IEEE>")
            print("\n".join(f"{i}:\t{ieee}" for i, ieee in enumerate(device_ids)))
            print("Please provide either as a IEEE address or id")
            return None

        endpoint_id = int(argv[2])
        endpoint = device.endpoints.get(endpoint_id)
        if endpoint is None:
            endpoint_list = "\n".join(str(ep) for ep in device.endpoints.values())
            print(f"Could not find endpoint {endpoint_id}, avalible endpoints for device {argv[1]} are:\n", endpoint_list)
            return None

        cluster_id = int(argv[3])
        direction = str(argv[4]).lower()
        if direction in ("o", "output", "out"):
            clusters = endpoint.out_clusters
        elif direction in ("i", "input", "in"):
            clusters = endpoint.in_clusters
        else:
            print("Please specifiy cluster direction as i|in|input or o|out|output")
            return None

        cluster = clusters.get(cluster_id)
        if cluster is None:
            # List the clusters of the requested direction, not the endpoints.
            cluster_list = "\n".join(f"{cid}\t{c.ep_attribute}" for cid, c in clusters.items())
            print(f"Could not find cluster {cluster_id}, available clusters are:\n", cluster_list)
            return None
        return cluster
    except Exception as exc:
        # Shell boundary: keep the prompt alive on malformed input, but show
        # what went wrong instead of hiding it.
        print(usage)
        print(f"Details: {exc!r}")
        return None
@shella.shell_cmd(["cluster","ci"],desc="Get cluster info",usage=f"[cluster_id:int]",template=f"%d")
async def cluster_info_cmf(argv):
    """Shell command: print commands and attributes of a cluster by id.

    ``argv[1]`` is the numeric cluster id, looked up in ``OnOff._registry``.
    """
    try:
        cluster_id = int(argv[1])
    except (IndexError, TypeError, ValueError):
        print("Please provide cluster_id:int")
        return
    cluster = OnOff._registry.get(cluster_id)
    if cluster is None:
        # An unknown id would otherwise crash on attribute access below.
        print(f"Unknown cluster id {cluster_id}")
        return
    print_cluster_info(cluster)
@shella.shell_cmd(["command","c"],desc="Send a command to a cluster",usage=f"{cluster_help} [json_cmd]",template=f"{cluster_template} %s")
async def command_cmd(argv):
    """Shell command: send a ZCL command to a cluster.

    Args:
        argv: ``[command, device, endpoint, cluster, direction, command_id,
            json_args...]``. Everything after ``command_id`` is joined and
            parsed as a JSON object whose members become keyword arguments
            of ``cluster.command``.
    """
    if len(argv) < 6:
        print("device_id:int|ieee, endpoint:int, cluster:int, in_out:str, command_id, (json_cmd)")
        print("Example, turn off onOff cluster: c 9 1 6 in 0")
        # JSON needs double quotes; a single-quoted example cannot be parsed.
        print('Example, command 1 1 3 in 0 {"identify_time":10}')
        return
    #https://github.com/zigpy/zigpy/blob/dev/zigpy/zcl/clusters/general.py
    cluster = get_cluster_from_args(argv)
    if cluster is None:
        return
    try:
        command_id = int(argv[5])
    except (TypeError, ValueError):
        print("Provide command id:int")
        return

    cmd_args = {}
    # Re-join trailing tokens in case the shell split the JSON on spaces.
    raw_args = " ".join(str(part) for part in argv[6:]).strip()
    if raw_args:
        try:
            cmd_args = json.loads(raw_args)
        except json.JSONDecodeError as exc:
            # Do not send the command without the arguments the user asked for.
            print(f"Invalid JSON command arguments: {exc}")
            return
        if not isinstance(cmd_args, dict):
            print("Command arguments must be a JSON object")
            return
    print("Cmd args",cmd_args)
    #command 1 1 3 in 0 {"identify_time":10}
    await cluster.command(command_id, **cmd_args)
@shella.shell_cmd(["pair","p"],desc="Enable pairing mode",usage=f"[pair_duration]",template="%d")
async def pair_cmd(argv):
    """Shell command: allow devices to join for ``argv[1]`` seconds.

    Returns:
        ``False`` when the duration is missing or invalid, otherwise ``None``.
    """
    if len(argv) < 2:
        print("Please provide pairing time from 1-254 seconds")
        return False
    try:
        duration = int(argv[1])
    except (TypeError, ValueError):
        print("Please provide pairing time from 1-254 seconds")
        return False
    # Validate before calling permit() so an out-of-range value is reported
    # here instead of being handed to the radio stack first.
    if not 0 <= duration <= 254:
        print("Warning: please provide a duration from 1-254 seconds")
        return False
    await za.permit(duration)
    print(f"Allowing pairing for {duration} seconds")
@shella.shell_cmd(["bind","bd"],desc="Bind to a device",usage=f"{cluster_help}",template=f"{cluster_template}")
async def bind_cmd(argv):
    """Shell command: bind the cluster addressed by ``argv``."""
    cluster = get_cluster_from_args(argv)
    if cluster is None:
        # get_cluster_from_args has already printed what was wrong.
        return
    await cluster.bind()
@shella.shell_cmd(["unbind"],desc="Undbind from a device")
async def unbind_cmd(argv):
    """Shell command: unbind the cluster addressed by ``argv``."""
    cluster = get_cluster_from_args(argv)
    if cluster is None:
        # get_cluster_from_args has already printed what was wrong.
        return
    await cluster.unbind()
@shella.shell_cmd(["list","ls"],desc="Shows all avalible devices")
async def devices_list_cmd(argv):
    """Shell command: print one summary line per known device.

    Each line shows the list index, IEEE address, manufacturer and model.
    ``argv`` is not read.
    """
    devices = za.devices.values()
    print(f"Found {len(devices)} devices...")
    for i, device in enumerate(devices):
        print(f"{i}: {device.ieee}: '{device.manufacturer}': '{device.model}'")
@shella.shell_cmd(["device","d"],desc="Shows device information",usage="[int|ieee]",template="%s")
async def devices_cmd(argv):
    """Shell command: print details, endpoints and clusters of one device.

    ``argv[1]`` selects the device either by its index in the device list or
    by its IEEE address (a value containing ``:``).
    """
    if len(argv) < 2:
        print("Please provide device int|ieee")
        return

    selector = str(argv[1])
    by_ieee = ":" in selector
    wanted_index = None
    if not by_ieee:
        try:
            wanted_index = int(selector)
        except ValueError:
            print("Please provide device int|ieee")
            return

    def print_clusters_info(clusters):
        # One line per cluster: id and endpoint attribute name.
        if not clusters:
            print("\tNo clusters")
            return
        for v in clusters.values():
            print(f"\t{v.cluster_id}\t{v.ep_attribute}")

    for i, device in enumerate(za.devices.values()):
        if by_ieee:
            selected = str(device.ieee).lower() == selector.lower()
        else:
            selected = i == wanted_index
        if not selected:
            continue
        print(f"====== {i}: {device.manufacturer} {device.model} ====== ")
        print(f"IEEE Adress {device.ieee}")
        print(f"NWK {device.nwk}")
        print(f"Initialized {device.is_initialized}")
        print(f"rssi {device.rssi}")
        for endpoint in device.endpoints.values():
            # The ZDO entry is skipped; only the other endpoints are listed.
            if not isinstance(endpoint, zigpy.zdo.ZDO):
                print(f"~~~ Endpoint #{endpoint.endpoint_id} ~~~")
                print(" Input clusters:")
                print_clusters_info(endpoint.in_clusters)
                print(" Output clusters:")
                print_clusters_info(endpoint.out_clusters)
        break
    else:
        print(f"Could not find device <{selector}>")
@shella.shell_cmd(["q","quit"],desc="Quit program")
async def stop_program(argv):
    """Shell command: shut the Zigbee application down and stop the program.

    Awaits ``za.shutdown()``, clears the module-level ``running`` flag and
    then calls ``stop_minicli()``. ``argv`` is not read.
    """
    global running
    import sys  # NOTE(review): not referenced anywhere in this function
    print("Shutting down zigbee, saving state...")
    await za.shutdown()
    print("Stopping other tasks...")
    running=False
    # NOTE(review): stop_minicli is neither defined nor imported in the visible
    # part of this file -- confirm where it comes from; if it is missing this
    # call raises NameError and "Good bye" is never printed.
    stop_minicli()
    print("Good bye")
class YourListenerClass:
    """Listener that reports Zigbee application events on stdout.

    These methods are called by the ControllerApplication using calls like this:
        self.listener_event("raw_device_initialized", device)
    """

    def __init__(self,za,ext_callback):
        """Store the application object and the optional message callback."""
        self.ext_callback = ext_callback
        self.za = za

    def raw_device_initialized(self,device: zigpy.device.Device):
        """Log the raw-initialization event for ``device``."""
        print("Raw device init",device)

    def device_initialized(self,device: zigpy.device.Device):
        """Log that ``device`` has been initialized."""
        print("device_initialized",device)

    def device_removed(self,device: zigpy.device.Device):
        """Log that ``device`` has been removed."""
        print("device_removed",device)

    def handle_message(
        self,
        device: zigpy.device.Device,
        profile: int,
        cluster: int,
        src_ep: int,
        dst_ep: int,
        message: bytes,
    ) -> None:
        """Forward an incoming message to the callback, or print it.

        When a callback was supplied it receives ``ieee``, ``message`` and
        ``cluster`` as keyword arguments; otherwise all fields are printed.
        """
        callback = self.ext_callback
        if not callback:
            print(f"Handle_message {device.ieee} profile {profile}, cluster {cluster}, src_ep {src_ep}, dst_ep {dst_ep},\tmessage {message} ")
            return
        callback(ieee=device.ieee,message=message,cluster=cluster)

    def device_joined(self,device: zigpy.device.Device):
        """Log that ``device`` has joined."""
        print("device_joined",device)

    def group_added(self,group,ep):
        """Log a group-added event with its group and endpoint."""
        print("Group added",group,ep)

    def device_left(self,device: zigpy.device.Device):
        """Log that ``device`` has left."""
        print("device_left",device)
def get_za():
    """Return the module-level application object ``za``.

    Raises NameError when ``za`` has not been assigned yet.
    """
    # Reading a module global needs no ``global`` declaration.
    return za
async def start_zigbee(ext_callback):
    """Create the controller application and attach the event listener.

    The application is built from ``zigpy_config`` and stored in the
    module-level ``za``. A ``YourListenerClass`` instance is registered on
    both the application and its groups.

    Args:
        ext_callback: Optional callable handed to the listener for incoming
            messages; ``None`` makes the listener print them instead.

    Start-up errors are printed as a traceback and not re-raised; ``za`` is
    then left as ``None`` (or as whatever it was before this call).
    """
    global za
    # Make sure the global exists even if start-up fails, so later checks
    # such as ``if za:`` do not raise NameError.
    globals().setdefault("za", None)
    try:
        za = await ControllerApplication.new(
            config=ControllerApplication.SCHEMA(zigpy_config),
            auto_form=True,
            start_radio=True,
        )
        listener = YourListenerClass(za, ext_callback)
        za.add_listener(listener)
        za.groups.add_listener(listener)
        print("ZigPy started!")
    except Exception:
        import traceback
        traceback.print_exc()
# Module-level flag; cleared by the quit command, stop_application() and
# run_application() once the program is stopping.
running=True


async def run_application(ext_callback=None):
    """Start Zigbee, run the interactive shell, then shut down.

    Args:
        ext_callback: Optional callable forwarded to ``start_zigbee`` for
            incoming messages.
    """
    global running
    await start_zigbee(ext_callback)
    shella_task_ = asyncio.create_task(shella.shell_task())
    shella.set_prompt("zigbee$")
    await shella_task_
    # ``za`` may be missing or None when start-up failed.
    app = globals().get("za")
    # The quit command and stop_application() shut the application down
    # themselves and clear ``running``; only shut down here if they did not.
    if running and app is not None:
        await app.shutdown()
    running = False
async def stop_application():
    """Clear the module-level ``running`` flag and shut the application down."""
    global running
    print("Stopping application...")
    running = False
    await za.shutdown()
# Script entry point: run the application with the default ext_callback (None).
if __name__ =="__main__":
    asyncio.run(run_application())