Newer
Older
#!/usr/bin/python3
# coding: utf-8
# aurbot - Archlinux User Repository Builder Bot
#
# Started, October 30th 2011
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from argparse import ArgumentParser
from configparser import ConfigParser
from json import load as jload, dump as jdump, loads as jloads
from logging import StreamHandler, getLogger, Formatter, DEBUG, INFO
from logging import debug, warning, info, error
from os import chdir, environ, getcwd, mkdir
from os.path import exists, join
from tarfile import open as tar
from tempfile import TemporaryDirectory
# Base URL of the AUR web service; used for both the RPC query and the
# source tarball download.
AUR_URL = "https://aur.archlinux.org"
# Value sent in the User-Agent header of the AUR RPC request.
USER_AGENT = "aurbot"
# Directory name used under the XDG data and config base directories.
XDG_DIRECTORY = "aurbot"

# Process exit codes.
(
    ERR_USAGE,
    ERR_FATAL,
    ERR_ABORT,
    ERR_UNKNOWN,
) = (1, 2, 3, 4)
class ABFormatter(Formatter):
    '''
    Custom logging formatter.

    Records are rendered with the format given at construction time, except
    INFO records, which are rendered as the bare message unless the logger
    that emitted them has an effective level of DEBUG.
    '''

    def __init__(self, fmt="[%(levelname)s] %(msg)s"):
        Formatter.__init__(self, fmt)

    def format(self, record):
        '''
        Return the formatted text for record.

        The format string is swapped temporarily for plain INFO records and
        is always restored afterwards, even when formatting raises.
        '''
        format_orig = self._style._fmt
        plain_info = (
            record.levelno == INFO
            and getLogger(record.name).getEffectiveLevel() != DEBUG
        )
        if plain_info:
            self._style._fmt = "%(msg)s"
        try:
            return Formatter.format(self, record)
        finally:
            # Restore unconditionally: a record whose message cannot be
            # built must not leave the formatter stuck on the bare format.
            self._style._fmt = format_orig
class AURPackage(dict):
    '''
    Abstract AUR package action
    '''

    def __init__(self, name, timeout=None):
        '''
        Query the AUR RPC interface for the package called name.

        timeout is passed to urlopen() unchanged.
        Raises Exception when the backend reports a version other than 1
        or returns no result for name; errors from the HTTP request or the
        JSON decoding propagate unchanged.
        '''
        # Local import: quote() is not otherwise needed by this class.
        from urllib.parse import quote

        self.name = name
        debug("getting %s aur infos" % self.name)
        # Quote the name so characters such as "+" or "&" are sent
        # literally instead of being interpreted as query-string syntax.
        url = "%s/rpc.php?type=info&arg=%s" % (AUR_URL, quote(name, safe=""))
        url_req = Request(url, headers={"User-Agent": USER_AGENT})
        # The context manager closes the HTTP response even on error.
        with urlopen(url_req, timeout=timeout) as url_fd:
            d = jloads(url_fd.read().decode("utf-8"))
        if d["version"] != 1:
            raise Exception("Unknown AUR Backend version: %s" % d["version"])
        if len(d["results"]) == 0:
            raise Exception("No such package: %s" % name)
        self._info = d["results"]

    def __getattr__(self, name):
        '''
        Expose the fetched info fields as attributes, matching the key
        case-insensitively (attribute names are the lower-cased keys).

        Raises AttributeError when no field matches or when no info has
        been loaded yet.
        '''
        # Read _info through __dict__: a plain self._info would re-enter
        # __getattr__ and recurse forever when _info was never set.
        info = self.__dict__.get("_info")
        if info is not None:
            for key, value in info.items():
                if name == key.lower():
                    return value
        raise AttributeError(name)
