#!/usr/bin/python3 # coding: utf-8 ''' aurbot - Archlinux User Repository Builder Bot Copyright © 2020 Sébastien Luttringer Started, October 30th 2011 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. ''' # standard imports from argparse import ArgumentParser from configparser import ConfigParser from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import formatdate from json import loads as jloads from logging import debug, warning, info, error, critical from logging import StreamHandler, getLogger, Formatter, DEBUG, INFO from os import chdir, environ, getcwd, mkdir, makedirs, geteuid, stat from os.path import exists, join, abspath from signal import signal, SIGHUP from smtplib import SMTP, SMTP_SSL from subprocess import check_call, DEVNULL from tarfile import open as tar from tempfile import TemporaryDirectory from time import sleep, time, strftime, localtime from urllib.request import urlopen, Request # extra import from systemd.daemon import notify class Error(BaseException): '''Error handling.''' ERR_USAGE = 1 ERR_ABORT = 2 ERR_CRITICAL = 3 ERR_UNKNOWN = 4 class ABFormatter(Formatter): '''Customer logging formater.''' def __init__(self, fmt="[%(levelname)s] %(msg)s"): super().__init__(fmt) def format(self, record): format_orig = self._style._fmt if record.levelno == INFO and getLogger(record.name).getEffectiveLevel() != DEBUG: self._style._fmt = "%(msg)s" result = Formatter.format(self, record) self._style._fmt = format_orig return result class AURPackage(dict): '''Abstract AUR package data.''' AUR_URL = 'https://aur.archlinux.org' USER_AGENT = "aurbot" def __init__(self, name, timeout=None): super().__init__() self.name = name url = "%s/rpc.php?v=5&type=info&arg[]=%s" % (self.AUR_URL, name) url_req = Request(url, headers={"User-Agent": self.USER_AGENT}) debug(f"{name} Requesting url: {url} (timeout: {timeout}s)") url_fd = urlopen(url_req, timeout=timeout) d = jloads(url_fd.read().decode("utf-8")) if d["version"] != 5: raise Exception(f"Unknown AUR Backend version: {d['version']}") try: if d["results"][0]["PackageBase"] != name: raise Exception(f"Not a base package") self._info = d["results"][0] except Exception as err: raise Exception(f"No such package: {name}") from err def __getattr__(self, name): for k, v in self._info.items(): if name == k.lower(): return v raise AttributeError() def __repr__(self): return "%s %s" % (self.name, self.version) def extract(self, path): '''Extract aur source tarball inside a directory path.''' fo = urlopen(f"{self.AUR_URL}/{self.urlpath}") tarball = tar(mode='r|*', fileobj=fo) tarball.extractall(path) fo.close() class LocalPackage(dict): '''Local package data.''' DEFAULT_DATA_DIR = "/var/lib/aurbot" def __init__(self, name): super().__init__() self.name = name self.path = join(environ.get("AURBOT_DATADIR", self.DEFAULT_DATA_DIR), name) debug(f"{name}: local path is: {self.path}") makedirs(self.path, exist_ok=True) @property def logdir(self): '''Return log files directory path.''' logdir = join(self.path, "log") if not exists(logdir): mkdir(logdir) return logdir def getlastX(self, X, cast=int, default=0): '''Return saved value of X casted in cast.''' filepath = join(self.path, X) if not exists(filepath): return default try: return cast(open(filepath, "r").read()) except Exception as exp: debug(f"Failed to load {X}: {exp}") return default def setlastX(self, X, value, cast=int): '''Cast the value X in cast and save it to file named X.''' open(join(self.path, X), "w").write("%s" % cast(value)) # Store the moment where the build was done locally. lastbuild = property( lambda x: LocalPackage.getlastX(x, "lastbuild"), lambda x, y: LocalPackage.setlastX(x, "lastbuild", y) ) # Store the aur lastmodified value of the last sucessful build. lastsuccess = property( lambda x: LocalPackage.getlastX(x, "lastsuccess"), lambda x, y: LocalPackage.setlastX(x, "lastsuccess", y) ) # Store the aur lastmodified value of the last failed build. lastfailed = property( lambda x: LocalPackage.getlastX(x, "lastfailed"), lambda x, y: LocalPackage.setlastX(x, "lastfailed", y) ) # Store the last time we check the aur. lastchecked = property( lambda x: LocalPackage.getlastX(x, "lastchecked"), lambda x, y: LocalPackage.setlastX(x, "lastchecked", y) ) # Store the last maintainer for the package. lastmaintainer = property( lambda x: LocalPackage.getlastX(x, "lastmaintainer", str, ""), lambda x, y: LocalPackage.setlastX(x, "lastmaintainer", y, str) ) class Package(): '''Package Meta Abstraction.''' DEFAULT_CHECK_INTERVAL = 86400 def __init__(self, pkgname, pkgconfig): self.name = pkgname self._config = pkgconfig self._local = LocalPackage(pkgname) # Print sugars. self.debug = lambda msg: debug(f"{self.name}: {msg}") self.info = lambda msg: info(f"{self.name}: {msg}") self.error = lambda msg: error(f"{self.name}: {msg}") self.warn = lambda msg: warning(f"{self.name}: {msg}") def send_message(self, msg): '''Send message to an smtp server.''' self.info(f"Sending message to {self._config['notify']}") # Load smtp info. try: smtp_host = self._config["smtp_host"] smtp_port = self._config["smtp_port"] smtp_login = self._config.get("smtp_login", "") smtp_pass = self._config.get("smtp_pass", "") smtp_security = self._config.get("smtp_security", "") except: self.error("Unable to load smtp config") return # Display message content when debug. self.debug(msg) # Prepare connection. con = SMTP_SSL() if smtp_security == "ssl" else SMTP() if getLogger().isEnabledFor(DEBUG): con.set_debuglevel(True) con._host = smtp_host try: con.connect(smtp_host, smtp_port) if smtp_security == "starttls": con.starttls() if smtp_login != "" and smtp_pass != "": con.login(smtp_login, smtp_pass) # Send it. con.send_message(msg) # Gentleman quit. con.quit() except Exception as exp: self.error(f"Unable to send message via smtp: {exp}") def send_build_report(self, status, logfile): '''Send build notification.''' self.info("Send build report") # Generate message. msg = MIMEMultipart() msg["Subject"] = f"Build {status} for {self.name} {self._aur.version}" msg["From"] = self._config.get("from", "Aurbot") msg["To"] = self._config["notify"] msg["Date"] = formatdate(localtime=True) # Attach logfile. with open(logfile, "r") as fd: mt = MIMEText(fd.read()) msg.attach(mt) self.send_message(msg) def send_maintainer_report(self): '''Send email to notify of invalid maintainership.''' self.info("Send invalid maintainer report") # Generate message. msg = MIMEText( "Maintainer for package %s is invalid.\r\n" % self.name + "He has probably changed. Check if the new one is trustworthy.\r\n" "\r\n" "Configured maintainer is %s.\r\n" % self._config.get("maintainer") + "AUR maintainer is %s.\r\n" % self._aur.maintainer + "\r\n" "Your aurbot configuration need to be updated!\r\n") msg["Subject"] = "Invalid maintainer for %s" % self.name msg["From"] = self._config.get("from", "Aurbot") msg["To"] = self._config["notify"] msg["Date"] = formatdate(localtime=True) self.send_message(msg) def _run_command(self, name, cmd, log): '''Fancy run of command cmd and log output in file object log.''' self.info(f"Starting {name} command: {cmd}") capname = name.capitalize() log.write(f"{capname} command: {cmd}\n") log.flush() start_time = time() try: check_call(cmd, stdin=DEVNULL, stdout=log, stderr=log, shell=True, close_fds=True) except Exception as exp: raise Exception(f"{capname} failure: {exp}") from exp end_time = time() self.info(f"{capname} duration: {end_time - start_time:.2f}s") log.write(f"{capname} duration: {end_time - start_time:.2f}\n") def _build(self): '''Build a package.''' if "build_cmd" not in self._config: self.error("No build command.") return # Register the build start time. self._local.lastbuild = time() # Choose a log file name. logfn = join(self._local.logdir, strftime("build-%Y-%m-%d-%H-%M-%S.log", localtime(time()))) self.debug(f"Build log file path: {logfn}") # Make a temporary build directory. build_dir = TemporaryDirectory() # Extract the tarball inside it. self.debug("Extracting aur tarball in {build_dir.name}") self._aur.extract(build_dir.name) with open(logfn, "w") as logfo: cwd = getcwd() try: chdir(f"{build_dir.name}/{self.name}") # Execute build command. self._run_command("build", self._config['build_cmd'], logfo) # Execute commit command. if "commit_cmd" in self._config: self._run_command("commit", self._config['commit_cmd'], logfo) chdir(cwd) # we have to register after chdir in the original directory self._local.lastsuccess = self._aur.lastmodified status = "successful" except Exception as exp: self.error(f"Update failure: {exp}") chdir(cwd) # we have to register after chdir in the original directory self._local.lastsuccess = self._aur.lastmodified status = "failure" if "notify" in self._config: self.send_build_report(status, logfn) def update(self): '''Update a package.''' # For security, if the maintainer is incorrect we fail. self.debug("Configured maintainer: %s" % self._config.get("maintainer")) self.debug("AUR maintainer: %s" % self._aur.maintainer) self.debug("Last maintainer: %s" % self._local.lastmaintainer) # str cast is required to handle no maintainer as None string if self._config.get("maintainer") == str(self._aur.maintainer): self._build() else: self.error(f"Invalid maintainer") # we notify by mail only once the maintainer is invalid if self._local.lastmaintainer != str(self._aur.maintainer): self.send_maintainer_report() self._local.lastmaintainer = self._aur.maintainer def check_delta(self): '''Return the time in seconds remaining before next check.''' check_interval = self._config.getint("check_interval", self.DEFAULT_CHECK_INTERVAL) check_delta = int(self._local.lastchecked - time() + check_interval) self.debug(f"Check interval is {check_interval}s, remaining is {check_delta}s") return check_delta def check(self): '''Check packages for updates. Return the time in second before next check.''' # compute check delta check_delta = self.check_delta() if check_delta > 0: # next check is in the future self.info(f"Next check is planned in {check_delta}s") return check_delta # Update the last check time self._local.lastchecked = int(time()) check_delta = self.check_delta() # get remote data try: self._aur = AURPackage(self.name, self._config.getint("timeout")) except Exception as exp: self.error(f"Unable to get AUR package info: {exp}") return check_delta # few debug printing self.debug(f"AUR last modified: {self._aur.lastmodified}") self.debug(f"Local last success lastmodified: {self._local.lastbuild}") self.debug(f"Local last failed lastmodified: {self._local.lastfailed}") self.debug(f"Local last build time: {self._local.lastbuild}") # check if package need to be updated if self._local.lastsuccess >= self._aur.lastmodified: if "force" in self._config: self.info("Up to date, but force value is present.") if self._config["force"].isdigit() is False: self.warn("Invalid force value, ignore it") return check_delta # if lastbuild not exists, it will be equal to 0 # too small to be > to time() even with big force time now = int(time()) force = int(self._config["force"]) self.debug(f"Force is: {force}s") force_delta = self._local.lastbuild - now + force self.debug(f"Force Delta is: {force_delta}s") if force_delta < 0: self.info("Forced update") self.update() else: self.info(f"Next forced update in {force_delta}s") else: self.info("Up to date, nothing to do.") elif self._local.lastfailed >= self._aur.lastmodified: self.warn("Last build has failed, skipping. Remove lastfailed file to retry.") else: self.info(f"New version available: {self._aur.version}") self.update() # return updated check_delta return check_delta class Robot(): '''AUR Package Builder Robot.''' DEFAULT_CONFIG_FILE = "/etc/aurbot.conf" @staticmethod def sighup_handler(signum, frame): '''Handler for HUP signal (a.k.a reload)''' info("SIGHUP received") # Since python 3.5 we need to raise an exception to prevent python to EINTR, # see https://www.python.org/dev/peps/pep-0475/. raise InterruptedError() def __init__(self): # Set logger config. hdlr = StreamHandler() hdlr.setFormatter(ABFormatter()) getLogger().addHandler(hdlr) # Early debugging mode. getLogger().setLevel(DEBUG if "AURBOT_DEBUG" in environ else INFO) # Do not run as root. if geteuid() == 0 and "AURBOT_RUN_AS_ROOT" not in environ: raise Error("Do not run as root") # Use sighup to unblock sleep syscall. signal(SIGHUP, self.sighup_handler) # Parse command line. self._parse_argv() # Late debugging mode. if self._args.debug: getLogger().setLevel(DEBUG) # Load config. self._parse_config() # Tell to systemd we are ready. notify("READY=1\n") def _parse_argv(self): '''Parse command line arguments''' # Load parser. parser = ArgumentParser() parser.add_argument("-c", "--config", help="config file path", default=environ.get("AURBOT_CONFIG", self.DEFAULT_CONFIG_FILE)) parser.add_argument("-d", "--debug", action="store_true", help="debug mode") # Parse it! self._args = parser.parse_args() def _parse_config(self): '''Parse the config file.''' try: # Get the modification time of the config file. mtime = stat(self._args.config).st_mtime # Reload only when file has been modified. if not hasattr(self, "_config") or mtime > self._config_mtime: self._config = ConfigParser() self._config.read(self._args.config) self._config_mtime = mtime info(f"Config file loaded: {self._args.config}") if len(self._config.sections()) == 0: raise Error("Empty configuration") except Exception as exp: raise Error(f"Unable to load config file: {exp}") def start(self): '''Start the robot rock.''' while True: try: # Check for config update. self._parse_config() next_checks = set() for pkgname in self._config.sections(): pkg = Package(pkgname, self._config[pkgname]) next_checks.add(pkg.check()) # Time to sleep until next check, with a minimum of 1s. min_next_checks = min(next_checks) timeout = max(1, min_next_checks) debug(f"Next check is planned in {min_next_checks}s, waiting for {timeout}s") sleep(timeout) except InterruptedError: pass if __name__ == '__main__': try: Robot().start() except KeyboardInterrupt: exit(Error.ERR_ABORT) except Error as exp: critical(exp) exit(Error.ERR_CRITICAL) except Exception as exp: critical(exp) if getLogger().getEffectiveLevel() != DEBUG: error("Unknown error. Please report it with --debug.") else: raise exit(Error.ERR_UNKNOWN)