# Newer
# Older
'''
aurbot - Archlinux User Repository Builder Bot
Copyright © 2020 Sébastien Luttringer
Started, October 30th 2011
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
'''
# standard imports
from argparse import ArgumentParser
from configparser import ConfigParser
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from logging import StreamHandler, getLogger, Formatter, DEBUG, INFO
from os import chdir, environ, getcwd, mkdir, makedirs, geteuid, stat
from smtplib import SMTP, SMTP_SSL
from subprocess import check_call, DEVNULL
from tarfile import open as tar
from tempfile import TemporaryDirectory
"""Error handling"""
class Error(Exception):
    '''Error handling.

    Exception type raised for fatal aurbot conditions. It also carries the
    process exit codes, which the entry point reads as ``Error.ERR_*``.
    '''

    ERR_USAGE = 1
    ERR_ABORT = 2
    ERR_CRITICAL = 3
    ERR_UNKNOWN = 4


# Module-level aliases, kept so that any bare ``ERR_*`` lookup still resolves.
ERR_USAGE = Error.ERR_USAGE
ERR_ABORT = Error.ERR_ABORT
ERR_CRITICAL = Error.ERR_CRITICAL
ERR_UNKNOWN = Error.ERR_UNKNOWN
class ABFormatter(Formatter):
    '''
    Custom logging formatter.

    Records are rendered as "[LEVEL] message", except INFO records emitted
    while the logger is not in debug mode, which use a reduced format.
    '''

    def __init__(self, fmt="[%(levelname)s] %(msg)s"):
        '''Create the formatter with fmt as its default record format.'''
        super().__init__(fmt)

    def format(self, record):
        '''Return the formatted text of record.

        The format string is swapped temporarily for plain INFO output and
        always restored afterwards, even when formatting raises.
        '''
        format_orig = self._style._fmt
        if record.levelno == INFO and getLogger(record.name).getEffectiveLevel() != DEBUG:
            # NOTE(review): the body of this branch was missing from the
            # reviewed text; a bare-message format is assumed -- confirm.
            self._style._fmt = "%(msg)s"
        try:
            return Formatter.format(self, record)
        finally:
            self._style._fmt = format_orig
class AURPackage(dict):
    '''
    Abstract AUR package action

    Fetches the metadata of one package from the AUR RPC interface and
    exposes every returned field as a lower-cased attribute (for example
    the "Version" field is read as ``pkg.version``).
    '''

    AUR_URL = 'https://aur.archlinux.org'
    USER_AGENT = "aurbot"

    # NOTE(review): the constructor header was missing from the reviewed
    # text; the signature is inferred from the body (name, timeout) and from
    # the AURPackage(pkgname, timeout) call in the bot loop -- confirm.
    def __init__(self, name, timeout=None):
        '''Load the AUR information of package name.

        timeout is handed to urlopen() unchanged.
        Raises Exception when the backend version is not 1, when the package
        does not exist, or when name is not the package base.
        '''
        super().__init__()
        url = "%s/rpc.php?type=info&arg=%s" % (self.AUR_URL, name)
        url_req = Request(url, headers={"User-Agent": self.USER_AGENT})
        # the context manager closes the HTTP response even if decoding fails
        with urlopen(url_req, timeout=timeout) as url_fd:
            d = jloads(url_fd.read().decode("utf-8"))
        if d["version"] != 1:
            raise Exception("Unknown AUR Backend version: %s" % d["version"])
        if len(d["results"]) == 0:
            raise Exception("No such package: %s" % name)
        if d["results"]["PackageBase"] != name:
            raise Exception("No such base package: %s" % name)
        self._info = d["results"]

    def __getattr__(self, name):
        '''Return the AUR field whose lower-cased key equals name.

        Raises AttributeError when there is no such field, or when the
        package information has not been loaded.
        '''
        # Go through __dict__: reading self._info here would re-enter
        # __getattr__ and recurse forever while _info is not set.
        fields = self.__dict__.get("_info")
        if fields:
            for k, v in fields.items():
                if name == k.lower():
                    return v
        raise AttributeError(name)

    def __repr__(self):
        '''Return a short "name version" description of the package.'''
        # NOTE(review): the body of __repr__ was missing from the reviewed
        # text; this rendering is an assumption -- confirm.
        fields = self.__dict__.get("_info") or {}
        return "%s %s" % (fields.get("Name"), fields.get("Version"))

    def extract(self, path):
        '''
        Extract aur source tarball inside a directory path
        '''
        # NOTE(review): extractall() trusts the member paths of a remotely
        # fetched archive; consider an extraction filter where available.
        with urlopen('%s/%s' % (self.AUR_URL, self.urlpath)) as fo:
            with tar(mode='r|*', fileobj=fo) as tarball:
                tarball.extractall(path)