def __repr__(self):
def extract(self, path):
    '''
    Extract aur source tarball inside a directory path

    The tarball is downloaded from AUR_URL joined with self.urlpath and
    unpacked as a stream. Errors from the download or the extraction
    propagate to the caller.
    '''
    # NOTE(review): the archive comes from the network and is unpacked with
    # a plain extractall(); member paths are not checked here.
    # Both handles are context-managed so they are closed on error too.
    with urlopen('%s/%s' % (AUR_URL, self.urlpath)) as fo:
        with tar(mode='r|*', fileobj=fo) as tarball:
            tarball.extractall(path)
class LocalPackage(dict):
    '''Local package data abstraction'''

    def __init__(self, name):
        '''
        Bind to the local data directory of package name, creating that
        directory when it does not exist yet.
        '''
        self.name = name
        self.path = join(save_data_path(XDG_DIRECTORY), name)
        debug("local path is: %s" % self.path)
        self._ensure_dir(self.path)

    @staticmethod
    def _ensure_dir(path):
        '''Create directory path unless something already exists there.'''
        # Attempt the creation directly instead of testing exists() first,
        # so a concurrent creation cannot make mkdir() fail.
        try:
            mkdir(path)
        except FileExistsError:
            pass

    @property
    def logdir(self):
        '''Path of the "log" sub-directory, created on first access.'''
        logdir = join(self.path, "log")
        self._ensure_dir(logdir)
        return logdir
return int(open(join(self.path, X), "r").read())
except Exception as exp:
debug("Failed to read %s time: %s" % (X, exp))
return 0
def setlastX(self, X, value):
    '''
    Store value, formatted as an integer followed by a newline, in the
    file named X inside self.path.

    Best effort: any failure is logged with error() and not raised.
    '''
    try:
        # The context manager flushes and closes the file deterministically
        # instead of relying on garbage collection.
        with open(join(self.path, X), "w") as fd:
            fd.write("%d\n" % value)
    except Exception as exp:
        error("Failed to save %s time: %s" % (X, exp))
