Newer
Older
#!/usr/bin/python3
# coding: utf-8
# aurbot - Archlinux User Repository Builder Bot
# Copyright © 2014 Sébastien Luttringer
#
# Started, October 30th 2011
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from argparse import ArgumentParser
from configparser import ConfigParser
from json import load as jload, dump as jdump, loads as jloads
from logging import getLogger, debug, warning, error, DEBUG
from os import getcwd, chdir
from os.path import join
from subprocess import check_call
from tarfile import open as tar
from tempfile import TemporaryDirectory
from time import sleep
from urllib.request import urlopen, Request
from xdg.BaseDirectory import save_config_path, save_cache_path
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Base URL of the AUR web site; the RPC and tarball URLs are built from it.
AUR_URL = "https://aur.archlinux.org"
# Value sent in the User-Agent header of AUR RPC requests.
USER_AGENT = "aurbot"
# Directory name handed to the XDG config/cache path helpers.
XDG_DIRECTORY = "aurbot"
# Process exit codes, numbered 1 to 4 in this order.
ERR_USAGE, ERR_FATAL, ERR_ABORT, ERR_UNKNOWN = range(1, 5)
class AURPackage(dict):
    '''
    Information about one AUR package, fetched from the AUR RPC interface.

    The "results" member of the RPC answer is stored in self._info and its
    keys are readable as lower-case attributes (pkg.version for a
    "Version" key, for instance).
    '''

    def __init__(self, name, timeout=None):
        '''
        Query the AUR RPC "info" method for the package called name.

        name: AUR package name, inserted as-is in the request URL.
        timeout: forwarded to urlopen().

        Raises Exception when the "version" field of the answer is not 1.
        Errors raised by urlopen() or by the JSON decoder propagate.
        '''
        super().__init__()
        self.name = name
        debug("getting %s aur infos" % self.name)
        url = "%s/rpc.php?type=info&arg=%s" % (AUR_URL, name)
        url_req = Request(url, headers={"User-Agent": USER_AGENT})
        debug("Requesting url: %s" % url)
        debug("Timeout is %s" % timeout)
        # the context manager closes the HTTP response even if reading fails
        with urlopen(url_req, timeout=timeout) as url_fd:
            d = jloads(url_fd.read().decode("utf-8"))
        if d["version"] != 1:
            raise Exception("Unknown AUR Backend version")
        self._info = d["results"]

    def __getattr__(self, name):
        '''
        Return the RPC result value whose lower-cased key equals name.

        Only called when the regular attribute lookup fails.
        Raises AttributeError, carrying the attribute name, when no key
        matches or when no RPC result has been stored yet.
        '''
        # Read _info through the instance dict: evaluating self._info here
        # would call __getattr__ again, without end, when _info is not set.
        info = self.__dict__.get("_info")
        if info is None:
            raise AttributeError(name)
        for k, v in info.items():
            if name == k.lower():
                return v
        raise AttributeError(name)

    def __repr__(self):
        '''Return "<name> v<version> (<description>)".'''
        return "%s v%s (%s)" % (self.name, self.version, self.description)

    def extract(self, path):
        '''
        Extract aur source tarball inside a directory path

        The tarball is downloaded from AUR_URL/<urlpath> and unpacked as a
        stream; both the HTTP response and the archive are closed on exit,
        including when extraction fails.
        '''
        # NOTE(review): members are extracted without any filtering by this
        # code; see the tarfile documentation warning about untrusted
        # archives.
        with urlopen('%s/%s' % (AUR_URL, self.urlpath)) as fo:
            with tar(mode='r|*', fileobj=fo) as tarball:
                tarball.extractall(path)
class JsonDictFile(dict):
    '''Json serialized dict'''

    def __init__(self, path):
        '''
        Load json file

        path is opened in "a+" mode and the file object is kept in
        self._fileobj. Unless the file is empty, the dict is filled with
        the decoded JSON content.

        Errors from open() and from loading the JSON content are logged
        and re-raised; in both cases self._fileobj ends up set to None.
        '''
        super().__init__()
        assert path is not None
        # defined up front so the attribute exists even when open() fails
        self._fileobj = None
        try:
            self._fileobj = open(path, "a+")
        except (IOError, OSError) as exp:
            error("Unable to access to json file %s: %s" % (path, exp))
            raise
        # seeking to the end yields position 0 only for an empty file
        if self._fileobj.seek(0, 2) == 0:
            debug("Json file is empty")
            return
        debug("Loading json file %s" % path)
        try:
            self._fileobj.seek(0, 0)
            self.update(jload(self._fileobj))
        except Exception as exp:
            error("Unable to load json file %s: %s" % (path, exp))
            # do not leak the handle of a file that could not be loaded
            self._fileobj.close()
            self._fileobj = None
            raise
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
'''Save current dict into a json file'''
if len(self) == 0:
debug("Not saved. Dict is empty")
return
if self._fileobj is not None:
debug("Saving dict into json file")
try:
self._fileobj.seek(0, 0)
self._fileobj.truncate(0)
jdump(self, self._fileobj)
except Exception as exp:
error("Unable to save json file: %s" % exp)
raise
def build(localpkg, aurpkg):
    '''
    Build and commit an AUR package from a temporary directory.

    localpkg: mapping providing the "build_cmd" and "commit_cmd" shell
        command lines.
    aurpkg: object providing a name attribute and an extract(path) method.

    The tarball is extracted into a fresh temporary directory, then both
    commands are run through the shell from <tempdir>/<aurpkg.name>.
    commit_cmd only runs when build_cmd exited with status 0.

    Raises subprocess.CalledProcessError when a command fails.
    '''
    debug("we build %s" % aurpkg.name)
    # the context manager removes the build directory on every exit path
    with TemporaryDirectory() as build_dir:
        debug("build dir is %s" % build_dir)
        # extract tarball
        debug("extracting aur tarball")
        aurpkg.extract(build_dir)
        pkg_dir = join(build_dir, aurpkg.name)
        # cwd= is used instead of chdir() so the working directory of the
        # bot process itself is never modified
        check_call(localpkg["build_cmd"], shell=True, close_fds=True,
                   cwd=pkg_dir)
        # build was successful
        check_call(localpkg["commit_cmd"], shell=True, close_fds=True,
                   cwd=pkg_dir)
