Newer
Older
# coding: utf-8
# archversion - Archlinux Version Controller
# Copyright © 2012 Sébastien Luttringer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from archversion.pacman import parse_pkgbuild, Pacman
from archversion.error import InvalidConfigFile, VersionNotFound
from collections import OrderedDict
import sys
class VersionController(object):
'''
Handle version detection of packages
'''
def __init__(self, packages, cache):
    '''
    Build a controller from a package mapping and a version cache.

    packages: mapping of package name -> per-package settings.
    cache: dict-like version cache. It must hold exactly the
        "downstream", "report" and "upstream" sections; any other layout
        is wiped and re-created empty (in place).
    '''
    self.packages = packages
    # set cache
    if set(cache.keys()) != {"downstream", "report", "upstream"}:
        logging.debug("Invalid cache, purging it")
        cache.clear()
        cache["upstream"] = {}
        cache["downstream"] = {}
        cache["report"] = {}
    # keep a handle on the cache: the other methods read self.cache
    self.cache = cache
def reduce_packages(self, packages):
    '''Drop every known package whose name is not in the given list'''
    # collect first, so the mapping is not mutated while being iterated
    unwanted = [known for known in self.packages if known not in packages]
    for known in unwanted:
        self.packages.pop(known, None)
def sort_packages(self):
    '''Replace the package mapping by a copy ordered by package name'''
    ordered = self.sort_dict(self.packages)
    self.packages = ordered
def sort_cache(self):
    '''
    Sort the cache entries by key.

    The cache is reordered in place rather than replaced by a new
    OrderedDict, so the object keeps its own type and methods
    (sync_packages calls self.cache.save()).
    '''
    ordered = sorted(self.cache.items(), key=lambda item: item[0])
    self.cache.clear()
    # re-insert in key order; relies on the cache keeping insertion order
    for key, content in ordered:
        self.cache[key] = content
@staticmethod
def sort_dict(larousse):
'''Sort a dictionary into and OrderedDict'''
return OrderedDict(sorted(larousse.items(), key=lambda t: t[0]))
@staticmethod
def get_version_upstream(name, value):
    '''
    Return the highest version advertised on the upstream page.

    name: package name, used to build the default regex.
    value: per-package settings. "url" is mandatory; "regex",
        "regex_name", "regex_version", "regex_ext", "regex_exclude",
        "timeout" and "retry" are optional.

    Raise InvalidConfigFile when no url is configured and
    VersionNotFound when the last attempt fails, whatever the cause.
    '''
    logging.debug("Get upstream version")
    # check upstream param
    if "url" not in value:
        logging.error("No url specified for %s" % name)
        raise InvalidConfigFile("Missing url in config file")
    url = value["url"]
    regex = value.get("regex", "%s[-_]v?(%s)%s" % (
        value.get("regex_name", name),
        value.get("regex_version", r"[-.\w]+"),
        value.get("regex_ext",
                  r"\.(?:tar(?:\.gz|\.bz2|\.xz)?|tgz|tbz2|zip)")))
    regex_exclude = value.get("regex_exclude", ".*(rc|beta|alpha).*")
    # retrieve config timeout; when none is configured the keyword is
    # left out so urlopen applies its own default
    timeout = value.get("timeout", None)
    open_args = {}
    if timeout is not None:
        open_args["timeout"] = float(timeout)
    # do it retry time + 1
    ntry = int(value.get("retry", 0)) + 1
    for n in range(1, ntry + 1):
        try:
            logging.debug("Requesting url: %s (try %d/%d)" % (url, n, ntry))
            if "timeout" in open_args:
                logging.debug("Timeout is %f" % open_args["timeout"])
            url_req = Request(url, headers={"User-Agent": USER_AGENT})
            url_fd = urlopen(url_req, **open_args)
            try:
                page = url_fd.read().decode("utf-8")
            finally:
                # always release the connection, even on decode errors
                url_fd.close()
            logging.debug("Version regex: %s" % regex)
            # a set removes duplicates and allows the subtraction below
            found = set(re.findall(regex, page))
            if not found:
                raise VersionNotFound("No regex match on upstream")
            # list all found versions
            logging.debug("Found versions: %s" % found)
            if regex_exclude != "":
                logging.debug("Exclusion regex: %s" % regex_exclude)
                found -= set(x for x in found
                             if re.match(regex_exclude, x))
                logging.debug("Found versions after exclusion: %s" % found)
            if not found:
                raise VersionNotFound("All upstream versions are excluded")
            # latest version is the highest
            latest = max(found, key=VersionKey)
            # list selected version
            logging.debug("Upstream version is : %s" % latest)
            return latest
        except Exception as exp:
            # any failure triggers a retry; only the last one is reported
            if n == ntry:
                raise VersionNotFound("Upstream check failed: %s" % exp)
