Skip to content
aurbot 7.25 KiB
Newer Older
Seblu's avatar
Seblu committed
#!/usr/bin/python3
# coding: utf-8

# aurbot - Archlinux User Repository Builder Bot
# Copyright © 2014 Sébastien Luttringer
#
# Started, October 30th 2011
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from argparse import ArgumentParser
from configparser import ConfigParser
from json import load as jload, dump as jdump, loads as jloads
Seblu's avatar
Seblu committed
from logging import StreamHandler, getLogger, Formatter, DEBUG, INFO
from logging import debug, warning, info, error
Seblu's avatar
Seblu committed
from os import chdir, environ, getcwd
Seblu's avatar
Seblu committed
from os.path import join
from subprocess import check_call
from tarfile import open as tar
from tempfile import TemporaryDirectory
from time import sleep
from urllib.request import urlopen, Request
from xdg.BaseDirectory import save_config_path, save_cache_path
Seblu's avatar
Seblu committed
from systemd.daemon import notify
Seblu's avatar
Seblu committed

AUR_URL = 'https://aur.archlinux.org'
USER_AGENT = "aurbot"
XDG_DIRECTORY = "aurbot"

ERR_USAGE = 1
ERR_FATAL = 2
ERR_ABORT = 3
ERR_UNKNOWN = 4

Seblu's avatar
Seblu committed
class ABFormatter(Formatter):
	'''
	Customer logging formater
	'''
	def __init__(self, fmt="[%(levelname)s] %(msg)s"):
		Formatter.__init__(self, fmt)

	def format(self, record):
		format_orig = self._style._fmt
		if record.levelno == INFO and getLogger(record.name).getEffectiveLevel() != DEBUG:
			self._style._fmt =  "%(msg)s"
		result = Formatter.format(self, record)
		self._style._fmt = format_orig
		return result

Seblu's avatar
Seblu committed

class AURPackage(dict):
	'''
	Abstract AUR package action
	'''

	def __init__(self, name, timeout=None):
		self.name = name
		debug("getting %s aur infos" % self.name)
		url = "%s/rpc.php?type=info&arg=%s" % (AUR_URL, name)
		url_req = Request(url, headers={"User-Agent": USER_AGENT})
		debug("Requesting url: %s" % url)
		debug("Timeout is %s" % timeout)
		url_fd = urlopen(url_req, timeout=timeout)
		d = jloads(url_fd.read().decode("utf-8"))
		if d["version"] != 1:
			raise Exception("Unknown AUR Backend version")
		self._info = d["results"]

	def __getattr__(self, name):
		for k, v in self._info.items():
			if name == k.lower():
				return v
		raise AttributeError()

	def __repr__(self):
Seblu's avatar
Seblu committed
		return "%s %s" % (self.name, self.version)
Seblu's avatar
Seblu committed

	def extract(self, path):
		'''
		Extract aur source tarball inside a directory path
		'''
		fo = urlopen('%s/%s' % (AUR_URL, self.urlpath))
		tarball = tar(mode='r|*', fileobj=fo)
		tarball.extractall(path)
		fo.close()


class JsonDictFile(dict):
	'''Json serialized dict'''

	def __init__(self, path):
		'''Load json file'''
		assert(path is not None)
		try:
			self._fileobj = open(path, "a+")
		except (IOError, OSError) as exp:
			error("Unable to access to json file %s: %s" % (path, exp))
			raise
		if self._fileobj.seek(0, 1) == 0:
			debug("Json file is empty")
		else:
			debug("Loading json file %s" % path)
			try:
				self._fileobj.seek(0, 0)
				dico = jload(self._fileobj)
				self.update(dico)
			except Exception as exp:
				error("Unable to load json file %s: %s" % (path, exp))
				raise

Seblu's avatar
Seblu committed
	def save(self):
Seblu's avatar
Seblu committed
		'''Save current dict into a json file'''
		if len(self) == 0:
			debug("Not saved. Dict is empty")
			return
		if self._fileobj is not None:
			debug("Saving dict into json file")
			try:
				self._fileobj.truncate(0)
Seblu's avatar
Seblu committed
				self._fileobj.seek(0, 0)
Seblu's avatar
Seblu committed
				jdump(self, self._fileobj)
Seblu's avatar
Seblu committed
				self._fileobj.flush()
Seblu's avatar
Seblu committed
			except Exception as exp:
				error("Unable to save json file: %s" % exp)
				raise

def build(localpkg, aurpkg):
Seblu's avatar
Seblu committed
	info("Getting last AUR version")
Seblu's avatar
Seblu committed
	# find build dir
	build_dir = TemporaryDirectory()
Seblu's avatar
Seblu committed
	debug("Build dir is %s" % build_dir.name)