class LocalPackage(dict):
    '''Local package data abstraction

    Each package owns a directory holding one small file per remembered
    value (lastbuild, lastsuccess, ...), plus a "log" sub-directory.
    '''

    # NOTE(review): this constant is referenced below but its definition was
    # missing from the reviewed text; the value is an assumption -- confirm.
    DEFAULT_DATA_DIR = "/var/lib/aurbot"

    # NOTE(review): the constructor header was missing from the reviewed
    # text; reconstructed from the use of name in the path computation.
    def __init__(self, name):
        '''Bind the object to the data directory of package name.

        The directory is $AURBOT_DATADIR/name, or DEFAULT_DATA_DIR/name when
        the variable is unset. It is created if needed, because the value
        files and the log directory are created directly inside it.
        '''
        super().__init__()
        self.name = name
        self.path = join(environ.get("AURBOT_DATADIR", self.DEFAULT_DATA_DIR), name)
        makedirs(self.path, exist_ok=True)

    @property
    def logdir(self):
        '''Return the log directory path, creating it when missing.'''
        logdir = join(self.path, "log")
        if not exists(logdir):
            mkdir(logdir)
        return logdir

    def getlastX(self, X, cast=int, default=0):
        '''Return the value stored in file X, converted with cast.

        default is returned when the file does not exist, cannot be read,
        or holds content that cast rejects.
        '''
        filepath = join(self.path, X)
        if not exists(filepath):
            return default
        try:
            with open(filepath, "r") as fd:
                return cast(fd.read())
        except Exception as exp:
            debug("Failed to load %s: %s" % (X, exp))
            return default

    def setlastX(self, X, value, cast=int):
        '''Store cast(value) as text in file X.'''
        with open(join(self.path, X), "w") as fd:
            fd.write("%s" % cast(value))

    # store the moment where the build was done locally
    lastbuild = property(
        lambda x: LocalPackage.getlastX(x, "lastbuild"),
        lambda x, y: LocalPackage.setlastX(x, "lastbuild", y)
    )

    # store the aur lastmodified value of the last successful build
    lastsuccess = property(
        lambda x: LocalPackage.getlastX(x, "lastsuccess"),
        lambda x, y: LocalPackage.setlastX(x, "lastsuccess", y)
    )

    # store the aur lastmodified value of the last failed build
    lastfailed = property(
        lambda x: LocalPackage.getlastX(x, "lastfailed"),
        lambda x, y: LocalPackage.setlastX(x, "lastfailed", y)
    )

    # store the moment of the last check of the package
    lastchecked = property(
        lambda x: LocalPackage.getlastX(x, "lastchecked"),
        lambda x, y: LocalPackage.setlastX(x, "lastchecked", y)
    )

    # store the last maintainer for the package
    lastmaintainer = property(
        lambda x: LocalPackage.getlastX(x, "lastmaintainer", str, ""),
        lambda x, y: LocalPackage.setlastX(x, "lastmaintainer", y, str)
    )
