Newer
Older
'''
aurbot - Archlinux User Repository Builder Bot
Copyright © 2020 Sébastien Luttringer
Started, October 30th 2011
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
'''
# standard imports
from argparse import ArgumentParser
from configparser import ConfigParser
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from logging import StreamHandler, getLogger, Formatter, DEBUG, INFO
from os import chdir, environ, getcwd, mkdir, makedirs, geteuid, stat
from smtplib import SMTP, SMTP_SSL
from subprocess import check_call, DEVNULL
from tarfile import open as tar
from tempfile import TemporaryDirectory
ERR_USAGE = 1
ERR_ABORT = 2
ERR_CRITICAL = 3
ERR_UNKNOWN = 4
def __init__(self, fmt="[%(levelname)s] %(msg)s"):
super().__init__(fmt)
def format(self, record):
format_orig = self._style._fmt
if record.levelno == INFO and getLogger(record.name).getEffectiveLevel() != DEBUG:
self._style._fmt = "%(msg)s"
result = Formatter.format(self, record)
self._style._fmt = format_orig
return result
AUR_URL = 'https://aur.archlinux.org'
USER_AGENT = "aurbot"
def __init__(self, name, timeout=None):
super().__init__()
self.name = name
url = "%s/rpc.php?type=info&arg=%s" % (self.AUR_URL, name)
url_req = Request(url, headers={"User-Agent": self.USER_AGENT})
url_fd = urlopen(url_req, timeout=timeout)
d = jloads(url_fd.read().decode("utf-8"))
if d["version"] != 1:
self._info = d["results"]
def __getattr__(self, name):
for k, v in self._info.items():
if name == k.lower():
return v
raise AttributeError()
def __repr__(self):
return "%s %s" % (self.name, self.version)
def extract(self, path):
'''Extract aur source tarball inside a directory path.'''
fo = urlopen(f"{self.AUR_URL}/{self.urlpath}")
tarball = tar(mode='r|*', fileobj=fo)
tarball.extractall(path)
fo.close()
DEFAULT_DATA_DIR = "/var/lib/aurbot"
def __init__(self, name):
super().__init__()
self.name = name
self.path = join(environ.get("AURBOT_DATADIR", self.DEFAULT_DATA_DIR), name)
makedirs(self.path, exist_ok=True)
@property
def logdir(self):
logdir = join(self.path, "log")
if not exists(logdir):
mkdir(logdir)
return logdir
def getlastX(self, X, cast=int, default=0):
filepath = join(self.path, X)
if not exists(filepath):
return default
try:
return cast(open(filepath, "r").read())
except Exception as exp:
return default
def setlastX(self, X, value, cast=int):
open(join(self.path, X), "w").write("%s" % cast(value))
lastbuild = property(
lambda x: LocalPackage.getlastX(x, "lastbuild"),
lambda x, y: LocalPackage.setlastX(x, "lastbuild", y)
)
lastsuccess = property(
lambda x: LocalPackage.getlastX(x, "lastsuccess"),
lambda x, y: LocalPackage.setlastX(x, "lastsuccess", y)
)
lastfailed = property(
lambda x: LocalPackage.getlastX(x, "lastfailed"),
lambda x, y: LocalPackage.setlastX(x, "lastfailed", y)
)
lastchecked = property(
lambda x: LocalPackage.getlastX(x, "lastchecked"),
lambda x, y: LocalPackage.setlastX(x, "lastchecked", y)
)
lastmaintainer = property(
lambda x: LocalPackage.getlastX(x, "lastmaintainer", str, ""),
lambda x, y: LocalPackage.setlastX(x, "lastmaintainer", y, str)
)
def __init__(self, pkgname, pkgconfig):
self.name = pkgname
self._config = pkgconfig
self._local = LocalPackage(pkgname)
# Print sugars.
self.debug = lambda msg: debug(f"{self.name}: {msg}")
self.info = lambda msg: info(f"{self.name}: {msg}")
self.error = lambda msg: error(f"{self.name}: {msg}")
self.warn = lambda msg: warning(f"{self.name}: {msg}")
def send_message(self, msg):
'''Send message to an smtp server.'''
self.info(f"Sending message to {self._config['notify']}")
# Load smtp info.
smtp_host = self._config["smtp_host"]
smtp_port = self._config["smtp_port"]
smtp_login = self._config.get("smtp_login", "")
smtp_pass = self._config.get("smtp_pass", "")
smtp_security = self._config.get("smtp_security", "")
# Display message content when debug.
self.debug(msg)
# Prepare connection.
con = SMTP_SSL() if smtp_security == "ssl" else SMTP()
if getLogger().isEnabledFor(DEBUG):
con.set_debuglevel(True)
con._host = smtp_host
try:
con.connect(smtp_host, smtp_port)
if smtp_security == "starttls":
con.starttls()
if smtp_login != "" and smtp_pass != "":
con.login(smtp_login, smtp_pass)
def send_build_report(self, status, logfile):
'''Send build notification.'''
self.info("Send build report")
# Generate message.
msg["Subject"] = f"Build {status} for {self.name} {self._aur.version}"
msg["From"] = self._config.get("from", "Aurbot")
msg["To"] = self._config["notify"]
with open(logfile, "r") as fd:
mt = MIMEText(fd.read())
msg.attach(mt)
def send_maintainer_report(self):
'''Send email to notify of invalid maintainership.'''
self.info("Send invalid maintainer report")
# Generate message.
"He has probably changed. Check if the new one is trustworthy.\r\n"
"\r\n"
"Configured maintainer is %s.\r\n" % self._config.get("maintainer") +
"AUR maintainer is %s.\r\n" % self._aur.maintainer +
"\r\n"
"Your aurbot configuration need to be updated!\r\n")
msg["Subject"] = "Invalid maintainer for %s" % self.name
msg["From"] = self._config.get("from", "Aurbot")
msg["To"] = self._config["notify"]
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
self.send_message(msg)
def _run_command(self, name, cmd, log):
'''Fancy run of command cmd and log output in file object log.'''
self.info(f"Starting {name} command: {cmd}")
log.write(f"Build command: {cmd}\n")
log.flush()
start_time = time()
try:
check_call(cmd, stdin=DEVNULL, stdout=log, stderr=log, shell=True, close_fds=True)
except Exception as exp:
raise Exception(f"Build failure: {exp}") from exp
end_time = time()
self.info(f"Build duration: {end_time - start_time:.2f}s")
log.write(f"Build duration: {end_time - start_time:.2f}\n")
def _build(self):
'''Build a package.'''
if "build_cmd" not in self._config:
self.error("No build command.")
return
# Register the build start time.
self._local.lastbuild = time()
# Choose a log file name.
logfn = join(self._local.logdir, strftime("build-%Y-%m-%d-%H-%M-%S.log", localtime(time())))
self.debug(f"Build log file path: {logfn}")
# Make a temporary build directory.
# Extract the tarball inside it.
self.debug("Extracting aur tarball in {build_dir.name}")
self._aur.extract(build_dir.name)
with open(logfn, "w") as logfo:
chdir(f"{build_dir.name}/{self.name}")
# Execute build command.
self._run_command("build", self._config['build_cmd'], logfo)
# Execute commit command.
if "commit_cmd" in self._config:
self._run_command("commit", self._config['commit_cmd'], logfo)
chdir(cwd)
# we have to register after chdir in the original directory
self._local.lastsuccess = self._aur.lastmodified
status = "successful"
chdir(cwd)
# we have to register after chdir in the original directory
self._local.lastsuccess = self._aur.lastmodified
status = "failure"
if "notify" in self._config:
self.send_build_report(status, logfn)
def update(self):
'''Update a package.'''
# For security, if the maintainer is incorrect we fail.
self.debug("Configured maintainer: %s" % self._config.get("maintainer"))
self.debug("AUR maintainer: %s" % self._aur.maintainer)
self.debug("Last maintainer: %s" % self._local.lastmaintainer)
# str cast is required to handle no maintainer as None string
if self._config.get("maintainer") == str(self._aur.maintainer):
self._build()
else:
self.error(f"Invalid maintainer")
if self._local.lastmaintainer != str(self._aur.maintainer):
self.send_maintainer_report()
self._local.lastmaintainer = self._aur.maintainer
def check_delta(self):
'''Return the time in seconds remaining before next check.'''
check_interval = self._config.getint("check_interval", self.DEFAULT_CHECK_INTERVAL)
self.debug(f"Check interval is {check_interval}s")
check_delta = int(self._local.lastchecked - time() + check_interval)
self.debug(f"Check delta is {check_delta}s")
return check_delta
def check(self):
# compute check delta
check_delta = self.check_delta()
if check_delta > 0:
# next check is in the future
self.info(f"Next check is planned in {check_delta}s")
return check_delta
# get remote data
try:
self._aur = AURPackage(self.name, self._config.getint("timeout"))
self._local.lastchecked = int(time())
except Exception as exp:
self.error(f"Unable to get AUR package info: {exp}")
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
# few debug printing
self.debug(f"AUR last modified: {self._aur.lastmodified}")
self.debug(f"Local last success lastmodified: {self._local.lastbuild}")
self.debug(f"Local last failed lastmodified: {self._local.lastfailed}")
self.debug(f"Local last build time: {self._local.lastbuild}")
# check if package need to be updated
if self._local.lastsuccess >= self._aur.lastmodified:
if "force" in self._config:
self.info("Up to date, but force value is present.")
if self._config["force"].isdigit() is False:
self.warn("Invalid force value, ignore it")
return
# if lastbuild not exists, it will be equal to 0
# too small to be > to time() even with big force time
now = int(time())
force = int(self._config["force"])
self.debug(f"Force is: {force}s")
force_delta = self._local.lastbuild - now + force
self.debug(f"Force Delta is: {force_delta}s")
if force_delta < 0:
self.info("Forced update")
self.update()
else:
self.info(f"Next forced update in {force_delta}s")
else:
self.info("Up to date, nothing to do.")
elif self._local.lastfailed >= self._aur.lastmodified:
self.warn("Last build has failed, skipping. Remove lastfailed file to retry.")
else:
self.info(f"New version available: {self._aur.version}")
self.update()
# return updated check_delta
return self.check_delta()
class Robot():
'''AUR Package Builder Robot.'''
DEFAULT_CONFIG_FILE = "/etc/aurbot.conf"
@staticmethod
def sighup_handler(signum, frame):
'''Handler for HUP signal (a.k.a reload)'''
info("SIGHUP received")
# Since python 3.5 we need to raise an exception to prevent python to EINTR,
# see https://www.python.org/dev/peps/pep-0475/.
raise InterruptedError()
def __init__(self):
# Set logger config.
hdlr = StreamHandler()
hdlr.setFormatter(ABFormatter())
getLogger().addHandler(hdlr)
# Early debugging mode.
getLogger().setLevel(DEBUG if "AURBOT_DEBUG" in environ else INFO)
# Do not run as root.
if geteuid() == 0 and "AURBOT_RUN_AS_ROOT" not in environ:
raise Error("Do not run as root")
# Use sighup to unblock sleep syscall.
signal(SIGHUP, self.sighup_handler)
# Parse command line.
self._parse_argv()
# Late debugging mode.
if self._args.debug:
getLogger().setLevel(DEBUG)
# Load config.
self._parse_config()
# Tell to systemd we are ready.
notify("READY=1\n")
def _parse_argv(self):
'''Parse command line arguments'''
# Load parser.
parser = ArgumentParser()
parser.add_argument("-c", "--config", help="config file path",
default=environ.get("AURBOT_CONFIG", self.DEFAULT_CONFIG_FILE))
parser.add_argument("-d", "--debug", action="store_true", help="debug mode")
# Parse it!
self._args = parser.parse_args()
def _parse_config(self):
'''Parse the config file.'''
try:
# Get the modification time of the config file.
mtime = stat(self._args.config).st_mtime
# Reload only when file has been modified.
if not hasattr(self, "_config") or mtime > self._config_mtime:
self._config = ConfigParser()
self._config.read(self._args.config)
self._config_mtime = mtime
info(f"Config file loaded: {self._args.config}")
if len(self._config.sections()) == 0:
raise Error("Empty configuration")
except Exception as exp:
raise Error(f"Unable to load config file: {exp}")
for pkgname in self._config.sections():
pkg = Package(pkgname, self._config[pkgname])
next_checks.add(pkg.check())
# Sleep until next check.
timeout = min(next_checks)
debug(f"Waiting for {timeout}s")
except KeyboardInterrupt:
exit(Error.ERR_ABORT)
except Error as exp:
critical(exp)
exit(Error.ERR_CRITICAL)
except Exception as exp:
critical(exp)
if getLogger().getEffectiveLevel() != DEBUG:
error("Unknown error. Please report it with --debug.")
else:
raise
exit(Error.ERR_UNKNOWN)