#!/usr/bin/python3
# coding: utf-8

# aurbot - Archlinux User Repository Builder Bot
# Copyright © 2014 Sébastien Luttringer
#
# Started, October 30th 2011
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import sys
import tarfile
from argparse import ArgumentParser
from configparser import ConfigParser
from json import load as jload, dump as jdump, loads as jloads
from logging import StreamHandler, getLogger, Formatter, DEBUG, INFO
from logging import debug, warning, info, error
from os import chdir, environ, getcwd
from os.path import join
from subprocess import check_call
from tarfile import open as tar
from tempfile import TemporaryDirectory
from time import sleep
from urllib.parse import quote
from urllib.request import urlopen, Request

from xdg.BaseDirectory import save_config_path, save_cache_path
from systemd.daemon import notify

AUR_URL = 'https://aur.archlinux.org'
USER_AGENT = "aurbot"
XDG_DIRECTORY = "aurbot"

# process exit codes
ERR_USAGE = 1
ERR_FATAL = 2
ERR_ABORT = 3
ERR_UNKNOWN = 4


class ABFormatter(Formatter):
    '''
    Custom logging formatter.

    Records are prefixed with their level name, except INFO records emitted
    while the logger is not in debug mode: those are printed bare.
    '''

    def __init__(self, fmt="[%(levelname)s] %(message)s"):
        # %(message)s (not %(msg)s) so that lazy logging arguments are
        # interpolated; output is unchanged for pre-formatted messages.
        Formatter.__init__(self, fmt)

    def format(self, record):
        '''Format a record, dropping the level prefix for plain INFO output'''
        format_orig = self._style._fmt
        logger_level = getLogger(record.name).getEffectiveLevel()
        if record.levelno == INFO and logger_level != DEBUG:
            self._style._fmt = "%(message)s"
        try:
            return Formatter.format(self, record)
        finally:
            # always restore the regular format, even if formatting failed
            self._style._fmt = format_orig


class AURPackage(dict):
    '''
    Abstract AUR package action.

    Package information is fetched from the AUR RPC interface when the object
    is created. Every field of the reply is then readable as a lowercase
    attribute (e.g. pkg.version, pkg.maintainer, pkg.lastmodified).
    '''

    def __init__(self, name, timeout=None):
        '''
        Query the AUR for package name.

        timeout is the network timeout in seconds (None means no timeout).
        Raise Exception when the backend version is unknown or when the AUR
        does not return information about the package.
        '''
        super().__init__()
        self.name = name
        self._timeout = timeout
        debug("getting %s aur infos", self.name)
        # quote the name: characters such as '+' are not literal in a query
        url = "%s/rpc.php?type=info&arg=%s" % (AUR_URL, quote(name, safe=""))
        url_req = Request(url, headers={"User-Agent": USER_AGENT})
        debug("Requesting url: %s", url)
        debug("Timeout is %s", timeout)
        with urlopen(url_req, timeout=timeout) as url_fd:
            d = jloads(url_fd.read().decode("utf-8"))
        if d.get("version") != 1:
            raise Exception("Unknown AUR Backend version")
        results = d.get("results")
        # results is only a dict when the package was found; an unknown
        # package or an error reply gives a list or a message instead
        if not isinstance(results, dict):
            raise Exception("No AUR information for package %s" % name)
        self._info = results

    def __getattr__(self, name):
        '''Map unknown attributes to AUR fields (case insensitive)'''
        # Private names are never AUR fields. Refusing them here also avoids
        # an infinite recursion when _info has not been set yet.
        if not name.startswith("_"):
            for k, v in self._info.items():
                if name == k.lower():
                    return v
        raise AttributeError("AUR package has no field %r" % name)

    def __repr__(self):
        return "%s %s" % (self.name, self.version)

    def extract(self, path):
        '''
        Extract aur source tarball inside a directory path
        '''
        url = '%s/%s' % (AUR_URL, self.urlpath.lstrip("/"))
        url_req = Request(url, headers={"User-Agent": USER_AGENT})
        with urlopen(url_req, timeout=self._timeout) as fo:
            with tar(mode='r|*', fileobj=fo) as tarball:
                # The tarball comes from the network: when the interpreter
                # supports extraction filters, refuse members escaping path.
                if hasattr(tarfile, "data_filter"):
                    tarball.extractall(path, filter="data")
                else:
                    tarball.extractall(path)


class JsonDictFile(dict):
    '''
    Json serialized dict.

    The dict is loaded from a json file at creation and written back to the
    same file by save(). The file stays open for the object lifetime.
    '''

    def __init__(self, path):
        '''
        Load json file.

        Raise ValueError when path is None, OSError when the file cannot be
        opened, and re-raise any error met while parsing its content.
        '''
        if path is None:
            raise ValueError("json file path is required")
        super().__init__()
        try:
            # a+ creates the file when missing and never truncates it
            self._fileobj = open(path, "a+", encoding="utf-8")
        except (IOError, OSError) as exp:
            error("Unable to access to json file %s: %s", path, exp)
            raise
        # seeking to the end (whence=2) returns 0 only for an empty file
        if self._fileobj.seek(0, 2) == 0:
            debug("Json file is empty")
            return
        debug("Loading json file %s", path)
        try:
            self._fileobj.seek(0, 0)
            self.update(jload(self._fileobj))
        except Exception as exp:
            error("Unable to load json file %s: %s", path, exp)
            raise

    def save(self):
        '''
        Save current dict into a json file.

        An empty dict is never written. Errors are logged and re-raised.
        '''
        if len(self) == 0:
            debug("Not saved. Dict is empty")
            return
        if self._fileobj is None:
            return
        debug("Saving dict into json file")
        try:
            self._fileobj.truncate(0)
            self._fileobj.seek(0, 0)
            jdump(self, self._fileobj)
            self._fileobj.flush()
        except Exception as exp:
            error("Unable to save json file: %s", exp)
            raise