# Fallback delay between two checks of a package, used when its section has
# no "check_interval" option, and as the sleep time when nothing is
# configured. It is handed to sleep(), so the unit is seconds.
DEFAULT_CHECK_INTERVAL = 86400
# Configuration file used when neither the command line nor $AURBOT_CONFIG
# names one.
DEFAULT_CONFIG_FILE = "/etc/aurbot.conf"
def __init__(self, path):
    ''' initialize the bot

    path is the configuration file to load. It has to be recorded through
    init_config() first: parse_config() reads self.config_path and
    self.config_mtime, which only init_config() sets.
    '''
    self.init_config(path)
    self.parse_config()
def init_config(self, path=None):
    ''' Reset the configuration state to its defaults.

    The parsed configuration becomes an empty ConfigParser and the recorded
    modification time goes back to 0. The configuration path is replaced
    only when a path is given; otherwise it is left untouched.
    '''
    self.config, self.config_mtime = ConfigParser(), 0
    if path is None:
        return
    self.config_path = path
def parse_config(self):
    ''' parse the config file

    The file is re-read only when its modification time is newer than the
    one recorded at the previous load. When the file cannot be stat'ed or
    parsed, the configuration is reset to an empty one via init_config().
    '''
    # get the modification time of the config file
    try:
        mtime = stat(self.config_path).st_mtime
    except Exception as exp:
        self.init_config()
        debug("Unable to stat config file, empty one used: %s" % str(exp))
        return
    # reload only when file has been modified
    if mtime <= self.config_mtime:
        return
    self.config_mtime = mtime
    self.config = ConfigParser()
    try:
        # NOTE(review): the read call was missing from the reviewed text;
        # restored from the surrounding "parse"/"loaded" messages -- confirm.
        self.config.read(self.config_path)
    except Exception as exp:
        self.init_config()
        debug("Unable to parse config file, empty one used: %s" % str(exp))
        return
    info("Config file loaded %s" % self.config_path)
def send_message(self, pkgconfig, msg):
    ''' Send message to an smtp server

    pkgconfig supplies "notify", "smtp_host" and "smtp_port", plus the
    optional "smtp_login", "smtp_pass" and "smtp_security" ("ssl" or
    "starttls"). Failures are logged and never raised to the caller.
    '''
    info("Sending message to %s" % pkgconfig["notify"])
    # load smtp info
    try:
        smtp_host = pkgconfig["smtp_host"]
        smtp_port = pkgconfig["smtp_port"]
        smtp_login = pkgconfig.get("smtp_login", "")
        smtp_pass = pkgconfig.get("smtp_pass", "")
        smtp_security = pkgconfig.get("smtp_security", "")
    except Exception:
        # Exception rather than a bare except, so that KeyboardInterrupt and
        # SystemExit are no longer swallowed here.
        error("Unable to load smtp config")
        return
    # NOTE(review): a step displaying the message content in debug mode
    # appears to have been lost from the reviewed text at this point.
    # prepare connection
    con = SMTP_SSL() if smtp_security == "ssl" else SMTP()
    if getLogger().isEnabledFor(DEBUG):
        con.set_debuglevel(True)
    # NOTE(review): private attribute set by hand, presumably so starttls()
    # knows the server name when connect() is called separately -- confirm.
    con._host = smtp_host
    try:
        con.connect(smtp_host, smtp_port)
        if smtp_security == "starttls":
            con.starttls()
        if smtp_login != "" and smtp_pass != "":
            con.login(smtp_login, smtp_pass)
        # send it
        con.send_message(msg)
        # gentleman quit
        con.quit()
    except Exception as exp:
        error("Unable to send message via smtp: %s" % str(exp))
    finally:
        # release the socket when the dialogue failed half-way
        con.close()
def send_build_report(self, pkgconfig, localpkg, aurpkg, status, logfile):
    ''' Send build notification

    Builds a multipart mail whose subject tells whether the build of
    localpkg succeeded (status is true) or failed, attaches the content of
    logfile as text, and hands the mail to send_message().
    '''
    info("Send build report")
    # generate message
    outcome = "successful" if status else "failure"
    msg = MIMEMultipart()
    msg["Subject"] = "Build %s for %s %s" % (outcome, localpkg.name, aurpkg.version)
    msg["From"] = pkgconfig.get("from", "Aurbot")
    msg["To"] = pkgconfig["notify"]
    msg["Date"] = formatdate(localtime=True)
    # attach logfile; build output may hold undecodable bytes, which must
    # not prevent the report from being sent
    with open(logfile, "r", errors="replace") as fd:
        msg.attach(MIMEText(fd.read()))
    # without this call the message above was built and then discarded
    self.send_message(pkgconfig, msg)