def event_loop(packages, cache, timeout):
    '''
    Endless polling loop of the bot.

    For every section of packages, the AUR package of the same name is
    fetched. It is built when its AUR maintainer equals the configured
    one and its lastmodified value is greater than the one recorded in
    cache. After each pass the loop sleeps for timeout seconds.
    '''
    while True:
        for pkgname in packages.sections():
            debug("Checking %s" % pkgname)
            aurpkg = AURPackage(pkgname)
            print(aurpkg)
            # Security guard: a package whose maintainer changed is skipped
            expected = packages[pkgname].get("maintainer", None)
            if expected != aurpkg.maintainer:
                warning("Invalid maintainer for package %s" % pkgname)
                debug("registered maintainer: %s" % expected)
                debug("AUR maintainer: %s" % aurpkg.maintainer)
                continue
            # nothing to do when the cache already holds this revision
            if aurpkg.lastmodified <= cache.get(pkgname, 0):
                debug("%s was already built" % pkgname)
                continue
            # the package has to be built and committed
            try:
                build(packages[pkgname], aurpkg)
            except Exception as exp:
                warning("chiche: %s" % exp)
            else:
                # remember the last successful build
                cache[pkgname] = aurpkg.lastmodified
        debug("waiting for %ds" % timeout)
        sleep(timeout)
def parse_argv():
    '''
    Parse command line arguments

    Returns the argparse namespace with packages_path, cache_path, sleep
    and debug. Paths left unset on the command line default to files
    located in the XDG config and cache directories of the program.
    Enabling debug switches the root logger to the DEBUG level.
    '''
    # option table, declared in the order it is shown by --help
    options = (
        (("-p", "--packages-path"), {"help": "packages config file path"}),
        (("-c", "--cache-path"), {"help": "cache file path"}),
        (("-s", "--sleep"), {"type": int, "default": 86400,
                             "help": "sleep interval between checks"}),
        (("-d", "--debug"), {"action": "store_true", "help": "debug mode"}),
    )
    cli = ArgumentParser()
    for flags, settings in options:
        cli.add_argument(*flags, **settings)
    args = cli.parse_args()
    # global debug mode
    if args.debug:
        getLogger().setLevel(DEBUG)
        debug("debug on")
    # fall back on the XDG locations for paths that were not given
    if args.packages_path is None:
        config_dir = save_config_path(XDG_DIRECTORY)
        args.packages_path = join(config_dir, "packages.conf")
    if args.cache_path is None:
        cache_dir = save_cache_path(XDG_DIRECTORY)
        args.cache_path = join(cache_dir, "packages.cache")
    return args
def main():
    '''
    Program entry point

    Parses the command line, loads the packages configuration and the
    build cache, signals readiness through notify() and enters the event
    loop.

    Exits with ERR_ABORT on KeyboardInterrupt. Any other exception is
    logged; it is then re-raised when the root logger is at DEBUG level,
    otherwise the process exits with ERR_UNKNOWN.
    '''
    # sys.exit is used rather than the exit() helper injected by the site
    # module, which is meant for the interactive shell and may be absent
    from sys import exit as sys_exit
    try:
        # parse command line
        args = parse_argv()
        # parse package list; read() skips files it cannot open and
        # returns the list of those it parsed
        packages = ConfigParser()
        if not packages.read(args.packages_path):
            warning("Unable to read packages file %s" % args.packages_path)
        # load cache
        cache = JsonDictFile(args.cache_path)
        # tell to systemd we are ready
        # NOTE(review): notify is not among the imports visible in this
        # file - presumably systemd.daemon.notify; confirm it is in scope
        notify("READY=1\n")
        # while 42
        event_loop(packages, cache, args.sleep)
    except KeyboardInterrupt:
        sys_exit(ERR_ABORT)
    except Exception as exp:
        error(exp)
        # in debug mode the traceback is more useful than an exit code
        if getLogger().getEffectiveLevel() == DEBUG:
            raise
        error("Unknown error. Please report it with --debug.")
        sys_exit(ERR_UNKNOWN)
if __name__ == '__main__':