@staticmethod
def get_version_downstream(name, value, mode):
    '''
    Return downstream version using the backend selected by mode.

    The call is forwarded to VersionController.get_version_downstream_<mode>
    with (name, value). Raise InvalidConfigFile when no such backend exists.
    '''
    # Resolve the backend before calling it, so an AttributeError raised
    # inside the backend is not misreported as an invalid mode.
    backend = getattr(VersionController,
                      "get_version_downstream_%s" % mode, None)
    if backend is None:
        raise InvalidConfigFile("Invalid dowstream mode")
    return backend(name, value)
@staticmethod
def get_version_downstream_pacman(name, value):
    '''
    Return pacman version.

    value["repo"], when present, is a comma separated list of repositories
    passed to Pacman.find_pkg. Only the pkgver part of the
    "[epoch:]pkgver[-pkgrel]" version string is returned. Return None
    when find_pkg reports no package.
    '''
    logging.debug("Get pacman version")
    # Load pacman
    # NOTE(review): assumes Pacman() takes no argument -- confirm against
    # archversion.pacman
    pacman = Pacman()
    allowed_repos = value["repo"].split(",") if "repo" in value else None
    db, pkg = pacman.find_pkg(name, allowed_repos)
    if pkg is None:
        return None
    # every part of the pattern is optional, so it always matches
    pkgver = re.match(r"^(?:(\d+)\:)?([^-:]*)(?:-(\d+))?",
                      pkg.version).group(2)
    logging.debug("pacman version in %s: %s" % (db.name, pkgver))
    return pkgver