def send_maintainer_report(self, pkgconfig, localpkg, aurpkg):
    ''' Send email to notify invalid maintainer

    The mail states the maintainer found in the configuration and the one
    reported by the AUR, then goes through send_message().
    '''
    info("Send invalid maintainer report")
    # generate message
    body = "".join((
        "Maintainer for package %s is invalid.\r\n" % localpkg.name,
        "He has probably changed. Check if the new one is trustworthy.\r\n",
        "\r\n",
        "Configured maintainer is %s.\r\n" % pkgconfig.get("maintainer"),
        "AUR maintainer is %s.\r\n" % aurpkg.maintainer,
        "\r\n",
        "Your aurbot configuration need to be updated!\r\n",
    ))
    msg = MIMEText(body)
    msg["Subject"] = "Invalid maintainer for %s" % localpkg.name
    msg["From"] = pkgconfig.get("from", "Aurbot")
    msg["To"] = pkgconfig["notify"]
    msg["Date"] = formatdate(localtime=True)
    # without this call the message above was built and then discarded
    self.send_message(pkgconfig, msg)
def build(self, pkgconfig, localpkg, aurpkg):
    ''' Build a package

    Extracts the AUR tarball in a temporary directory, runs the configured
    "build_cmd" and, when present, "commit_cmd" from the package directory,
    with their output going to a timestamped file in the package log
    directory. The AUR lastmodified value is then recorded as lastsuccess
    or lastfailed, and a report is sent when "notify" is configured.

    NOTE(review): the reviewed text of this method had lost several lines
    (outer try/except, the cwd/status/start_time assignments); the control
    flow below is a reconstruction from what remained -- confirm.
    '''

    def run_step(fd, label, command):
        '''Run one shell command, logging its text and duration to fd.'''
        info("Starting %s command" % label.lower())
        debug(command)
        fd.write("%s command: %s\n" % (label, command))
        # flush before the child writes to the same file descriptor,
        # otherwise our buffered header may land after its output
        fd.flush()
        start_time = time()
        try:
            # the command comes from the bot configuration file and is
            # deliberately run through the shell
            check_call(command, stdin=DEVNULL, stdout=fd, stderr=fd,
                       shell=True, close_fds=True)
        except Exception as exp:
            raise Exception("%s failure: %s" % (label, str(exp))) from exp
        end_time = time()
        info("%s duration: %.2fs" % (label, end_time - start_time))
        fd.write("%s duration: %.2fs\n" % (label, end_time - start_time))

    # register the build
    localpkg.lastbuild = time()
    # log files
    fp = join(localpkg.logdir, strftime("build-%Y-%m-%d-%H-%M-%S.log", localtime(time())))
    debug("Build log file path: %s" % fp)
    cwd = getcwd()
    status = False
    try:
        # the temporary build directory is removed when the block exits
        with TemporaryDirectory() as build_dir, open(fp, "w") as fd:
            debug("Build dir is %s" % build_dir)
            # extract tarball
            debug("Extracting aur tarball")
            aurpkg.extract(build_dir)
            try:
                chdir("%s/%s" % (build_dir, aurpkg.name))
                # build
                run_step(fd, "Build", pkgconfig["build_cmd"])
                # commit
                if "commit_cmd" in pkgconfig:
                    run_step(fd, "Commit", pkgconfig["commit_cmd"])
            finally:
                # leave the build directory before it is deleted
                chdir(cwd)
        # we have to register after chdir in the original directory
        localpkg.lastsuccess = aurpkg.lastmodified
        status = True
    except Exception as exp:
        error("Update failure: %s" % exp)
        # we have to register after chdir in the original directory
        localpkg.lastfailed = aurpkg.lastmodified
    if "notify" in pkgconfig:
        self.send_build_report(pkgconfig, localpkg, aurpkg, status, fp)