# Timestamp attributes: reads go through LocalPackage.getlastX and writes
# through LocalPackage.setlastX, keyed by the attribute's own name.
lastbuild = property(
    fget=lambda self: LocalPackage.getlastX(self, "lastbuild"),
    fset=lambda self, value: LocalPackage.setlastX(self, "lastbuild", value),
)
lastmodified = property(
    fget=lambda self: LocalPackage.getlastX(self, "lastmodified"),
    fset=lambda self, value: LocalPackage.setlastX(self, "lastmodified", value),
)
def build(config, localpkg, aurpkg):
'''
Build and commit a package
Notify if succeeded

config is read for "build_cmd" and, when present, "commit_cmd"; both are
run through the shell with their output written to a per-build log file.
localpkg supplies the log directory and receives the lastbuild and
lastmodified values on success. aurpkg supplies the package name and its
lastmodified value.

Failures of the build or commit command are logged and re-raised.
NOTE(review): the notification step named above is not part of the visible
code - confirm against the complete file.
'''
# log files
# one log file per build, named after the local start time
fp = join(localpkg.logdir, strftime("build-%Y-%m-%d-%H-%M-%S.log", localtime(time())))
debug("Build log file path: %s" % fp)
# NOTE(review): the bare numbers below look like line-number residue from the
# page this copy was taken from; source lines appear to be missing here.
# build_dir is used further down but is not defined in the visible code.
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
with open(fp, "w") as fd:
try:
# remember the current directory so the finally clause can restore it
cwd = getcwd()
chdir("%s/%s" % (build_dir.name, aurpkg.name))
# build
info("Starting build command")
start_time = time()
try:
# shell=True: the configured command string is interpreted by the shell;
# stdout and stderr both go to the build log
check_call(config["build_cmd"], stdin=DEVNULL, stdout=fd,
stderr=fd, shell=True, close_fds=True)
except Exception as exp:
error("Build command failure: %s" % exp)
raise
info("Build duration: %ss" % (time() - start_time))
# commit
# optional step: only run when a commit command is configured
if "commit_cmd" in config:
info("Starting commit command")
start_time = time()
try:
check_call(config["commit_cmd"], stdin=DEVNULL, stdout=fd,
stderr=fd, shell=True, close_fds=True)
except Exception as exp:
error("Commit command failure: %s" % exp)
raise
info("Commit duration: %ss" % (time() - start_time))
# register success
# only reached when neither command raised
localpkg.lastbuild = time()
localpkg.lastmodified = aurpkg.lastmodified
finally:
# restore the working directory whether or not the build succeeded
chdir(cwd)
for name, config in packages.items():
if name == "DEFAULT":
continue
try:
aur = AURPackage(name)
except Exception as exp:
error("Unable to get AUR package info: %s" % exp)
continue
maintainer = config.get("maintainer")
if maintainer != aur.maintainer:
debug("AUR last modified: %s" % aur.lastmodified)
debug("Local last modified: %s" % local.lastmodified)
debug("Local last build: %s" % local.lastbuild)
# build new aur version
if aur.lastmodified > local.lastmodified:
info("New version available: %s" % aur.version)
# re-build package when force time is passed
elif "force" in config:
if config["force"].isdigit() is False:
warning("Invalid force value, ignore it")
continue
# if lastbuild not exists, it will be equal to 0
# too small to be > to time() even with big force time
now = int(time())
force = int(config["force"])
debug("Force at: %ss, currently: %ss" % (force, now - local.lastbuild))
if local.lastbuild + force <= now:
info("Forced update")
debug("waiting for %ds" % timeout)
sleep(timeout)
def parse_argv(argv=None):
    '''
    Parse command line arguments

    argv is the list of arguments to parse; when None (the default) the
    process command line is used.
    Returns the parsed namespace with attributes config, sleep and debug.
    Side effect: with --debug the root logger level is set to DEBUG.
    '''
    # load parser
    parser = ArgumentParser(
        epilog="You could set $XDG_DATA_HOME to change the path of the local package cache.",
    )
    parser.add_argument("-c", "--config", help="packages config file path")
    parser.add_argument("-s", "--sleep", type=int, default=86400, help="sleep interval between checks")
    parser.add_argument("-d", "--debug", action="store_true", help="debug mode")
    # parse it!
    args = parser.parse_args(argv)
    # set global debug mode
    if args.debug:
        getLogger().setLevel(DEBUG)
    # set default paths
    if args.config is None:
        args.config = join(save_config_path(XDG_DIRECTORY), "packages.conf")
    return args
def main():
'''
Program entry point

Installs the log handler, parses the command line, creates the package
configuration parser and signals readiness. Exits with ERR_ABORT on
KeyboardInterrupt and with ERR_UNKNOWN on any other exception, unless the
logger is at DEBUG level, in which case that exception is re-raised.
'''
try:
# set logger config
hdlr = StreamHandler()
hdlr.setFormatter(ABFormatter())
getLogger().addHandler(hdlr)
# the AURBOT_DEBUG environment variable enables debug output before
# the command line is parsed
getLogger().setLevel(DEBUG if "AURBOT_DEBUG" in environ else INFO)
# parse command line
args = parse_argv()
# parse package list
# NOTE(review): the parser is created but never fed args.config in the
# visible code; the read step appears to be missing from this copy.
packages = ConfigParser()
# tell to systemd we are ready
notify("READY=1\n")
# while 42
# NOTE(review): no loop follows the comment above in the visible code;
# the main loop call appears to be missing from this copy.
except KeyboardInterrupt:
exit(ERR_ABORT)
except Exception as exp:
error(exp)
if getLogger().getEffectiveLevel() != DEBUG:
error("Unknown error. Please report it with --debug.")
else:
# in debug mode let the traceback surface instead of exiting quietly
raise
exit(ERR_UNKNOWN)
if __name__ == '__main__':