'''Return archweb version'''
logging.debug("Get archweb version")
# if arch is specified
archs = value.get("arch", "x86_64,i686,any").split(",")
repos = value.get("repo",
"community-testing,community,testing,extra,core"
).split(",")
# retrieve config timeout
timeout = float(value.get("timeout", None))
for arch in archs:
for repo in repos:
url = "http://www.archlinux.org/packages/%s/%s/%s/json" % (
repo, arch, name)
url_req = Request(url, headers={"User-Agent": USER_AGENT})
logging.debug("Requesting url: %s" % url)
logging.debug("Timeout is %f" % timeout)
try:
url_fd = urlopen(url_req, timeout=timeout)
d = json.loads(url_fd.read().decode("utf-8"))
v = d["pkgver"]
logging.debug("Archweb version is : %s" % v)
logging.debug("Archweb check failed: %s" % exp)
raise VersionNotFound("No Archweb package found")
'''Return archlinux user repository version'''
logging.debug("Get AUR version")
try:
# retrieve config timeout
timeout = float(value.get("timeout", None))
url = "http://aur.archlinux.org/rpc.php?type=info&arg=%s" % name
url_req = Request(url, headers={"User-Agent": USER_AGENT})
logging.debug("Requesting url: %s" % url)
logging.debug("Timeout is %f" % timeout)
url_fd = urlopen(url_req, timeout=timeout)
d = json.loads(url_fd.read().decode("utf-8"))
v = d["results"]["Version"].rsplit("-")[0]
logging.debug("AUR version is : %s" % v)
return v
except Exception as exp:
raise VersionNotFound("AUR check failed: %s" % exp)
assert(False)
'''Return abs version'''
logging.debug("Get ABS version")
# Get ABS tree path
abspath = value.get("abs_path", "/var/abs")
# Map db and name
repos = [d for d in os.listdir(abspath)
if os.path.isdir(os.path.join(abspath, d))]
# filter if repo is provided
if "repo" in value:
allowed_repos = value.get("repo").split(",")
for r in list(repos):
if r not in allowed_repos:
repos.remove(r)
# looking into db for package name
for repo in repos:
logging.debug("Looking into directory %s" % repo)
repopath = os.path.join(abspath, repo)
packages = [d for d in os.listdir(repopath)]
if name in packages:
pkgpath = os.path.join(repopath, name, "PKGBUILD")
if os.path.isfile(pkgpath):
# use bash to export vars.
# WARNING: CODE IS EXECUTED
pkgdict = parse_pkgbuild(pkgpath)
if "pkgver" in pkgdict:
v = pkgdict["pkgver"]
logging.debug("ABS version is : %s" % v)
return v
raise VersionNotFound("No ABS package found")
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def get_version_downstream_none(name, value):
'''Return none version'''
return ""
def sync_packages(self):
    '''
    Retrieve upstream and downstream versions and store them in cache.

    For each package the upstream version is fetched, optionally rewritten
    by the "eval_upstream" expression, and stored in cache["upstream"].
    The same is then done for the downstream version ("downstream" mode,
    "eval_downstream" expression, cache["downstream"]). Each stored entry
    holds the version and the current epoch time.
    The cache is saved even when an exception interrupts the loop.
    '''
    try:
        for name, value in self.packages.items():
            logging.debug("Syncing versions of package %s" % name)
            # get upstream version
            v_upstream = self.get_version_upstream(name, value)
            # apply eval to upstream
            # WARNING: eval executes arbitrary code taken from the package
            # settings; only use trusted configuration.
            e_upstream = value.get("eval_upstream", None)
            if e_upstream is not None:
                v_upstream = eval(e_upstream, {}, {"version": v_upstream})
                logging.debug("eval_upstream produce version: %s" % v_upstream)
            # save upstream version
            if v_upstream is not None:
                self.cache["upstream"][name] = {"version": v_upstream,
                                                "epoch": int(time())}
            else:
                logging.warning("%s: Upstream version not found." % name)
            # get downstream mode
            mode = value.get("downstream", None)
            if mode is None:
                logging.warning("%s: Invalid downstream mode: %s." % (name, mode))
                continue
            # get downstream version
            v_downstream = self.get_version_downstream(name, value, mode)
            # apply eval to downstream (same warning as eval_upstream)
            e_downstream = value.get("eval_downstream", None)
            if e_downstream is not None:
                v_downstream = eval(e_downstream, {},
                                    {"version": v_downstream})
                logging.debug("eval_downstream produce version: %s" % v_downstream)
            # save downstream version
            if v_downstream is not None:
                self.cache["downstream"][name] = {"version": v_downstream,
                                                  "epoch": int(time())}
            else:
                logging.warning("%s: Downstream version not found." % name)
    finally:
        self.cache.save()