def update(self, pkgconfig, localpkg, aurpkg):
    ''' Update (build and commit) a package

    The build only happens when the maintainer reported by the AUR matches
    the configured one. On a mismatch an error is logged, and a report is
    sent the first time that maintainer is seen (when "notify" is set).
    '''
    debug("Updating %s" % aurpkg.name)
    # for security, if the maintainer is incorrect we fail
    debug("Configured maintainer: %s" % pkgconfig.get("maintainer"))
    debug("AUR maintainer: %s" % aurpkg.maintainer)
    debug("Last maintainer: %s" % localpkg.lastmaintainer)
    # str is required to handle no maintainer as None string
    aur_maintainer = str(aurpkg.maintainer)
    if pkgconfig.get("maintainer") != aur_maintainer:
        # report only once per newly seen maintainer
        if localpkg.lastmaintainer != aur_maintainer:
            # the report reads pkgconfig["notify"]; skip it when unset, as
            # the build report does
            if "notify" in pkgconfig:
                self.send_maintainer_report(pkgconfig, localpkg, aurpkg)
            localpkg.lastmaintainer = aurpkg.maintainer
        error("Invalid maintainer for package %s" % aurpkg.name)
        return
    localpkg.lastmaintainer = aurpkg.maintainer
    # NOTE(review): the build call was missing from the reviewed text;
    # restored from the docstring and the bot loop's reliance on update().
    self.build(pkgconfig, localpkg, aurpkg)
def start(self):
    ''' start the bot loop

    Forever: reload the configuration, visit every configured package whose
    check interval has elapsed, update the ones that need it, then sleep
    until the earliest next check. An InterruptedError restarts the loop at
    once, which re-reads the configuration.

    NOTE(review): the reviewed text had lost the localpkg and force_delta
    assignments and the "up to date" test that the elif belongs to; they
    are reconstructed below -- confirm against the intended behaviour.
    '''
    while True:
        try:
            # reload package list
            self.parse_config()
            next_checks = set()
            for pkgname, pkgconfig in self.config.items():
                if pkgname == "DEFAULT":
                    continue
                info("[%s]" % pkgname)
                if "build_cmd" not in pkgconfig:
                    error("build_cmd is missing in config file")
                    continue
                localpkg = LocalPackage(pkgname)
                check_interval = pkgconfig.getint("check_interval", self.DEFAULT_CHECK_INTERVAL)
                debug("Check interval is %ss" % check_interval)
                check_delta = int(localpkg.lastchecked - time() + check_interval)
                debug("Check delta is %ss" % check_delta)
                if check_delta > 0:
                    # next check is in the future
                    next_checks.add(check_delta)
                    continue
                next_checks.add(check_interval)
                # get remote data
                try:
                    aurpkg = AURPackage(pkgname, pkgconfig.getint("timeout"))
                    localpkg.lastchecked = int(time())
                except Exception as exp:
                    error("Unable to get AUR package info: %s" % exp)
                    continue
                # few debug printing
                debug("AUR last modified: %s" % aurpkg.lastmodified)
                # this line used to print lastbuild under the lastsuccess label
                debug("Local last success lastmodified: %s" % localpkg.lastsuccess)
                debug("Local last failed lastmodified: %s" % localpkg.lastfailed)
                debug("Local last build time: %s" % localpkg.lastbuild)
                # check if package need to be updated
                if localpkg.lastsuccess >= aurpkg.lastmodified:
                    if "force" not in pkgconfig:
                        continue
                    info("Up to date, but force value is present.")
                    if pkgconfig["force"].isdigit() is False:
                        warning("Invalid force value, ignore it")
                        continue
                    # if lastbuild not exists, it will be equal to 0
                    # too small to be > to time() even with big force time
                    now = int(time())
                    force = int(pkgconfig["force"])
                    debug("Force is: %ss" % force)
                    # same shape as check_delta: negative once the forced
                    # rebuild period has elapsed since the last build
                    force_delta = localpkg.lastbuild - now + force
                    debug("Force Delta is: %ss" % force_delta)
                    if force_delta < 0:
                        info("Forced update")
                        self.update(pkgconfig, localpkg, aurpkg)
                    else:
                        info("Next forced update in %ss" % force_delta)
                elif localpkg.lastfailed >= aurpkg.lastmodified:
                    warning("Last build has failed, skipping. Remove lastfailed file to retry.")
                else:
                    info("New version available: %s" % aurpkg.version)
                    self.update(pkgconfig, localpkg, aurpkg)
            # sleep until next check
            # next_checks is empty when there is no package configured
            timeout = min(next_checks) if next_checks else self.DEFAULT_CHECK_INTERVAL
            sleep(timeout)
        except InterruptedError:
            # raised to cut the sleep short; loop again right away
            pass