Seblu's avatar
Seblu committed
	# extract tarball
Seblu's avatar
Seblu committed
	debug("Extracting aur tarball")
Seblu's avatar
Seblu committed
	aurpkg.extract(build_dir.name)
	cwd = getcwd()
	try:
		chdir("%s/%s" % (build_dir.name, aurpkg.name))
Seblu's avatar
Seblu committed

		info("Starting build command")
Seblu's avatar
Seblu committed
		check_call(localpkg["build_cmd"], shell=True, close_fds=True)
Seblu's avatar
Seblu committed

		if localpkg.get("commit_cmd", None) is not None:
			info("Starting commit command")
			check_call(localpkg["commit_cmd"], shell=True, close_fds=True)
Seblu's avatar
Seblu committed
	finally:
		chdir(cwd)

def event_loop(packages, cache, timeout):
	'''
	program roundabout
	'''
	while True:
		for name, config in packages.items():
			if name == "DEFAULT":
				continue
Seblu's avatar
Seblu committed
			info("[%s]" % name)
			if config.get("build_cmd", None) is None:
				error("Build_cmd is missing in config file")
				continue
Seblu's avatar
Seblu committed
			pkg = AURPackage(name)
			# For security, if the maintainer has changed we pass
			maintainer = config.get("maintainer", None)
Seblu's avatar
Seblu committed
			if maintainer != pkg.maintainer:
Seblu's avatar
Seblu committed
				error("Invalid maintainer for package %s" % name)
Seblu's avatar
Seblu committed
				debug("registered maintainer: %s" % maintainer)
				debug("AUR maintainer: %s" % pkg.maintainer)
				continue
			# checks update
Seblu's avatar
Seblu committed
			debug("Cache lastmodified: %s" % cache.get(name, 0))
			debug("AUR lastmodified: %s" % pkg.lastmodified)
Seblu's avatar
Seblu committed
			if pkg.lastmodified <= cache.get(name, 0):
Seblu's avatar
Seblu committed
				info("No new version available")
Seblu's avatar
Seblu committed
				continue
			# package needs to be built and commited
			try:
Seblu's avatar
Seblu committed
				info("New version available: %s" % pkg.version)
				build(config, pkg)
Seblu's avatar
Seblu committed
			except Exception as exp:
Seblu's avatar
Seblu committed
				error(exp)
Seblu's avatar
Seblu committed
				continue
			# we save last successful build in cache
			cache[name] = pkg.lastmodified
Seblu's avatar
Seblu committed
			cache.save()
Seblu's avatar
Seblu committed
		# night is coming, save cache
Seblu's avatar
Seblu committed
		debug("waiting for %ds" % timeout)
		sleep(timeout)

def parse_argv():
	'''Parse command line arguments'''
	# load parser
	parser = ArgumentParser()
	parser.add_argument("-p", "--packages-path", help="packages config file path")
	parser.add_argument("-c", "--cache-path", help="cache file path")
	parser.add_argument("-s", "--sleep", type=int, default=86400, help="sleep interval between checks")
	parser.add_argument("-d", "--debug", action="store_true", help="debug mode")
	# parse it!
	args = parser.parse_args()
	# set global debug mode
	if args.debug:
		getLogger().setLevel(DEBUG)
	# set default paths
	if args.packages_path is None:
		args.packages_path = join(save_config_path(XDG_DIRECTORY), "packages.conf")
	if args.cache_path is None:
		args.cache_path = join(save_cache_path(XDG_DIRECTORY), "packages.cache")
	return args

def main():
	'''Program entry point'''
	try:
Seblu's avatar
Seblu committed
		# set logger config
		hdlr = StreamHandler()
		hdlr.setFormatter(ABFormatter())
		getLogger().addHandler(hdlr)
Seblu's avatar
Seblu committed
		getLogger().setLevel(DEBUG if "AURBOT_DEBUG" in environ else INFO)
Seblu's avatar
Seblu committed
		# parse command line
		args = parse_argv()
		# parse package list
		packages = ConfigParser()
		packages.read(args.packages_path)
		# load cache
		cache = JsonDictFile(args.cache_path)
Seblu's avatar
Seblu committed
		# tell to systemd we are ready
		notify("READY=1\n")
		# while 42
Seblu's avatar
Seblu committed
		event_loop(packages, cache, args.sleep)
	except KeyboardInterrupt:
		exit(ERR_ABORT)
	# except BaseError as exp:
	#     error(exp)
	#     exit(ERR_FATAL)
	except Exception as exp:
		error(exp)
		if getLogger().getEffectiveLevel() != DEBUG:
			error("Unknown error. Please report it with --debug.")
		else:
			raise
		exit(ERR_UNKNOWN)

if __name__ == '__main__':
Seblu's avatar
Seblu committed
	main()