infobeamer-testcard/service

119 lines
2.8 KiB
Plaintext
Raw Normal View History

2023-03-14 08:55:54 +00:00
#!/usr/bin/python
2023-04-17 18:49:53 +00:00
import os
2023-03-14 12:37:38 +00:00
import sys
2023-04-17 18:49:53 +00:00
import fcntl
import socket
import struct
import requests
2023-03-14 08:55:54 +00:00
import time
import traceback
2023-04-17 18:49:53 +00:00
from functools import partial
from heapq import heappush, heapreplace
2023-03-14 08:55:54 +00:00
from hosted import api, node
2023-04-17 18:49:53 +00:00
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
2023-03-14 08:55:54 +00:00
def fetch_info():
info = api.device_info.get()
print >> sys.stderr, info
node["/device_info"](
dict(
description=info["description"],
location=info["location"],
)
)
2023-04-17 18:49:53 +00:00
def send(key, val):
sys.stderr.write("sending %s->%s\n" % (key, val))
sock.sendto('testcard/update/%s:%s' % (key, val), ('127.0.0.1', 4444))
def get_default_gateway():
with open("/proc/net/route") as fh:
for line in fh:
fields = line.strip().split()
if fields[1] != '00000000' or not int(fields[3], 16) & 2:
continue
return socket.inet_ntoa(struct.pack("<L", int(fields[2], 16)))
def get_ipv4(ifname):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8915,
struct.pack('256s', ifname[:15]))
ip = socket.inet_ntoa(info[20:24])
mask = struct.unpack('>I', fcntl.ioctl(
s.fileno(), 0x891b, struct.pack('256s', ifname))[20:24])[0]
# Remember: not everything has to be performance optimal :-)
mask = bin(mask)[2:].count('1')
return "%s/%d" % (ip, mask)
except IOError:
return "<no ipv4>"
except:
return None
def check_network():
return os.path.exists('/sd/config/network') and "static" or "dhcp"
def check_internet():
try:
r = requests.get("http://ping.infobeamer.com/ping", timeout=10, headers={
'User-Agent': 'info-beamer syncer/network-check',
})
r.raise_for_status()
if r.content == "pong":
return "online"
else:
return "filtered"
except:
traceback.print_exc()
return "offline"
tests = [
(1, "ethip", partial(get_ipv4, "eth0")),
(1, "wlanip", partial(get_ipv4, "wlan0")),
(1, "gw", get_default_gateway),
(10, "online", check_internet),
(5, "network", check_network),
]
q = []
now = time.time()
for interval, key, val_fn in tests:
heappush(q, (now, (interval, key, val_fn)))
def run_next_test():
now = time.time()
t, test = q[0]
if now < t:
return False
interval, key, val_fn = test
heapreplace(q, (now + interval, test))
val = val_fn()
if val is not None:
send(key, val)
return True
2023-03-14 08:55:54 +00:00
if __name__ == "__main__":
while 1:
try:
fetch_info()
2023-04-17 18:49:53 +00:00
if not run_next_test():
2023-04-17 19:30:21 +00:00
time.sleep(5)
2023-03-14 08:55:54 +00:00
except:
traceback.print_exc()
2023-04-17 19:30:21 +00:00
# finally:
# time.sleep(1)