# NOTE(review): the header of this handler was missing from the reviewed
# text, leaving statements that would raise as soon as they are defined.
# The name, the staticmethod decorator and the (signum, frame) signal
# handler signature are assumptions -- confirm, and confirm where it is
# registered.
@staticmethod
def sighup_handler(signum, frame):
    ''' Interrupt the current wait when SIGHUP is received

    Raising InterruptedError makes the bot loop start over.
    '''
    info("SIGHUP received")
    # since python 3.5 we need to raise an exception to prevent python to EINTR
    # see https://www.python.org/dev/peps/pep-0475/
    raise InterruptedError()
def parse_argv(argv=None):
    '''Parse command line arguments

    argv is the list of arguments to parse; None (the default) parses the
    arguments of the running process. Returns the argparse namespace, with
    "config" (configuration file path) and "debug" (bool). The root logger
    is switched to DEBUG when --debug is given.
    '''
    # load parser
    parser = ArgumentParser()
    # this is a plain function, so the default has to be read from the bot
    # class: there is no "self" here
    parser.add_argument("-c", "--config", help="config file path",
                        default=environ.get("AURBOT_CONFIG", Aurbot.DEFAULT_CONFIG_FILE))
    parser.add_argument("-d", "--debug", action="store_true", help="debug mode")
    # the local package directory is looked up through $AURBOT_DATADIR
    parser.epilog = "You could set $AURBOT_DATADIR to change the path of the local package cache."
    # parse it!
    args = parser.parse_args(argv)
    # set global debug mode
    if args.debug:
        getLogger().setLevel(DEBUG)
    return args
def main():
    '''Program entry point

    Configures logging, parses the command line, refuses to run as root,
    then runs the bot. The process exits with Error.ERR_ABORT on a keyboard
    interrupt, Error.ERR_CRITICAL on an aurbot Error, and Error.ERR_UNKNOWN
    on any other exception (re-raised instead when in debug mode).

    NOTE(review): the reviewed text had lost the argument parsing, the bot
    start and two except headers; they are reconstructed here -- confirm.
    '''
    import sys  # local import: only this function needs it

    try:
        # set logger config
        hdlr = StreamHandler()
        hdlr.setFormatter(ABFormatter())
        getLogger().addHandler(hdlr)
        getLogger().setLevel(DEBUG if "AURBOT_DEBUG" in environ else INFO)
        # parse the command line after the logger setup, so that --debug
        # overrides the level chosen above
        args = parse_argv()
        # do not run as root
        if geteuid() == 0:
            raise Error("Do not run as root")
        # create the bot object
        bot = Aurbot(args.config)
        bot.start()
    except KeyboardInterrupt:
        sys.exit(Error.ERR_ABORT)
    except Error as exp:
        critical(exp)
        sys.exit(Error.ERR_CRITICAL)
    except Exception as exp:
        critical(exp)
        if getLogger().getEffectiveLevel() != DEBUG:
            error("Unknown error. Please report it with --debug.")
        else:
            # in debug mode let the traceback through
            raise
        sys.exit(Error.ERR_UNKNOWN)