def check_versions(self, only_new=False, not_in_cache=False):
'''
Check versions against according to compare mode
Return a generator!
'''
logging.debug("Checking versions of package %s" % name)
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
try:
# get compare mode
compare = value.get("compare", None)
if compare is None:
raise InvalidConfigFile("No defined compare mode")
if compare not in self.compare_table:
raise InvalidConfigFile("Invalid compare mode")
# get upstream version
v_upstream = self.get_version_upstream(name, value)
# apply eval to upstream
e_upstream = value.get("eval_upstream", None)
if e_upstream is not None:
v_upstream = eval(e_upstream, {}, {"version": v_upstream})
logging.debug("eval_upstream produce version: %s" %
v_upstream)
# check upstream validity
if v_upstream is None:
raise VersionNotFound("Upstream")
# get cached version
v_cache = self.cache.get(name, None)
# only not in cache mode
if not_in_cache and v_cache == v_upstream:
logging.debug("%s: skipped by not in cache mode" % name)
continue
# get compared version
v_compare = self.compare_table[compare](name, value)
# apply eval to compared
e_compare = value.get("eval_compare", None)
if e_compare is not None:
v_compare = eval(e_compare, {}, {"version": v_compare})
logging.debug("eval_compare produce version: %s" %
v_compare)
# save version to cache after getting compared version
# to avoid interfering with cache mode
self.cache[name] = v_upstream
# only new version mode
if only_new and (v_compare is None or v_upstream == v_compare):
logging.debug("%s: skipped by only new mode" % name)
continue
yield (name, v_upstream, v_compare)
except VersionNotFound as exp:
logging.warning("%s: Version not found: %s" % (name, exp))
logging.warning("%s: Invalid configuration: %s" % (name, exp))
def print_names(self, cached=False):
    '''Print one package name per line, with its cache entry if asked'''
    for pkg in self.packages:
        # the cache lookup is only done in cached mode
        line = "%s : %s" % (pkg, self.cache.get(pkg, "Unknow")) if cached else pkg
        print(line)
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def print_modes(self):
    '''Print the names of the comparison modes, sorted, one per line'''
    for mode in sorted(self.compare_table):
        print(mode)
def print_versions(self, only_new=False, not_in_cache=False):
    '''Print every result produced by check_versions'''
    results = self.check_versions(only_new, not_in_cache)
    for pkg, upstream, compared in results:
        self.print_version(pkg, upstream, compared)
def print_version(self, name, v1, v2=None):
    '''
    Print a package name with its upstream version (v1) and, when given,
    the compared version (v2). Colors are used only on a terminal.
    '''
    colored = sys.stdout.isatty()
    blue = '\033[1;34m' if colored else ''
    white = '\033[1;37m' if colored else ''
    yellow = '\033[1;33m' if colored else ''
    reset = '\033[m' if colored else ''
    # color of the compared version: yellow when missing, green when
    # equal to upstream, red otherwise
    if not colored:
        status = ''
    elif v2 is None:
        status = '\033[1;33m'
    elif v1 == v2:
        status = '\033[1;32m'
    else:
        status = '\033[1;31m'
    # package name, then upstream version
    pieces = ["%s[%s%s%s]" % (blue, white, name, blue),
              " %sup: %s " % (yellow, v1)]
    if v2 is not None:
        origin = self.packages.get(name,{}).get("compare", "other")
        # separator, then compared version
        pieces.append("%s|" % blue)
        pieces.append(" %s%s: %s" % (status, origin, v2))
    pieces.append(reset)
    print("".join(pieces))
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
class VersionKey(object):
    '''
    Sorting key of a version string.

    The string is split into runs of digits and runs of letters; every
    other character is a separator. Digit runs compare as integers,
    letter runs as strings, a letter run sorts before a digit run, and a
    version that is a strict prefix of another sorts before it.
    Equality is still the equality of the raw strings.
    '''

    def __init__(self, vstring):
        self.vstring = vstring
        self.vlist = re.findall(r"([0-9]+|[a-zA-Z]+)", vstring)

    def _sort_key(self):
        '''Return the comparable form of vlist'''
        # the leading 0/1 tag keeps str and int from being compared and
        # makes letters sort before digits
        return tuple((1, int(part)) if part.isdigit() else (0, part)
                     for part in self.vlist)

    def __repr__(self):
        return "%s ('%s')" % (self.__class__.__name__, self.vstring)

    def __str__(self):
        return self.vstring

    def __hash__(self):
        # consistent with __eq__, which compares the raw strings
        return hash(self.vstring)

    def __eq__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return self.vstring == other.vstring

    def __ne__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return self.vstring != other.vstring

    def __lt__(self, other):
        # strict ordering: two versions with the same components are
        # never lower than each other
        if not isinstance(other, VersionKey):
            return NotImplemented
        return self._sort_key() < other._sort_key()

    def __gt__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return other._sort_key() < self._sort_key()

    def __le__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return not other._sort_key() < self._sort_key()

    def __ge__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return not self._sort_key() < other._sort_key()