-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhellopi.py
377 lines (288 loc) · 13.1 KB
/
hellopi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
#!/usr/bin/env python3
# Hello Pi
# A program to identify the ip address of Raspberry Pis (or other devices) added to the local area network.
#
# David Smith
# 4/11/21
# License: MIT
import errno
import datetime as dt
import logging
import os
import socket
import sys
import traceback
import ipxray.packet as ipxray
# ** Module configuration constants **
APP_NAME = "Hello Pi"
APP_VERSION = "1.0.1"
LOG_ENABLE_LOGGING = False
LOG_FILENAME = APP_NAME.replace(" ", "_").lower() + ".log"
LOG_LEVEL = logging.INFO
# The organizationally unique ids (OUI) for Raspberry Pis (first 3 bytes of the MAC address)
RASPBERRY_PI_MAC_OUIS = [bytes.fromhex("28cdc1"), bytes.fromhex("2ccf67"), bytes.fromhex("3a3541"),
bytes.fromhex("b827eb"), bytes.fromhex("d83add"), bytes.fromhex("dca632"),
bytes.fromhex("e45f01")]
# Identify the current OS platform
OS_PLATFORM = sys.platform
# ** Module objects/variables **
logger = logging.getLogger(__name__) # Local logger for this module
logger.addHandler(logging.NullHandler()) # Add a local null handler (eats messages when logging disabled)
logger.propagate = False # Prevent log messages from propagating further up than this level.
if LOG_ENABLE_LOGGING:
# Configure the logger, log file, level, and format
logging.basicConfig(filename=os.path.join(os.path.dirname(os.path.abspath(__file__)), LOG_FILENAME), level=LOG_LEVEL,
format="%(asctime)s [%(module)12.12s:%(lineno)4s] %(levelname)-8s %(message)s", filemode='w')
# Command-line arguments
cmdline_args_dict = {'-a': False, '-h': False, '-q': False, '-v': False}
show_all_devices = False # When True, show all devices - not just RPIs
verbose_output = False # When True, additional info is output
windows_platform = "win32" in OS_PLATFORM # True when on Windows; False otherwise
def format_time(datetime_obj: dt.datetime) -> str:
"""
Apply simple formatting to display the time from a datetime.datetime object.
:param datetime_obj: datetime.datetime object to use as time to display
:return: A time string formatted such as "1:27.03 AM"
"""
if not isinstance(datetime_obj, dt.datetime):
raise TypeError("TypeError: datetime.datetime object required.")
# Note: Windows doesn't support the -I formatter, so must use I for cross-platform
return datetime_obj.strftime("%I:%M.%S %p")
def open_socket():
"""
Open a raw socket in promiscuous mode to receive all LAN packets.
:return: A raw socket in promiscuous mode
"""
# socket.
ETH_P_ALL = 0x0003 # Receive all layer 3 protocols
ETH_P_IP = 0x0800 # Receive only ip layer 3 protocol
if "win32" in OS_PLATFORM:
# NOTE: On windows, firewall rules apply to raw sockets (unlike linux). Must add a windows firewall ALLOW rule
# with the following characteristics:
# WINDOWS FIREWALL ALLOW rule: Name->DHCP Server Port; Protocol->UDP; Local port->67; Remote port->68;
# Local IP Address: 255.255.255.255; Remote IP Address: 0.0.0.0
# Windows: Create raw socket
# AF_INET DOES NOT BYPASS WINDOWS FIREWALL, so port of interest must be opened in the firewall
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP)
except OSError as ex:
# Permissions error under Windows
if ex.errno == errno.WSAEACCES:
if verbose_output:
msg = "Error: Program must be run with administrator privileges to open a raw socket."
else:
msg = "Error: Program must be run with administrator privileges."
print(msg)
logger.error(msg)
else:
msg = "Error attempting to open raw socket: " + str(ex)
print(msg)
logger.error(msg)
sys.exit(-1)
# Socket calls are blocking - using a short timeout is required to keep responsive to user ctrl-C on Windows
sock.settimeout(1)
# Bind raw socket to ip broadcast address
sock.bind(("255.255.255.255", 0))
# # [Windows specific] Include IP headers
sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# [Windows specific] receive all packages
sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
elif "darwin" in OS_PLATFORM:
# Mac OSX: Create raw socket
try:
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_IP))
except PermissionError:
if verbose_output:
msg = ("Error: Program must be run with administrator privileges (via sudo or as root) " +
"to open a raw socket.")
else:
msg = "Error: Program must be run with administrator privileges (via sudo or as root)."
print(msg)
logger.error(msg)
sys.exit(-1)
except Exception as ex:
msg = "Error attempting to open raw socket: " + str(ex)
print(msg)
logger.error(msg)
sys.exit(-1)
# Socket calls are blocking, but ctrl-C on seems to interrupt them without requiring a socket timeout
# and therefore doesn't seem to need the following line.
# sock.settimeout(1)
else:
# *nix: Create raw socket
# AF_PACKET for raw socket access bypasses iptables, so packet can be read without opening the firewall port
try:
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_IP))
except PermissionError:
if verbose_output:
msg = ("Error: Program must be run with administrator privileges (via sudo or as root) " +
"to open a raw socket.")
else:
msg = "Error: Program must be run with administrator privileges (via sudo or as root)."
print(msg)
logger.error(msg)
sys.exit(-1)
except Exception as ex:
msg = "Error attempting to open raw socket: " + str(ex)
print(msg)
logger.error(msg)
sys.exit(-1)
# Socket calls are blocking, but ctrl-C on *nix seems to interrupt them without requiring a socket timeout
# and therefore doesn't seem to need the following line.
# sock.settimeout(1)
return sock
def close_socket(sock: socket.socket):
if "win32" in OS_PLATFORM:
# Windows
# Take socket out of promisc mode
sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
# Shutdown and close the socket
# sock.shutdown(socket.SHUT_RDWR)
sock.close()
else:
# Shutdown and close the socket
sock.close()
def show_hello(p: ipxray.INETPacket, dhcp: ipxray.DHCPPacket, is_rpi: bool = True, verbose: bool = False):
"""
Display formatted 'hello' message.
:param p: INETPacket
:param dhcp: DHCPPacket
:param is_rpi: True-device is an RPi; False-device is not an RPi
:param verbose: True-provide more details; False-provide terse output
:return:
"""
verbose_ip = f"[IP Address {p.ip.format_ip_addr(dhcp.requested_ip)}]"
verbose_common = f"{format_time(dt.datetime.now())} | {verbose_ip:<28} "
terse_common = f"{p.ip.format_ip_addr(dhcp.requested_ip)+' ':<16}"
if is_rpi:
if verbose:
hello_msg = (verbose_common +
f"- Raspberry Pi '{dhcp.hostname}' "
f"({ipxray.ENETPacket.format_mac_addr(dhcp.client_hw_address)}) said \"Hello!\"")
else:
hello_msg = terse_common + f"- Raspberry Pi '{dhcp.hostname}' said \"Hello!\""
else:
if verbose:
hello_msg = (verbose_common + f"- Device '{dhcp.hostname}' "
f"({ipxray.ENETPacket.format_mac_addr(dhcp.client_hw_address)}) said \"Hello!\"")
else:
hello_msg = terse_common + f"- Device '{dhcp.hostname}' said \"Hello!\""
print(hello_msg)
logger.info(hello_msg)
def cmdline_options(argv: list, arg_dict: dict):
"""
Parse command line arguments passed via 'argv'.
:param argv: The list of command-line arguments as produced by sys.argv
:param arg_dict: Dictionary of valid command-line argument entries of type {str: bool}.
:return: arg_dict with args specified in argv True and unspecified args False
"""
if len(argv) > 1:
# Loop through options - skipping item 0 which is the name of the script
for arg in argv[1:]:
# Indicate which expected args that have been passed
if arg in arg_dict:
arg_dict[arg] = True
return arg_dict
def show_help():
print(APP_NAME, f"(v{APP_VERSION:s})")
print("--------")
print("While running, this utility monitors the local area network (LAN) for DHCP requests and reports the ip "
"address of any Raspberry Pi that powers-up connected to the LAN.")
print()
print("OPTIONS:")
print(" -a\tDisplay ALL devices (not just RPis) making a DHCP request for an ip address.")
print(" -h\tDisplay this help message.")
print(" -q\tQuiet the program startup information.")
print(" -v\tDisplay verbose messages.")
def main():
"""
Application main routine.
:return: None
"""
global cmdline_args_dict
global show_all_devices
global verbose_output
try:
# Command-line arguments
argv = sys.argv
ver_str = f"(Version {APP_VERSION:s})"
logging.info(APP_NAME + " " + ver_str)
if argv and len(argv) > 1:
logging.info("Command line arguments: " + " ".join(argv[1:]))
# Decipher command-line options and update
cmdline_args_dict = cmdline_options(argv, cmdline_args_dict)
# Cmdline arg: '-h' = Display help
if cmdline_args_dict.get('-h', False):
show_help()
sys.exit(0)
# Cmdline arg: '-a' = Display all device (not just RPis) DHCP requests
if cmdline_args_dict.get('-a', False):
show_all_devices = True
# Cmdline arg: '-v' = Use verbose output statements
if cmdline_args_dict.get('-v', False):
verbose_output = True
# Cmdline arg: '-q' = Quiet - suppress unnecessary output
if not cmdline_args_dict.get('-q', False):
# Print app identifying info and status
print(APP_NAME)
if verbose_output:
# Verbose version
if show_all_devices:
msg = "Watching LAN for 'Hello' messages from all devices."
msg += "\nPower-up a connected device to see its ip address."
else:
msg = "Watching LAN for 'Hello' messages from Raspberry Pis."
msg += "\nPower-up a connected Raspberry Pi to see its ip address."
print(msg)
else:
# Regular version
if show_all_devices:
print("Watching LAN for 'Hello' messages from all devices.")
else:
print("Watching LAN for 'Hello' messages from Raspberry Pis.")
print()
# Reusable packet buffer
packet_buffer = bytearray(2*4096)
# Open the socket and look for desired packet
sock = open_socket()
try:
while True:
try:
# Listen for packets (blocking call)
packet_size = sock.recv_into(packet_buffer)
except socket.timeout:
packet_size = None
if not packet_size:
continue
# Attempt to decode the packet
if windows_platform:
# Win sockets return ip protocol (layer 3)
p = ipxray.INETPacket(ip_packet=packet_buffer)
else:
# *nix sockets return ethernet protocol (layer 2)
p = ipxray.INETPacket(enet_packet=packet_buffer)
# Looking for DHCP messages being sent to DHCP server ...
if p.udp and p.udp.dst_port == 67:
# Attempt to interpret DHCP packet
dhcp = ipxray.DHCPPacket(p.udp.payload)
if not dhcp.is_dhcp:
continue
# DHCP request message found
if dhcp.message_type == dhcp.OptionMessageType.DHCPRequest:
# If OUI is in RPi Foundation OUIs, then device is a RPi
is_rpi = dhcp.client_hw_address[0:3] in RASPBERRY_PI_MAC_OUIS
if is_rpi or show_all_devices:
show_hello(p, dhcp, is_rpi=is_rpi, verbose=verbose_output)
finally:
# Close the socket when exiting
if sock:
close_socket(sock)
except KeyboardInterrupt:
sys.exit(0)
except Exception as ex:
print("Exception: ", ex)
print(traceback.format_exc())
logger.exception("Unhandled Exception:")
sys.exit(-1)
if __name__ == "__main__":
main()