Newer
Older
#!/usr/bin/python3
# coding: utf-8
# aurbot - Archlinux User Repository Builder Bot
# Copyright © 2014 Sébastien Luttringer
#
# Started, October 30th 2011
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from argparse import ArgumentParser
from configparser import ConfigParser
from json import load as jload, dump as jdump, loads as jloads
from logging import StreamHandler, getLogger, Formatter, DEBUG, INFO
from logging import debug, warning, info, error
from os import getcwd, chdir
from os.path import join
from subprocess import check_call
from tarfile import open as tar
from tempfile import TemporaryDirectory
from time import sleep
from urllib.request import urlopen, Request
from xdg.BaseDirectory import save_config_path, save_cache_path
AUR_URL = 'https://aur.archlinux.org'
USER_AGENT = "aurbot"
XDG_DIRECTORY = "aurbot"
ERR_USAGE = 1
ERR_FATAL = 2
ERR_ABORT = 3
ERR_UNKNOWN = 4
class ABFormatter(Formatter):
'''
Customer logging formater
'''
def __init__(self, fmt="[%(levelname)s] %(msg)s"):
Formatter.__init__(self, fmt)
def format(self, record):
format_orig = self._style._fmt
if record.levelno == INFO and getLogger(record.name).getEffectiveLevel() != DEBUG:
self._style._fmt = "%(msg)s"
result = Formatter.format(self, record)
self._style._fmt = format_orig
return result
class AURPackage(dict):
'''
Abstract AUR package action
'''
def __init__(self, name, timeout=None):
self.name = name
debug("getting %s aur infos" % self.name)
url = "%s/rpc.php?type=info&arg=%s" % (AUR_URL, name)
url_req = Request(url, headers={"User-Agent": USER_AGENT})
debug("Requesting url: %s" % url)
debug("Timeout is %s" % timeout)
url_fd = urlopen(url_req, timeout=timeout)
d = jloads(url_fd.read().decode("utf-8"))
if d["version"] != 1:
raise Exception("Unknown AUR Backend version")
self._info = d["results"]
def __getattr__(self, name):
for k, v in self._info.items():
if name == k.lower():
return v
raise AttributeError()
def __repr__(self):
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def extract(self, path):
'''
Extract aur source tarball inside a directory path
'''
fo = urlopen('%s/%s' % (AUR_URL, self.urlpath))
tarball = tar(mode='r|*', fileobj=fo)
tarball.extractall(path)
fo.close()
class JsonDictFile(dict):
'''Json serialized dict'''
def __init__(self, path):
'''Load json file'''
assert(path is not None)
try:
self._fileobj = open(path, "a+")
except (IOError, OSError) as exp:
error("Unable to access to json file %s: %s" % (path, exp))
raise
if self._fileobj.seek(0, 1) == 0:
debug("Json file is empty")
else:
debug("Loading json file %s" % path)
try:
self._fileobj.seek(0, 0)
dico = jload(self._fileobj)
self.update(dico)
except Exception as exp:
error("Unable to load json file %s: %s" % (path, exp))
raise
'''Save current dict into a json file'''
if len(self) == 0:
debug("Not saved. Dict is empty")
return
if self._fileobj is not None:
debug("Saving dict into json file")
try:
self._fileobj.truncate(0)
except Exception as exp:
error("Unable to save json file: %s" % exp)
raise
def build(localpkg, aurpkg):
aurpkg.extract(build_dir.name)
cwd = getcwd()
try:
chdir("%s/%s" % (build_dir.name, aurpkg.name))
if localpkg.get("commit_cmd", None) is not None:
info("Starting commit command")
check_call(localpkg["commit_cmd"], shell=True, close_fds=True)
finally:
chdir(cwd)
def event_loop(packages, cache, timeout):
'''
program roundabout
'''
while True:
for name, config in packages.items():
if name == "DEFAULT":
continue
if config.get("build_cmd", None) is None:
error("Build_cmd is missing in config file")
continue
pkg = AURPackage(name)
# For security, if the maintainer has changed we pass
maintainer = config.get("maintainer", None)
debug("registered maintainer: %s" % maintainer)
debug("AUR maintainer: %s" % pkg.maintainer)
continue
# checks update
debug("Cache lastmodified: %s" % cache.get(name, 0))
debug("AUR lastmodified: %s" % pkg.lastmodified)
continue
# we save last successful build in cache
cache[name] = pkg.lastmodified
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
debug("waiting for %ds" % timeout)
sleep(timeout)
def parse_argv():
'''Parse command line arguments'''
# load parser
parser = ArgumentParser()
parser.add_argument("-p", "--packages-path", help="packages config file path")
parser.add_argument("-c", "--cache-path", help="cache file path")
parser.add_argument("-s", "--sleep", type=int, default=86400, help="sleep interval between checks")
parser.add_argument("-d", "--debug", action="store_true", help="debug mode")
# parse it!
args = parser.parse_args()
# set global debug mode
if args.debug:
getLogger().setLevel(DEBUG)
debug("debug on")
# set default paths
if args.packages_path is None:
args.packages_path = join(save_config_path(XDG_DIRECTORY), "packages.conf")
if args.cache_path is None:
args.cache_path = join(save_cache_path(XDG_DIRECTORY), "packages.cache")
return args
def main():
'''Program entry point'''
try:
# set logger config
hdlr = StreamHandler()
hdlr.setFormatter(ABFormatter())
getLogger().addHandler(hdlr)
getLogger().setLevel(INFO)
# parse command line
args = parse_argv()
# parse package list
packages = ConfigParser()
packages.read(args.packages_path)
# load cache
cache = JsonDictFile(args.cache_path)
# tell to systemd we are ready
notify("READY=1\n")
# while 42
event_loop(packages, cache, args.sleep)
except KeyboardInterrupt:
exit(ERR_ABORT)
# except BaseError as exp:
# error(exp)
# exit(ERR_FATAL)
except Exception as exp:
error(exp)
if getLogger().getEffectiveLevel() != DEBUG:
error("Unknown error. Please report it with --debug.")
else:
raise
exit(ERR_UNKNOWN)
if __name__ == '__main__':