infobeamer-testcard/service
2023-04-17 22:02:35 +02:00

137 lines
3.4 KiB
Python

#!/usr/bin/python
import os
import sys
import fcntl
import socket
import struct
import requests
import time
import traceback
from functools import partial
from heapq import heappush, heapreplace
from hosted import api, node
# Shared UDP socket; send() uses it to push "testcard/update/..." datagrams
# to 127.0.0.1:4444.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def fetch_info():
    """Fetch the device info and publish part of it to the node.

    Reads the device info via ``api.device_info.get()``, logs the raw result
    to stderr and forwards only its ``description`` and ``location`` entries
    to the ``/device_info`` node endpoint.

    Raises:
        KeyError: if the returned info lacks ``description`` or ``location``
            (assuming it behaves like a dict -- TODO confirm).
    """
    info = api.device_info.get()
    # Write to stderr the same way send() does. The former Python 2 only
    # "print >> sys.stderr" statement raises a TypeError under Python 3.
    sys.stderr.write("%s\n" % (info,))
    node["/device_info"](
        dict(
            description=info["description"],
            location=info["location"],
        )
    )
def send(key, val):
    """Log and send one ``key``/``val`` update as a UDP datagram.

    The datagram payload is ``testcard/update/<key>:<val>`` and is sent to
    127.0.0.1:4444 through the module-level ``sock``.
    """
    sys.stderr.write("sending %s->%s\n" % (key, val))
    payload = 'testcard/update/%s:%s' % (key, val)
    # sendto() needs a byte string. On Python 2 a str already is one, so this
    # only encodes text (unicode) payloads.
    if not isinstance(payload, bytes):
        payload = payload.encode("utf-8")
    sock.sendto(payload, ('127.0.0.1', 4444))
def get_default_gateway():
    """Return the default gateway from /proc/net/route as a dotted quad.

    Scans the routing table for the first entry whose destination column is
    ``00000000`` and whose flags column has bit 0x2 (RTF_GATEWAY) set.

    Returns:
        The gateway address as a string, or ``None`` when no such route
        exists.
    """
    with open("/proc/net/route") as route_table:
        for entry in route_table:
            columns = entry.strip().split()
            # The header line never matches the destination check, so its
            # non-numeric flags column is never parsed.
            if columns[1] == '00000000' and int(columns[3], 16) & 2:
                # The hex gateway value is packed as a little-endian 32-bit
                # integer to obtain the address bytes.
                raw_gateway = struct.pack("<L", int(columns[2], 16))
                return socket.inet_ntoa(raw_gateway)
def get_ipv4(ifname):
    """Return the IPv4 address of interface *ifname* in CIDR notation.

    The address and netmask are queried from the kernel with the Linux
    SIOCGIFADDR and SIOCGIFNETMASK ioctls.

    Args:
        ifname: interface name such as ``"eth0"`` (text or byte string).

    Returns:
        ``"a.b.c.d/prefixlen"`` on success, ``"<no ipv4>"`` when the socket
        or ioctl call fails (IOError, e.g. unknown interface or no address
        assigned), or ``None`` on any other error so that the caller skips
        the update.
    """
    SIOCGIFADDR = 0x8915
    SIOCGIFNETMASK = 0x891b
    try:
        # struct.pack('256s', ...) needs a byte string on Python 3.
        if not isinstance(ifname, bytes):
            ifname = ifname.encode("utf-8")
        # Request buffer starting with the interface name (at most 15
        # bytes), zero padded to 256 bytes. Used for both ioctls.
        ifreq = struct.pack('256s', ifname[:15])
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            info = fcntl.ioctl(s.fileno(), SIOCGIFADDR, ifreq)
            netmask = fcntl.ioctl(s.fileno(), SIOCGIFNETMASK, ifreq)
        finally:
            # This runs once per second per interface: release the
            # descriptor deterministically instead of relying on GC.
            s.close()
        # Bytes 20..23 of the returned buffer hold the IPv4 address.
        ip = socket.inet_ntoa(info[20:24])
        mask = struct.unpack('>I', netmask[20:24])[0]
        # Remember: not everything has to be performance optimal :-)
        prefix_len = bin(mask).count('1')
        return "%s/%d" % (ip, prefix_len)
    except IOError:
        return "<no ipv4>"
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt and SystemExit
        # must still be able to stop the service.
        return None