def build(localpkg, aurpkg):
    '''
    Build an AUR package.

    localpkg is the package configuration (a mapping with a build_cmd key
    and an optional commit_cmd key) and aurpkg the matching AURPackage.
    The tarball is extracted into a temporary directory, removed on return,
    and the commands are run by a shell from the extracted package directory.
    Raise CalledProcessError when a command exits with a non zero status.
    '''
    info("Getting last AUR version")
    with TemporaryDirectory() as build_dir:
        debug("Build dir is %s", build_dir)
        debug("Extracting aur tarball")
        aurpkg.extract(build_dir)
        # run commands from the package directory without changing the
        # working directory of the bot itself
        pkg_dir = join(build_dir, aurpkg.name)
        info("Starting build command")
        check_call(localpkg["build_cmd"], shell=True, close_fds=True,
                   cwd=pkg_dir)
        commit_cmd = localpkg.get("commit_cmd", None)
        if commit_cmd is not None:
            info("Starting commit command")
            check_call(commit_cmd, shell=True, close_fds=True, cwd=pkg_dir)


def _process_package(name, config, cache):
    '''Check one package against the AUR and build it when it is newer'''
    info("[%s]", name)
    if config.get("build_cmd", None) is None:
        error("Build_cmd is missing in config file")
        return
    # A network failure or an unexpected reply must only skip this package,
    # not stop the whole bot.
    try:
        pkg = AURPackage(name)
        aur_maintainer = pkg.maintainer
        aur_lastmodified = pkg.lastmodified
        aur_version = pkg.version
    except Exception as exp:
        error("Unable to get AUR infos for package %s: %s", name, exp)
        return
    # For security, if the maintainer has changed we pass
    maintainer = config.get("maintainer", None)
    if maintainer != aur_maintainer:
        error("Invalid maintainer for package %s", name)
        debug("registered maintainer: %s", maintainer)
        debug("AUR maintainer: %s", aur_maintainer)
        return
    # checks update
    debug("Cache lastmodified: %s", cache.get(name, 0))
    debug("AUR lastmodified: %s", aur_lastmodified)
    if aur_lastmodified <= cache.get(name, 0):
        info("No new version available")
        return
    # package needs to be built and commited
    try:
        info("New version available: %s", aur_version)
        build(config, pkg)
    except Exception as exp:
        error(exp)
        debug("Build failure details", exc_info=True)
        return
    # we save last successful build in cache
    cache[name] = aur_lastmodified
    cache.save()


def event_loop(packages, cache, timeout):
    '''
    program roundabout

    Check every package section of packages (the DEFAULT section is
    skipped), then sleep timeout seconds, forever. cache maps package names
    to the lastmodified value of their last successful build.
    '''
    while True:
        for name, config in packages.items():
            if name == "DEFAULT":
                continue
            _process_package(name, config, cache)
        debug("waiting for %ds", timeout)
        sleep(timeout)


def parse_argv():
    '''
    Parse command line arguments.

    Return the argparse namespace with packages_path and cache_path always
    set (XDG locations are used when not given). Debug logging is enabled
    as a side effect when --debug is given.
    '''
    # load parser
    parser = ArgumentParser()
    parser.add_argument("-p", "--packages-path",
                        help="packages config file path")
    parser.add_argument("-c", "--cache-path", help="cache file path")
    parser.add_argument("-s", "--sleep", type=int, default=86400,
                        help="sleep interval between checks")
    parser.add_argument("-d", "--debug", action="store_true",
                        help="debug mode")
    # parse it!
    args = parser.parse_args()
    # set global debug mode
    if args.debug:
        getLogger().setLevel(DEBUG)
    # set default paths
    if args.packages_path is None:
        args.packages_path = join(save_config_path(XDG_DIRECTORY),
                                  "packages.conf")
    if args.cache_path is None:
        args.cache_path = join(save_cache_path(XDG_DIRECTORY),
                               "packages.cache")
    return args


def main():
    '''
    Program entry point.

    Exit with ERR_ABORT on keyboard interrupt and ERR_UNKNOWN on unexpected
    error (in debug mode the error is re-raised to show its traceback).
    '''
    try:
        # set logger config
        hdlr = StreamHandler()
        hdlr.setFormatter(ABFormatter())
        getLogger().addHandler(hdlr)
        getLogger().setLevel(DEBUG if "AURBOT_DEBUG" in environ else INFO)
        # parse command line
        args = parse_argv()
        # parse package list
        packages = ConfigParser()
        # read() silently skips a missing file: tell the user about it
        if not packages.read(args.packages_path):
            warning("Unable to read packages config file %s",
                    args.packages_path)
        # load cache
        cache = JsonDictFile(args.cache_path)
        # tell to systemd we are ready
        notify("READY=1\n")
        # while 42
        event_loop(packages, cache, args.sleep)
    except KeyboardInterrupt:
        sys.exit(ERR_ABORT)
    except Exception as exp:
        error(exp)
        if getLogger().getEffectiveLevel() != DEBUG:
            error("Unknown error. Please report it with --debug.")
        else:
            raise
        sys.exit(ERR_UNKNOWN)


if __name__ == '__main__':
    main()