1375 lines
40 KiB
Python
1375 lines
40 KiB
Python
#
|
|
# Part of info-beamer hosted. You can find the latest version
|
|
# of this file at:
|
|
#
|
|
# https://github.com/info-beamer/package-sdk
|
|
#
|
|
# Copyright (c) 2014-2020 Florian Wesch <fw@info-beamer.com>
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are
|
|
# met:
|
|
#
|
|
# Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer.
|
|
#
|
|
# Redistributions in binary form must reproduce the above copyright
|
|
# notice, this list of conditions and the following disclaimer in the
|
|
# documentation and/or other materials provided with the
|
|
# distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
|
|
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
|
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
|
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
|
|
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
|
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
|
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
|
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
|
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
|
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
|
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
VERSION = "1.9"
|
|
|
|
import os, re, sys, json, time, traceback, marshal, hashlib
|
|
import errno, socket, select, threading, Queue, ctypes
|
|
import pyinotify, requests
|
|
from functools import wraps
|
|
from collections import namedtuple
|
|
from tempfile import NamedTemporaryFile
|
|
|
|
types = {}
|
|
|
|
def init_types():
|
|
def type(fn):
|
|
types[fn.__name__] = fn
|
|
return fn
|
|
|
|
@type
|
|
def color(value):
|
|
return value
|
|
|
|
@type
|
|
def string(value):
|
|
return value
|
|
|
|
@type
|
|
def text(value):
|
|
return value
|
|
|
|
@type
|
|
def section(value):
|
|
return value
|
|
|
|
@type
|
|
def boolean(value):
|
|
return value
|
|
|
|
@type
|
|
def select(value):
|
|
return value
|
|
|
|
@type
|
|
def duration(value):
|
|
return value
|
|
|
|
@type
|
|
def integer(value):
|
|
return value
|
|
|
|
@type
|
|
def float(value):
|
|
return value
|
|
|
|
@type
|
|
def font(value):
|
|
return value
|
|
|
|
@type
|
|
def device(value):
|
|
return value
|
|
|
|
@type
|
|
def resource(value):
|
|
return value
|
|
|
|
@type
|
|
def device_token(value):
|
|
return value
|
|
|
|
@type
|
|
def json(value):
|
|
return value
|
|
|
|
@type
|
|
def custom(value):
|
|
return value
|
|
|
|
@type
|
|
def date(value):
|
|
return value
|
|
|
|
init_types()
|
|
|
|
def log(msg, name='hosted.py'):
|
|
sys.stderr.write("[{}] {}\n".format(name, msg))
|
|
|
|
def abort_service(reason):
|
|
log("restarting service (%s)" % reason)
|
|
os._exit(0)
|
|
time.sleep(2)
|
|
os.kill(os.getpid(), 2)
|
|
time.sleep(2)
|
|
os.kill(os.getpid(), 15)
|
|
time.sleep(2)
|
|
os.kill(os.getpid(), 9)
|
|
time.sleep(100)
|
|
|
|
CLOCK_MONOTONIC_RAW = 4 # see <linux/time.h>
|
|
|
|
class timespec(ctypes.Structure):
|
|
_fields_ = [
|
|
('tv_sec', ctypes.c_long),
|
|
('tv_nsec', ctypes.c_long),
|
|
]
|
|
|
|
librt = ctypes.CDLL('librt.so.1')
|
|
clock_gettime = librt.clock_gettime
|
|
clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)]
|
|
|
|
def monotonic_time():
|
|
t = timespec()
|
|
clock_gettime(CLOCK_MONOTONIC_RAW , ctypes.pointer(t))
|
|
return t.tv_sec + t.tv_nsec * 1e-9
|
|
|
|
class InfoBeamerQueryException(Exception):
|
|
pass
|
|
|
|
class InfoBeamerQuery(object):
|
|
def __init__(self, host='127.0.0.1', port=4444):
|
|
self._sock = None
|
|
self._conn = None
|
|
self._host = host
|
|
self._port = port
|
|
self._timeout = 2
|
|
self._version = None
|
|
|
|
def _reconnect(self):
|
|
if self._conn is not None:
|
|
return
|
|
try:
|
|
self._sock = socket.create_connection((self._host, self._port), self._timeout)
|
|
self._conn = self._sock.makefile()
|
|
intro = self._conn.readline()
|
|
except socket.timeout:
|
|
self._reset()
|
|
raise InfoBeamerQueryException("Timeout while reopening connection")
|
|
except socket.error as err:
|
|
self._reset()
|
|
raise InfoBeamerQueryException("Cannot connect to %s:%s: %s" % (
|
|
self._host, self._port, err))
|
|
m = re.match("^Info Beamer PI ([^ ]+)", intro)
|
|
if not m:
|
|
self._reset()
|
|
raise InfoBeamerQueryException("Invalid handshake. Not info-beamer?")
|
|
self._version = m.group(1)
|
|
|
|
def _parse_line(self):
|
|
line = self._conn.readline()
|
|
if not line:
|
|
return None
|
|
return line.rstrip()
|
|
|
|
def _parse_multi_line(self):
|
|
lines = []
|
|
while 1:
|
|
line = self._conn.readline()
|
|
if not line:
|
|
return None
|
|
line = line.rstrip()
|
|
if not line:
|
|
break
|
|
lines.append(line)
|
|
return '\n'.join(lines)
|
|
|
|
def _send_cmd(self, min_version, cmd, multiline=False):
|
|
for retry in (1, 2):
|
|
self._reconnect()
|
|
if self._version <= min_version:
|
|
raise InfoBeamerQueryException(
|
|
"This query is not implemented in your version of info-beamer. "
|
|
"%s or higher required, %s found" % (min_version, self._version)
|
|
)
|
|
try:
|
|
self._conn.write(cmd + "\n")
|
|
self._conn.flush()
|
|
response = self._parse_multi_line() if multiline else self._parse_line()
|
|
if response is None:
|
|
self._reset()
|
|
continue
|
|
return response
|
|
except socket.error:
|
|
self._reset()
|
|
continue
|
|
except socket.timeout:
|
|
self._reset()
|
|
raise InfoBeamerQueryException("Timeout waiting for response")
|
|
except Exception:
|
|
self._reset()
|
|
continue
|
|
raise InfoBeamerQueryException("Failed to get a response")
|
|
|
|
def _reset(self, close=True):
|
|
if close:
|
|
try:
|
|
if self._conn: self._conn.close()
|
|
if self._sock: self._sock.close()
|
|
except:
|
|
pass
|
|
self._conn = None
|
|
self._sock = None
|
|
|
|
@property
|
|
def addr(self):
|
|
return "%s:%s" % (self._host, self._port)
|
|
|
|
def close(self):
|
|
self._reset()
|
|
|
|
@property
|
|
def ping(self):
|
|
"tests if info-beamer is reachable"
|
|
return self._send_cmd(
|
|
"0.6", "*query/*ping",
|
|
) == "pong"
|
|
|
|
@property
|
|
def uptime(self):
|
|
"returns the uptime in seconds"
|
|
return int(self._send_cmd(
|
|
"0.6", "*query/*uptime",
|
|
))
|
|
|
|
@property
|
|
def objects(self):
|
|
"returns the number of allocated info-beamer objects"
|
|
return int(self._send_cmd(
|
|
"0.9.4", "*query/*objects",
|
|
))
|
|
|
|
@property
|
|
def version(self):
|
|
"returns the running info-beamer version"
|
|
return self._send_cmd(
|
|
"0.6", "*query/*version",
|
|
)
|
|
|
|
@property
|
|
def fps(self):
|
|
"returns the FPS of the top level node"
|
|
return float(self._send_cmd(
|
|
"0.6", "*query/*fps",
|
|
))
|
|
|
|
@property
|
|
def display(self):
|
|
"returns the display configuration"
|
|
return json.loads(self._send_cmd(
|
|
"1.0", "*query/*display",
|
|
))
|
|
|
|
ResourceUsage = namedtuple("ResourceUsage", "user_time system_time memory")
|
|
@property
|
|
def resources(self):
|
|
"returns information about used resources"
|
|
return self.ResourceUsage._make(int(v) for v in self._send_cmd(
|
|
"0.6", "*query/*resources",
|
|
).split(','))
|
|
|
|
ScreenSize = namedtuple("ScreenSize", "width height")
|
|
@property
|
|
def screen(self):
|
|
"returns the native screen size"
|
|
return self.ScreenSize._make(int(v) for v in self._send_cmd(
|
|
"0.8.1", "*query/*screen",
|
|
).split(','))
|
|
|
|
@property
|
|
def runid(self):
|
|
"returns a unique run id that changes with every restart of info-beamer"
|
|
return self._send_cmd(
|
|
"0.9.0", "*query/*runid",
|
|
)
|
|
|
|
@property
|
|
def nodes(self):
|
|
"returns a list of nodes"
|
|
nodes = self._send_cmd(
|
|
"0.9.3", "*query/*nodes",
|
|
).split(',')
|
|
return [] if not nodes[0] else nodes
|
|
|
|
class Node(object):
|
|
def __init__(self, ib, path):
|
|
self._ib = ib
|
|
self._path = path
|
|
|
|
@property
|
|
def mem(self):
|
|
"returns the Lua memory usage of this node"
|
|
return int(self._ib._send_cmd(
|
|
"0.6", "*query/*mem/%s" % self._path
|
|
))
|
|
|
|
@property
|
|
def fps(self):
|
|
"returns the framerate of this node"
|
|
return float(self._ib._send_cmd(
|
|
"0.6", "*query/*fps/%s" % self._path
|
|
))
|
|
|
|
def io(self, raw=True):
|
|
"creates a tcp connection to this node"
|
|
status = self._ib._send_cmd(
|
|
"0.6", "%s%s" % ("*raw/" if raw else '', self._path),
|
|
)
|
|
if status != 'ok!':
|
|
raise InfoBeamerQueryException("Cannot connect to node %s" % self._path)
|
|
sock = self._ib._sock
|
|
sock.settimeout(None)
|
|
return self._ib._conn
|
|
|
|
@property
|
|
def has_error(self):
|
|
"queries the error flag"
|
|
return bool(int(self._ib._send_cmd(
|
|
"0.8.2", "*query/*has_error/%s" % self._path,
|
|
)))
|
|
|
|
@property
|
|
def error(self):
|
|
"returns the last Lua traceback"
|
|
return self._ib._send_cmd(
|
|
"0.8.2", "*query/*error/%s" % self._path, multiline=True
|
|
)
|
|
|
|
def __repr__(self):
|
|
return "%s/%s" % (self._ib, self._path)
|
|
|
|
def node(self, node):
|
|
return self.Node(self, node)
|
|
|
|
def __repr__(self):
|
|
return "<info-beamer@%s>" % self.addr
|
|
|
|
|
|
class Configuration(object):
|
|
def __init__(self):
|
|
self._restart = False
|
|
self._options = []
|
|
self._config = {}
|
|
self._parsed = {}
|
|
self.parse_node_json(do_update=False)
|
|
self.parse_config_json()
|
|
|
|
def restart_on_update(self):
|
|
log("going to restart when config is updated")
|
|
self._restart = True
|
|
|
|
def parse_node_json(self, do_update=True):
|
|
with open("node.json") as f:
|
|
self._options = json.load(f).get('options', [])
|
|
if do_update:
|
|
self.update_config()
|
|
|
|
def parse_config_json(self, do_update=True):
|
|
with open("config.json") as f:
|
|
self._config = json.load(f)
|
|
if do_update:
|
|
self.update_config()
|
|
|
|
def update_config(self):
|
|
if self._restart:
|
|
return abort_service("restart_on_update set")
|
|
|
|
def parse_recursive(options, config, target):
|
|
# print 'parsing', config
|
|
for option in options:
|
|
if not 'name' in option:
|
|
continue
|
|
if option['type'] == 'list':
|
|
items = []
|
|
for item in config[option['name']]:
|
|
parsed = {}
|
|
parse_recursive(option['items'], item, parsed)
|
|
items.append(parsed)
|
|
target[option['name']] = items
|
|
continue
|
|
target[option['name']] = types[option['type']](config[option['name']])
|
|
|
|
parsed = {}
|
|
parse_recursive(self._options, self._config, parsed)
|
|
log("updated config")
|
|
self._parsed = parsed
|
|
|
|
@property
|
|
def raw(self):
|
|
return self._config
|
|
|
|
@property
|
|
def metadata(self):
|
|
return self._config['__metadata']
|
|
|
|
def __getitem__(self, key):
|
|
return self._parsed[key]
|
|
|
|
def __getattr__(self, key):
|
|
return self._parsed[key]
|
|
|
|
def setup_inotify(configuration):
|
|
class EventHandler(pyinotify.ProcessEvent):
|
|
def process_default(self, event):
|
|
basename = os.path.basename(event.pathname)
|
|
if basename == 'node.json':
|
|
log("node.json changed")
|
|
configuration.parse_node_json()
|
|
elif basename == 'config.json':
|
|
log("config.json changed!")
|
|
configuration.parse_config_json()
|
|
elif basename.endswith('.py'):
|
|
abort_service("python file changed")
|
|
|
|
wm = pyinotify.WatchManager()
|
|
|
|
notifier = pyinotify.ThreadedNotifier(wm, EventHandler())
|
|
notifier.daemon = True
|
|
notifier.start()
|
|
|
|
wm.add_watch('.', pyinotify.IN_MOVED_TO)
|
|
|
|
class RPC(object):
|
|
def __init__(self, path, callbacks):
|
|
self._path = path
|
|
self._callbacks = callbacks
|
|
self._lock = threading.Lock()
|
|
self._con = None
|
|
thread = threading.Thread(target=self._listen_thread)
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
def _get_connection(self):
|
|
if self._con is None:
|
|
try:
|
|
self._con = InfoBeamerQuery().node(
|
|
self._path + "/rpc/python"
|
|
).io(raw=True)
|
|
except InfoBeamerQueryException:
|
|
return None
|
|
return self._con
|
|
|
|
def _close_connection(self):
|
|
with self._lock:
|
|
if self._con:
|
|
try:
|
|
self._con.close()
|
|
except:
|
|
pass
|
|
self._con = None
|
|
|
|
def _send(self, line):
|
|
with self._lock:
|
|
con = self._get_connection()
|
|
if con is None:
|
|
return
|
|
try:
|
|
con.write(line + '\n')
|
|
con.flush()
|
|
return True
|
|
except:
|
|
self._close_connection()
|
|
return False
|
|
|
|
def _recv(self):
|
|
with self._lock:
|
|
con = self._get_connection()
|
|
try:
|
|
return con.readline()
|
|
except:
|
|
self._close_connection()
|
|
|
|
def _listen_thread(self):
|
|
while 1:
|
|
line = self._recv()
|
|
if not line:
|
|
self._close_connection()
|
|
time.sleep(0.5)
|
|
continue
|
|
try:
|
|
args = json.loads(line)
|
|
method = args.pop(0)
|
|
callback = self._callbacks.get(method)
|
|
if callback:
|
|
callback(*args)
|
|
else:
|
|
log("callback '%s' not found" % (method,))
|
|
except:
|
|
traceback.print_exc()
|
|
|
|
def register(self, name, fn):
|
|
self._callbacks[name] = fn
|
|
|
|
def call(self, fn):
|
|
self.register(fn.__name__, fn)
|
|
|
|
def __getattr__(self, method):
|
|
def call(*args):
|
|
args = list(args)
|
|
args.insert(0, method)
|
|
return self._send(json.dumps(
|
|
args,
|
|
ensure_ascii=False,
|
|
separators=(',',':'),
|
|
).encode('utf8'))
|
|
return call
|
|
|
|
class Cache(object):
|
|
def __init__(self, scope='default'):
|
|
self._touched = set()
|
|
self._prefix = 'cache-%s-' % scope
|
|
|
|
def key_to_fname(self, key):
|
|
return self._prefix + hashlib.md5(key).hexdigest()
|
|
|
|
def has(self, key, max_age=None):
|
|
try:
|
|
stat = os.stat(self.key_to_fname(key))
|
|
if max_age is not None:
|
|
now = time.time()
|
|
if now > stat.st_mtime + max_age:
|
|
return False
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def get(self, key, max_age=None):
|
|
try:
|
|
with open(self.file_ref(key)) as f:
|
|
if max_age is not None:
|
|
stat = os.fstat(f.fileno())
|
|
now = time.time()
|
|
if now > stat.st_mtime + max_age:
|
|
return None
|
|
return f.read()
|
|
except:
|
|
return None
|
|
|
|
def get_json(self, key, max_age=None):
|
|
data = self.get(key, max_age)
|
|
if data is None:
|
|
return None
|
|
return json.loads(data)
|
|
|
|
def set(self, key, value):
|
|
with open(self.file_ref(key), "wb") as f:
|
|
f.write(value)
|
|
|
|
def set_json(self, key, data):
|
|
self.set(key, json.dumps(data))
|
|
|
|
def file_ref(self, key):
|
|
fname = self.key_to_fname(key)
|
|
self._touched.add(fname)
|
|
return fname
|
|
|
|
def start(self):
|
|
self._touched = set()
|
|
|
|
def prune(self):
|
|
existing = set()
|
|
for fname in os.listdir("."):
|
|
if not fname.startswith(self._prefix):
|
|
continue
|
|
existing.add(fname)
|
|
prunable = existing - self._touched
|
|
for fname in prunable:
|
|
try:
|
|
log("pruning %s" % fname)
|
|
os.unlink(fname)
|
|
except:
|
|
pass
|
|
|
|
def clear(self):
|
|
self.start()
|
|
self.prune()
|
|
|
|
def call(self, max_age=None):
|
|
def deco(fn):
|
|
@wraps(fn)
|
|
def wrapper(*args, **kwargs):
|
|
key = marshal.dumps((fn.__name__, args, kwargs), 2)
|
|
cached = self.get(key, max_age)
|
|
if cached is not None:
|
|
return marshal.loads(cached)
|
|
val = fn(*args, **kwargs)
|
|
self.set(key, marshal.dumps(val, 2))
|
|
return val
|
|
return wrapper
|
|
return deco
|
|
|
|
def file_producer(self, max_age=None):
|
|
def deco(fn):
|
|
@wraps(fn)
|
|
def wrapper(*args, **kwargs):
|
|
key = marshal.dumps((fn.__name__, args, kwargs), 2)
|
|
if self.has(key, max_age):
|
|
return self.file_ref(key)
|
|
val = fn(*args, **kwargs)
|
|
if val is None:
|
|
return None
|
|
self.set(key, val)
|
|
return self.file_ref(key)
|
|
return wrapper
|
|
return deco
|
|
|
|
class Node(object):
|
|
def __init__(self, node):
|
|
self._node = node
|
|
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
def send_raw(self, raw):
|
|
log("sending %r" % (raw,))
|
|
self._sock.sendto(raw, ('127.0.0.1', 4444))
|
|
|
|
def send(self, data):
|
|
self.send_raw(self._node + data)
|
|
|
|
def send_json(self, path, data):
|
|
self.send('%s:%s' % (path, json.dumps(
|
|
data,
|
|
ensure_ascii=False,
|
|
separators=(',',':'),
|
|
).encode('utf8')))
|
|
|
|
@property
|
|
def is_top_level(self):
|
|
return self._node == "root"
|
|
|
|
@property
|
|
def path(self):
|
|
return self._node
|
|
|
|
def write_file(self, filename, content):
|
|
f = NamedTemporaryFile(prefix='.hosted-py-tmp', dir=os.getcwd())
|
|
try:
|
|
f.write(content)
|
|
except:
|
|
traceback.print_exc()
|
|
f.close()
|
|
raise
|
|
else:
|
|
f.delete = False
|
|
f.close()
|
|
os.rename(f.name, filename)
|
|
|
|
def write_json(self, filename, data):
|
|
self.write_file(filename, json.dumps(
|
|
data,
|
|
ensure_ascii=False,
|
|
separators=(',',':'),
|
|
).encode('utf8'))
|
|
|
|
class Sender(object):
|
|
def __init__(self, node, path):
|
|
self._node = node
|
|
self._path = path
|
|
|
|
def __call__(self, data):
|
|
if isinstance(data, (dict, list)):
|
|
raw = "%s:%s" % (self._path, json.dumps(
|
|
data,
|
|
ensure_ascii=False,
|
|
separators=(',',':'),
|
|
).encode('utf8'))
|
|
else:
|
|
raw = "%s:%s" % (self._path, data)
|
|
self._node.send_raw(raw)
|
|
|
|
def __getitem__(self, path):
|
|
return self.Sender(self, self._node + path)
|
|
|
|
def __call__(self, data):
|
|
return self.Sender(self, self._node)(data)
|
|
|
|
def connect(self, suffix=""):
|
|
ib = InfoBeamerQuery()
|
|
return ib.node(self.path + suffix).io(raw=True)
|
|
|
|
def rpc(self, **callbacks):
|
|
return RPC(self.path, callbacks)
|
|
|
|
def cache(self, scope='default'):
|
|
return Cache(scope)
|
|
|
|
def scratch_cached(self, filename, generator):
|
|
cached = os.path.join(os.environ['SCRATCH'], filename)
|
|
|
|
if not os.path.exists(cached):
|
|
f = NamedTemporaryFile(prefix='scratch-cached-tmp', dir=os.environ['SCRATCH'])
|
|
try:
|
|
generator(f)
|
|
except:
|
|
raise
|
|
else:
|
|
f.delete = False
|
|
f.close()
|
|
os.rename(f.name, cached)
|
|
|
|
if os.path.exists(filename):
|
|
try:
|
|
os.unlink(filename)
|
|
except:
|
|
pass
|
|
os.symlink(cached, filename)
|
|
|
|
class APIError(Exception):
|
|
pass
|
|
|
|
class APIProxy(object):
|
|
def __init__(self, apis, api_name):
|
|
self._apis = apis
|
|
self._api_name = api_name
|
|
|
|
@property
|
|
def url(self):
|
|
index = self._apis.get_api_index()
|
|
if not self._api_name in index:
|
|
raise APIError("api '%s' not available" % (self._api_name,))
|
|
return index[self._api_name]['url']
|
|
|
|
def unwrap(self, r):
|
|
r.raise_for_status()
|
|
if r.status_code == 304:
|
|
return None
|
|
if r.headers['content-type'] == 'application/json':
|
|
resp = r.json()
|
|
if not resp['ok']:
|
|
raise APIError(u"api call failed: %s" % (
|
|
resp.get('error', '<unknown error>'),
|
|
))
|
|
return resp.get(self._api_name)
|
|
else:
|
|
return r.content
|
|
|
|
def add_default_args(self, kwargs):
|
|
if not 'timeout' in kwargs:
|
|
kwargs['timeout'] = 10
|
|
return kwargs
|
|
|
|
def get(self, **kwargs):
|
|
try:
|
|
return self.unwrap(self._apis.session.get(
|
|
url = self.url,
|
|
**self.add_default_args(kwargs)
|
|
))
|
|
except APIError:
|
|
raise
|
|
except Exception as err:
|
|
raise APIError(err)
|
|
|
|
def post(self, **kwargs):
|
|
try:
|
|
return self.unwrap(self._apis.session.post(
|
|
url = self.url,
|
|
**self.add_default_args(kwargs)
|
|
))
|
|
except APIError:
|
|
raise
|
|
except Exception as err:
|
|
raise APIError(err)
|
|
|
|
def delete(self, **kwargs):
|
|
try:
|
|
return self.unwrap(self._apis.session.delete(
|
|
url = self.url,
|
|
**self.add_default_args(kwargs)
|
|
))
|
|
except APIError:
|
|
raise
|
|
except Exception as err:
|
|
raise APIError(err)
|
|
|
|
|
|
class OnDeviceAPIs(object):
|
|
def __init__(self, config):
|
|
self._config = config
|
|
self._index = None
|
|
self._valid_until = 0
|
|
self._lock = threading.Lock()
|
|
self._session = requests.Session()
|
|
self._session.headers.update({
|
|
'User-Agent': 'hosted.py version/%s' % (VERSION,)
|
|
})
|
|
|
|
def update_apis(self):
|
|
log("fetching api index")
|
|
r = self._session.get(
|
|
url = self._config.metadata['api'],
|
|
timeout = 5,
|
|
)
|
|
r.raise_for_status()
|
|
resp = r.json()
|
|
if not resp['ok']:
|
|
raise APIError("cannot retrieve api index")
|
|
self._index = resp['apis']
|
|
self._valid_until = resp['valid_until'] - 300
|
|
|
|
def get_api_index(self):
|
|
with self._lock:
|
|
now = time.time()
|
|
if now > self._valid_until:
|
|
self.update_apis()
|
|
return self._index
|
|
|
|
@property
|
|
def session(self):
|
|
return self._session
|
|
|
|
def list(self):
|
|
try:
|
|
index = self.get_api_index()
|
|
return sorted(index.keys())
|
|
except Exception as err:
|
|
raise APIError(err)
|
|
|
|
def __getitem__(self, api_name):
|
|
return APIProxy(self, api_name)
|
|
|
|
def __getattr__(self, api_name):
|
|
return APIProxy(self, api_name)
|
|
|
|
class HostedAPI(object):
|
|
def __init__(self, api, on_device_token):
|
|
self._api = api
|
|
self._on_device_token = on_device_token
|
|
self._lock = threading.Lock()
|
|
self._next_refresh = 0
|
|
self._api_key = None
|
|
self._uses = 0
|
|
self._expire = 0
|
|
self._base_url = None
|
|
self._session = requests.Session()
|
|
self._session.headers.update({
|
|
'User-Agent': 'hosted.py version/%s - on-device' % (VERSION,)
|
|
})
|
|
|
|
def use_api_key(self):
|
|
with self._lock:
|
|
now = time.time()
|
|
self._uses -= 1
|
|
if self._uses <= 0:
|
|
log('hosted API adhoc key used up')
|
|
self._api_key = None
|
|
elif now > self._expire:
|
|
log('hosted API adhoc key expired')
|
|
self._api_key = None
|
|
else:
|
|
log('hosted API adhoc key usage: %d uses, %ds left' %(
|
|
self._uses, self._expire - now
|
|
))
|
|
if self._api_key is None:
|
|
if time.time() < self._next_refresh:
|
|
return None
|
|
log('refreshing hosted API adhoc key')
|
|
self._next_refresh = time.time() + 15
|
|
try:
|
|
r = self._api['api_key'].get(
|
|
params = dict(
|
|
on_device_token = self._on_device_token
|
|
),
|
|
timeout = 5,
|
|
)
|
|
except:
|
|
return None
|
|
self._api_key = r['api_key']
|
|
self._uses = r['uses']
|
|
self._expire = now + r['expire'] - 1
|
|
self._base_url = r['base_url']
|
|
return self._api_key
|
|
|
|
def add_default_args(self, kwargs):
|
|
if not 'timeout' in kwargs:
|
|
kwargs['timeout'] = 10
|
|
return kwargs
|
|
|
|
def ensure_api_key(self, kwargs):
|
|
api_key = self.use_api_key()
|
|
if api_key is None:
|
|
raise APIError('cannot retrieve API key')
|
|
kwargs['auth'] = ('', api_key)
|
|
|
|
def get(self, endpoint, **kwargs):
|
|
try:
|
|
self.ensure_api_key(kwargs)
|
|
r = self._session.get(
|
|
url = self._base_url + endpoint,
|
|
**self.add_default_args(kwargs)
|
|
)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
except APIError:
|
|
raise
|
|
except Exception as err:
|
|
raise APIError(err)
|
|
|
|
def post(self, endpoint, **kwargs):
|
|
try:
|
|
self.ensure_api_key(kwargs)
|
|
r = self._session.post(
|
|
url = self._base_url + endpoint,
|
|
**self.add_default_args(kwargs)
|
|
)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
except APIError:
|
|
raise
|
|
except Exception as err:
|
|
raise APIError(err)
|
|
|
|
def delete(self, endpoint, **kwargs):
|
|
try:
|
|
self.ensure_api_key(kwargs)
|
|
r = self._session.delete(
|
|
url = self._base_url + endpoint,
|
|
**self.add_default_args(kwargs)
|
|
)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
except APIError:
|
|
raise
|
|
except Exception as err:
|
|
raise APIError(err)
|
|
|
|
class DeviceKV(object):
|
|
def __init__(self, api):
|
|
self._api = api
|
|
self._cache = {}
|
|
self._cache_complete = False
|
|
self._use_cache = True
|
|
|
|
def cache_enabled(self, enabled):
|
|
self._use_cache = enabled
|
|
self._cache = {}
|
|
self._cache_complete = False
|
|
|
|
def __setitem__(self, key, value):
|
|
if self._use_cache:
|
|
if key in self._cache and self._cache[key] == value:
|
|
return
|
|
self._api['kv'].post(
|
|
data = {
|
|
key: value
|
|
}
|
|
)
|
|
if self._use_cache:
|
|
self._cache[key] = value
|
|
|
|
def __getitem__(self, key):
|
|
if self._use_cache:
|
|
if key in self._cache:
|
|
return self._cache[key]
|
|
result = self._api['kv'].get(
|
|
params = dict(
|
|
keys = key,
|
|
),
|
|
timeout = 5,
|
|
)['v']
|
|
if key not in result:
|
|
raise KeyError(key)
|
|
value = result[key]
|
|
if self._use_cache:
|
|
self._cache[key] = value
|
|
return value
|
|
|
|
# http api cannot reliably determine if a key has
|
|
# been deleted, so __delitem__ always succeeds and
|
|
# does not throw KeyError for missing keys.
|
|
def __delitem__(self, key):
|
|
if self._use_cache and self._cache_complete:
|
|
if key not in self._cache:
|
|
return
|
|
self._api['kv'].delete(
|
|
params = dict(
|
|
keys = key,
|
|
),
|
|
timeout = 5,
|
|
)
|
|
if self._use_cache and key in self._cache:
|
|
if key in self._cache:
|
|
del self._cache[key]
|
|
|
|
def update(self, dct):
|
|
if self._use_cache:
|
|
for key, value in dct.items():
|
|
if key in self._cache and self._cache[key] == value:
|
|
dct.pop(key)
|
|
if not dct:
|
|
return
|
|
self._api['kv'].post(
|
|
data = dct
|
|
)
|
|
if self._use_cache:
|
|
for key, value in dct.iteritems():
|
|
self._cache[key] = value
|
|
|
|
def get(self, key, default=None):
|
|
try:
|
|
return self[key]
|
|
except KeyError:
|
|
return default
|
|
|
|
def items(self):
|
|
if self._use_cache and self._cache_complete:
|
|
return self._cache.items()
|
|
result = self._api['kv'].get(
|
|
timeout = 5,
|
|
)['v']
|
|
if self._use_cache:
|
|
for key, value in result.iteritems():
|
|
self._cache[key] = value
|
|
self._cache_complete = True
|
|
return result.items()
|
|
|
|
iteritems = items
|
|
|
|
def clear(self):
|
|
self._api['kv'].delete()
|
|
if self._use_cache:
|
|
self._cache = {}
|
|
self._cache_complete = False
|
|
|
|
class GPIO(object):
|
|
def __init__(self):
|
|
self._pin_fd = {}
|
|
self._state = {}
|
|
self._fd_2_pin = {}
|
|
self._poll = select.poll()
|
|
self._lock = threading.Lock()
|
|
|
|
def setup_pin(self, pin, direction="in", invert=False):
|
|
if not os.path.exists("/sys/class/gpio/gpio%d" % pin):
|
|
with open("/sys/class/gpio/export", "wb") as f:
|
|
f.write(str(pin))
|
|
# mdev is giving the newly create GPIO directory correct permissions.
|
|
for i in range(10):
|
|
try:
|
|
with open("/sys/class/gpio/gpio%d/active_low" % pin, "wb") as f:
|
|
f.write("1" if invert else "0")
|
|
break
|
|
except IOError as err:
|
|
if err.errno != errno.EACCES:
|
|
raise
|
|
time.sleep(0.1)
|
|
log("waiting for GPIO permissions")
|
|
else:
|
|
raise IOError(errno.EACCES, "Cannot access GPIO")
|
|
with open("/sys/class/gpio/gpio%d/direction" % pin, "wb") as f:
|
|
f.write(direction)
|
|
|
|
def set_pin_value(self, pin, high):
|
|
with open("/sys/class/gpio/gpio%d/value" % pin, "wb") as f:
|
|
f.write("1" if high else "0")
|
|
|
|
def monitor(self, pin, invert=False):
|
|
if pin in self._pin_fd:
|
|
return
|
|
self.setup_pin(pin, direction="in", invert=invert)
|
|
with open("/sys/class/gpio/gpio%d/edge" % pin, "wb") as f:
|
|
f.write("both")
|
|
fd = os.open("/sys/class/gpio/gpio%d/value" % pin, os.O_RDONLY)
|
|
self._state[pin] = bool(int(os.read(fd, 5)))
|
|
self._fd_2_pin[fd] = pin
|
|
self._pin_fd[pin] = fd
|
|
self._poll.register(fd, select.POLLPRI | select.POLLERR)
|
|
|
|
def poll(self, timeout=1000):
|
|
changes = []
|
|
for fd, evt in self._poll.poll(timeout):
|
|
os.lseek(fd, 0, 0)
|
|
state = bool(int(os.read(fd, 5)))
|
|
pin = self._fd_2_pin[fd]
|
|
with self._lock:
|
|
prev_state, self._state[pin] = self._state[pin], state
|
|
if state != prev_state:
|
|
changes.append((pin, state))
|
|
return changes
|
|
|
|
def poll_forever(self):
|
|
while 1:
|
|
for event in self.poll():
|
|
yield event
|
|
|
|
def on(self, pin):
|
|
with self._lock:
|
|
return self._state.get(pin, False)
|
|
|
|
class SyncerAPI(object):
|
|
def __init__(self):
|
|
self._session = requests.Session()
|
|
|
|
def unwrap(self, r):
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
def get(self, path, params={}):
|
|
return self.unwrap(self._session.get(
|
|
'http://127.0.0.1:81%s' % path,
|
|
params=params, timeout=10
|
|
))
|
|
|
|
def post(self, path, data={}):
|
|
return self.unwrap(self._session.post(
|
|
'http://127.0.0.1:81%s' % path,
|
|
data=data, timeout=10
|
|
))
|
|
|
|
class ProofOfPlay(object):
|
|
def __init__(self, api, dirname):
|
|
self._api = api
|
|
self._prefix = os.path.join(os.environ['SCRATCH'], dirname)
|
|
try:
|
|
os.makedirs(self._prefix)
|
|
except:
|
|
pass
|
|
|
|
pop_info = self._api.pop.get()
|
|
|
|
self._max_delay = pop_info['max_delay']
|
|
self._max_lines = pop_info['max_lines']
|
|
self._submission_min_delay = pop_info['submission']['min_delay']
|
|
self._submission_error_delay = pop_info['submission']['error_delay']
|
|
|
|
self._q = Queue.Queue()
|
|
self._log = None
|
|
|
|
thread = threading.Thread(target=self._submit_thread)
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
thread = threading.Thread(target=self._writer_thread)
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
def _submit(self, fname, queue_size):
|
|
with open(fname, 'rb') as f:
|
|
return self._api.pop.post(
|
|
timeout = 10,
|
|
data = {
|
|
'queue_size': queue_size,
|
|
},
|
|
files={
|
|
'pop-v1': f,
|
|
}
|
|
)
|
|
|
|
def _submit_thread(self):
|
|
time.sleep(3)
|
|
while 1:
|
|
delay = self._submission_min_delay
|
|
try:
|
|
log('[pop][submit] gathering files')
|
|
files = [
|
|
fname for fname
|
|
in os.listdir(self._prefix)
|
|
if fname.startswith('submit-')
|
|
]
|
|
log('[pop][submit] %d files' % len(files))
|
|
for fname in files:
|
|
fullname = os.path.join(self._prefix, fname)
|
|
if os.stat(fullname).st_size == 0:
|
|
os.unlink(fullname)
|
|
continue
|
|
try:
|
|
log('[pop][submit] submitting %s' % fullname)
|
|
status = self._submit(fullname, len(files))
|
|
if status['disabled']:
|
|
log('[pop][submit] WARNING: Proof of Play disabled for this device. Submission discarded')
|
|
else:
|
|
log('[pop][submit] success')
|
|
except APIError as err:
|
|
log('[pop][submit] failure to submit log %s: %s' % (
|
|
fullname, err
|
|
))
|
|
delay = self._submission_error_delay
|
|
break
|
|
os.unlink(fullname)
|
|
break
|
|
if not files:
|
|
delay = 10
|
|
except Exception as err:
|
|
log('[pop][submit] error: %s' % err)
|
|
log('[pop][submit] sleeping %ds' % delay)
|
|
time.sleep(delay)
|
|
|
|
def reopen_log(self):
|
|
log_name = os.path.join(self._prefix, 'current.log')
|
|
if self._log is not None:
|
|
self._log.close()
|
|
self._log = None
|
|
if os.path.exists(log_name):
|
|
os.rename(log_name, os.path.join(
|
|
self._prefix, 'submit-%s.log' % os.urandom(16).encode('hex')
|
|
))
|
|
self._log = open(log_name, 'wb')
|
|
return self._log
|
|
|
|
def _writer_thread(self):
|
|
submit, log_file, lines = monotonic_time() + self._max_delay, self.reopen_log(), 0
|
|
while 1:
|
|
reopen = False
|
|
max_wait = max(0.1, submit - monotonic_time())
|
|
log('[pop] got %d lines. waiting %ds for more log lines' % (lines, max_wait))
|
|
try:
|
|
line = self._q.get(block=True, timeout=max_wait)
|
|
log_file.write(line + '\n')
|
|
log_file.flush()
|
|
os.fsync(log_file.fileno())
|
|
lines += 1
|
|
log('[pop] line added: %r' % line)
|
|
except Queue.Empty:
|
|
if lines == 0:
|
|
submit += self._max_delay # extend deadline
|
|
else:
|
|
reopen = True
|
|
except Exception as err:
|
|
log("[pop] error writing pop log line")
|
|
if lines >= self._max_lines:
|
|
reopen = True
|
|
if reopen:
|
|
log('[pop] closing log of %d lines' % lines)
|
|
submit, log_file, lines = monotonic_time() + self._max_delay, self.reopen_log(), 0
|
|
|
|
def log(self, play_start, duration, asset_id, asset_filename):
|
|
uuid = "%08x%s" % (
|
|
time.time(), os.urandom(12).encode('hex')
|
|
)
|
|
self._q.put(json.dumps([
|
|
uuid,
|
|
play_start,
|
|
duration,
|
|
0 if asset_id is None else asset_id,
|
|
asset_filename,
|
|
],
|
|
ensure_ascii = False,
|
|
separators = (',',':'),
|
|
).encode('utf8'))
|
|
|
|
class Device(object):
|
|
def __init__(self, kv, api):
|
|
self._socket = None
|
|
self._gpio = GPIO()
|
|
self._kv = kv
|
|
self._api = api
|
|
|
|
@property
|
|
def kv(self):
|
|
return self._kv
|
|
|
|
@property
|
|
def gpio(self):
|
|
return self._gpio
|
|
|
|
@property
|
|
def serial(self):
|
|
return os.environ['SERIAL']
|
|
|
|
@property
|
|
def screen_resolution(self):
|
|
with open("/sys/class/graphics/fb0/virtual_size", "rb") as f:
|
|
return [int(val) for val in f.read().strip().split(',')]
|
|
|
|
@property
|
|
def screen_w(self):
|
|
return self.screen_resolution[0]
|
|
|
|
@property
|
|
def screen_h(self):
|
|
return self.screen_resolution[1]
|
|
|
|
@property
|
|
def syncer_api(self):
|
|
return SyncerAPI()
|
|
|
|
def ensure_connected(self):
|
|
if self._socket:
|
|
return True
|
|
try:
|
|
log("establishing upstream connection")
|
|
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
self._socket.connect(os.getenv('SYNCER_SOCKET', "/tmp/syncer"))
|
|
return True
|
|
except Exception as err:
|
|
log("cannot connect to upstream socket: %s" % (err,))
|
|
return False
|
|
|
|
def send_raw(self, raw):
|
|
try:
|
|
if self.ensure_connected():
|
|
self._socket.send(raw + '\n')
|
|
except Exception as err:
|
|
log("cannot send to upstream: %s" % (err,))
|
|
if self._socket:
|
|
self._socket.close()
|
|
self._socket = None
|
|
|
|
def send_upstream(self, **data):
|
|
self.send_raw(json.dumps(data))
|
|
|
|
def turn_screen_off(self):
|
|
self.send_raw("tv off")
|
|
|
|
def turn_screen_on(self):
|
|
self.send_raw("tv on")
|
|
|
|
def screen(self, on=True):
|
|
if on:
|
|
self.turn_screen_on()
|
|
else:
|
|
self.turn_screen_off()
|
|
|
|
def reboot(self):
|
|
self.send_raw("system reboot")
|
|
|
|
def halt_until_powercycled(self):
|
|
self.send_raw("system halt")
|
|
|
|
def restart_infobeamer(self):
|
|
self.send_raw("infobeamer restart")
|
|
|
|
def verify_cache(self):
|
|
self.send_raw("syncer verify_cache")
|
|
|
|
def pop(self, dirname='pop'):
|
|
return ProofOfPlay(self._api, dirname)
|
|
|
|
def hosted_api(self, on_device_token):
|
|
return HostedAPI(self._api, on_device_token)
|
|
|
|
if __name__ == "__main__":
|
|
print("nothing to do here")
|
|
sys.exit(1)
|
|
else:
|
|
log("starting version %s" % (VERSION,))
|
|
|
|
node = NODE = Node(os.environ['NODE'])
|
|
config = CONFIG = Configuration()
|
|
api = API = OnDeviceAPIs(CONFIG)
|
|
device = DEVICE = Device(
|
|
kv = DeviceKV(api),
|
|
api = api,
|
|
)
|
|
|
|
setup_inotify(CONFIG)
|
|
log("ready to go!")
|