def get_ipv6(ifname, proc_path="/proc/net/if_inet6"):
    """Return an IPv6 address of interface *ifname* in CIDR notation.

    The previous implementation reused the IPv4 ioctl code (SIOCGIFADDR plus
    a 4-byte ``inet_ntoa``), which cannot yield an IPv6 address. Addresses
    are now read from the kernel's ``/proc/net/if_inet6`` table, whose lines
    have six whitespace separated columns::

        <32 hex digit address> <ifindex> <prefix len> <scope> <flags> <name>

    A global-scope address (scope ``00``) is preferred; otherwise the first
    address listed for the interface (e.g. link-local) is used.

    Args:
        ifname: interface name such as ``"eth0"``.
        proc_path: table to read; only meant to be overridden for testing.

    Returns:
        ``"addr/prefixlen"`` on success, ``"<no ipv6>"`` when the interface
        has no IPv6 address or the table cannot be read (IOError), or
        ``None`` on any other error so that the caller skips the update.
    """
    try:
        # Lines read from the table are text; normalise a Python 3 bytes name.
        if isinstance(ifname, bytes) and not isinstance(ifname, str):
            ifname = ifname.decode("utf-8")
        chosen = None
        with open(proc_path) as fh:
            for line in fh:
                fields = line.split()
                if len(fields) != 6 or fields[5] != ifname:
                    continue
                candidate = (fields[0], int(fields[2], 16))
                if int(fields[3], 16) == 0:
                    # Global scope: best possible match, stop searching.
                    chosen = candidate
                    break
                if chosen is None:
                    chosen = candidate
        if chosen is None:
            return "<no ipv6>"
        addr_hex, prefix_len = chosen
        # Convert the 32 hex digits into the 16 raw address bytes.
        words = [int(addr_hex[i:i + 8], 16) for i in range(0, 32, 8)]
        packed = struct.pack(">4I", *words)
        ip = socket.inet_ntop(socket.AF_INET6, packed)
        return "%s/%d" % (ip, prefix_len)
    except IOError:
        return "<no ipv6>"
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt and SystemExit
        # must still be able to stop the service.
        return None
def check_network():
    """Return "static" if /sd/config/network exists, otherwise "dhcp"."""
    if os.path.exists('/sd/config/network'):
        return "static"
    return "dhcp"
def check_internet():
    """Probe http://ping.infobeamer.com/ping and classify the result.

    Returns:
        ``"online"`` when the request succeeds and the body is exactly
        ``pong``; ``"filtered"`` when it succeeds with any other body
        (presumably something on the path rewrote the response -- verify);
        ``"offline"`` when the request fails in any way (the traceback is
        printed to stderr).
    """
    try:
        r = requests.get("http://ping.infobeamer.com/ping", timeout=10, headers={
            'User-Agent': 'info-beamer syncer/network-check',
        })
        r.raise_for_status()
        # r.content is a byte string. Comparing against a bytes literal is
        # identical on Python 2 and keeps the check meaningful on Python 3,
        # where bytes never compare equal to str.
        if r.content == b"pong":
            return "online"
        return "filtered"
    except Exception:
        # Not a bare except: Ctrl-C during the (up to 10 s) blocking request
        # must not be reported as "offline".
        traceback.print_exc()
        return "offline"
# Table of periodic checks. Each entry is (interval, key, val_fn):
#   interval - seconds added to the current time.time() to schedule the next
#              run (see run_next_test)
#   key      - name under which the result is passed to send()
#   val_fn   - zero-argument callable producing the value; a None result is
#              not sent
tests = [
    (1, "ethipv4", partial(get_ipv4, "eth0")),
    (1, "ethipv6", partial(get_ipv6, "eth0")),
    (1, "wlanipv4", partial(get_ipv4, "wlan0")),
    (1, "wlanipv6", partial(get_ipv6, "wlan0")),
    (1, "gw", get_default_gateway),
    (10, "online", check_internet),
    (5, "network", check_network),
]
# Heap of (due_time, test) pairs consumed by run_next_test(); q[0] is always
# the entry that is due next. Every test starts out due immediately.
q = []
now = time.time()
for interval, key, val_fn in tests:
    heappush(q, (now, (interval, key, val_fn)))
def run_next_test():
    """Run the most urgent scheduled test if it is due.

    Looks at the head of the heap ``q``. If that entry is not due yet nothing
    happens. Otherwise the entry is re-queued ``interval`` seconds from now,
    its value function is called and a non-``None`` result is passed to
    ``send``.

    Returns:
        ``True`` if a test was run, ``False`` if nothing was due.
    """
    current = time.time()
    due_at, entry = q[0]
    if due_at > current:
        return False
    period, name, probe = entry
    # Re-schedule before running the probe so that an exception raised by it
    # does not leave the entry permanently due.
    heapreplace(q, (current + period, entry))
    result = probe()
    if result is not None:
        send(name, result)
    return True
if __name__ == "__main__":
    # Service main loop: refresh the device info, then run at most one due
    # test per iteration, idling for half a second when nothing is due.
    # NOTE(review): fetch_info() runs on every iteration, i.e. at least
    # twice per second -- confirm that this request rate is intended.
    while True:
        try:
            fetch_info()
            if not run_next_test():
                time.sleep(0.5)
        except Exception:
            # Not a bare except, so KeyboardInterrupt/SystemExit can still
            # terminate the service.
            traceback.print_exc()
            # Back off after a failure: without this a persistent error
            # (e.g. fetch_info() failing) makes the loop spin without ever
            # sleeping.
            time.sleep(0.5)