Newer
Older
# coding: utf-8
# archversion - Archlinux Version Controller
# Copyright © 2012 Sébastien Luttringer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from archversion import HTTP_HEADERS, CONFIG_PACKAGES, CACHE_PACKAGES
from archversion.config import BaseConfigFile
from archversion.database import JsonDatabase
from archversion.error import InvalidConfigFile, VersionNotFound
from collections import OrderedDict
import sys
class VersionController(object):
'''
Handle version detection of packages
'''
# Create the cache database and fill it from CACHE_PACKAGES
self._cache = JsonDatabase()
self._cache.load(CACHE_PACKAGES)
# The cache must hold exactly the "upstream", "downstream" and "compare"
# sections; with any other set of keys its content is dropped and the
# three sections are recreated empty
if set(self._cache.keys()) != set(("downstream", "compare", "upstream")):
self._cache.clear()
self._cache["upstream"] = {}
self._cache["downstream"] = {}
self._cache["compare"] = {}
@property
def packages(self):
    '''List every package name, each one followed by its aliases'''
    names = []
    for pkg_name, pkg_conf in self._packages.items():
        # a package contributes its own name first, then its aliases
        names.extend([pkg_name] + list(self.alias(pkg_conf)))
    return names
@property
def versions(self):
    '''
    Map every name yielded by compare() to its versions
    Return an OrderedDict of name -> (upstream version, downstream version)
    '''
    return OrderedDict(
        (pkg, (upstream, downstream))
        for pkg, upstream, downstream in self.compare()
    )
def select(self, packages):
    '''
    Restrict future controller actions to the given packages
    A package is kept when its name or one of its aliases is listed in
    packages; every other package is removed
    '''
    wanted = set(packages)
    # walk a snapshot: entries are removed from self._packages on the way
    for pkg_name, pkg_conf in list(self._packages.items()):
        known_as = {pkg_name}.union(self.alias(pkg_conf))
        if wanted.isdisjoint(known_as):
            self._packages.pop(pkg_name, None)
def sort(self):
'''
Sort packages by name
Make packages to be upgraded/displayed by alpha order
# do not sort self._cache by recreating the cache object
# destructor is used to save the cache content
def sync(self):
    '''
    Synchronise local cache with external states

    Retrieve upstream and downstream versions of every package and store
    them in the "upstream" and "downstream" sections of the cache. An entry
    is only rewritten (with a fresh epoch) when its version changed.
    A package without a "downstream" option only gets its upstream version
    synced. An error raised while syncing a package is logged and does not
    prevent the remaining packages from being synced.
    '''
    def store(section, name, version):
        '''Cache version of name in section unless it is already there'''
        if self._cache[section].get(name, {}).get("version", None) != version:
            logging.debug("caching %s version %s", section, version)
            self._cache[section][name] = {"version": version,
                                          "epoch": int(time())}
        else:
            logging.debug("already cached %s version %s", section, version)

    for name, value in self._packages.items():
        try:
            logging.debug("Syncing versions of package %s", name)
            # get upstream version
            v_upstream = self.get_version_upstream(name, value)
            # apply eval to upstream
            e_upstream = value.get("eval_upstream", None)
            if e_upstream is not None:
                # SECURITY: the expression is taken verbatim from the
                # package options and executed as python code.
                # Only use options coming from a trusted source.
                v_upstream = eval(e_upstream, {"re": re},
                                  {"version": v_upstream})
                logging.debug("eval_upstream produce version: %s", v_upstream)
            # save upstream version
            store("upstream", name, v_upstream)
            # get downstream mode
            mode = value.get("downstream", None)
            if mode is None:
                logging.warning("%s: Invalid downstream mode: %s.", name, mode)
                continue
            # get downstream version
            v_downstream = self.get_version_downstream(name, value, mode)
            # apply eval to downstream
            e_downstream = value.get("eval_downstream", None)
            if e_downstream is not None:
                # SECURITY: same remark as for eval_upstream
                v_downstream = eval(e_downstream, {"re": re},
                                    {"version": v_downstream})
                logging.debug("eval_downstream produce version: %s",
                              v_downstream)
            # save downstream version
            store("downstream", name, v_downstream)
        except Exception as exp:
            # best effort: a failing package must not stop the others
            logging.error("Sync of %s: %s", name, exp)
def compare(self, only_new=False, only_fresh=False):
    '''
    Iterate over the cached versions of every package
    Yield a (name, upstream version, downstream version) tuple for each
    package, then one tuple per alias of that package.
    Packages lacking a cached upstream or downstream version are skipped.
    only_new skips packages whose two versions are equal.
    only_fresh skips packages already compared since both versions were
    last cached.
    '''
    for name, value in self._packages.items():
        logging.debug("Comparing versions of package %s" % name)
        # upstream version must be in cache
        upstream_entry = self._cache["upstream"].get(name, {})
        v_upstream = upstream_entry.get("version", None)
        if v_upstream is None:
            logging.warning("%s: Upstream version not found in cache" % name)
            continue
        # downstream version must be in cache
        downstream_entry = self._cache["downstream"].get(name, {})
        v_downstream = downstream_entry.get("version", None)
        if v_downstream is None:
            logging.warning("%s: Downstream version not found in cache" % name)
            continue
        # only new version mode
        if only_new and v_upstream == v_downstream:
            logging.debug("%s: skipped by only new mode" % name)
            continue
        # only fresh version mode
        if only_fresh:
            last_cmp = self._cache["compare"].get(name, -1)
            last_up = upstream_entry.get("epoch", 0)
            last_down = downstream_entry.get("epoch", 0)
            if last_up <= last_cmp and last_down <= last_cmp:
                logging.debug("%s: skipped by only fresh mode" % name)
                continue
        # remember when this package was compared
        self._cache["compare"][name] = int(time())
        # the package itself comes first
        yield (name, v_upstream, v_downstream)
        # then each of its aliases, sharing the same versions
        for other_name in self.alias(value):
            yield (other_name, v_upstream, v_downstream)
@staticmethod
def alias(pkg):
    '''
    Return the aliases of a package as a list
    Aliases are read from the space separated "alias" option; a package
    without this option has no alias
    '''
    declared = pkg.get("alias", "")
    # consecutive spaces produce empty chunks, drop them
    return list(filter(None, declared.split(" ")))
@staticmethod
def sort_dict(larousse):
    '''Return an OrderedDict holding the items of larousse ordered by key'''
    ordered = OrderedDict()
    for key in sorted(larousse):
        ordered[key] = larousse[key]
    return ordered
@staticmethod
def get_version_upstream(name, value):
    '''
    Return the highest version found on the upstream page of a package

    name is the package name, used to build the default version regex.
    value holds the package options: "url" is mandatory; "regex",
    "regex_name", "regex_version", "regex_ext", "regex_exclude", "timeout"
    and "retry" are optional.
    Raise InvalidConfigFile when no url is set and VersionNotFound when the
    last attempt failed, whatever the reason.
    '''
    # local import: Request is only needed to attach HTTP_HEADERS
    from urllib.request import Request
    logging.debug("Get upstream version")
    # check upstream param
    if "url" not in value:
        logging.error("No url specified for %s" % name)
        raise InvalidConfigFile("Missing url in config file")
    url = value["url"]
    regex = value.get("regex", "%s[-_]v?(%s)%s" % (
        value.get("regex_name", name),
        value.get("regex_version", r"[-.\w]+"),
        value.get("regex_ext",
                  r"\.(?:tar(?:\.gz|\.bz2|\.xz)?|tgz|tbz2|zip)")))
    # retrieve config timeout
    timeout = float(value["timeout"]) if "timeout" in value else None
    # do it retry time + 1
    ntry = int(value.get("retry", 0)) + 1
    for n in range(1, ntry + 1):
        try:
            logging.debug("Requesting url: %s (try %d/%d)" % (url, n, ntry))
            # NOTE(review): assumes HTTP_HEADERS is a dict of header
            # name -> value, as Request expects -- confirm
            url_req = Request(url, headers=HTTP_HEADERS)
            # the with statement closes the connection in every case
            with urlopen(url_req, timeout=timeout) as url_fd:
                page = url_fd.read().decode("utf-8", "ignore")
            logging.debug("Version regex: %s" % regex)
            v = re.findall(regex, page)
            if not v:
                raise VersionNotFound("No regex match on upstream")
            # remove duplicity
            v = set(v)
            # list all found versions
            logging.debug("Found versions: %s" % v)
            regex_exclude = value.get("regex_exclude", ".*(rc|beta|alpha|pre).*")
            if regex_exclude != "":
                logging.debug("Exclusion regex: %s" % regex_exclude)
                v -= set(filter(lambda x: re.search(regex_exclude, x), v))
                logging.debug("Found versions after exclusion: %s" % v)
            if not v:
                # avoid the obscure error max() gives on an empty set
                raise VersionNotFound("All upstream versions are excluded")
            # latest version is the highest
            v = max(v, key=VersionKey)
            # list selected version
            logging.debug("Upstream version is : %s" % v)
            return v
        except Exception as exp:
            # failures are silently retried until the last attempt
            if n == ntry:
                raise VersionNotFound("Upstream check failed: %s" % exp) from exp
@staticmethod
def get_version_downstream(name, value, mode):
    '''
    Return downstream version

    Dispatch to the get_version_downstream_<mode> handler of
    VersionController and return what it returns.
    Raise InvalidConfigFile when no handler exists for mode.
    '''
    # only the lookup is guarded: an AttributeError raised inside a handler
    # is a real error and must not be reported as a bad mode
    try:
        handler = getattr(VersionController, "get_version_downstream_%s" % mode)
    except AttributeError:
        raise InvalidConfigFile("Invalid dowstream mode")
    return handler(name, value)
@staticmethod
def get_version_downstream_pacman(name, value):
    '''
    Return pacman version

    The package is searched with pacman.find_pkg, restricted to the comma
    separated "repo" option when it is set. Only the pkgver part of the
    package version is returned.
    Raise VersionNotFound when the package is not found.
    '''
    logging.debug("Get pacman version")
    # Load pacman
    allowed_repos = value.get("repo").split(",") if "repo" in value else None
    db, pkg = pacman.find_pkg(name, allowed_repos)
    if pkg is None:
        # same behaviour as the other downstream handlers
        raise VersionNotFound("No pacman package found")
    # version layout is [epoch:]pkgver[-pkgrel], group 2 is pkgver
    pkgver = re.match(r"^(?:(\d+)\:)?([^-:]*)(?:-(\d+))?",
                      pkg.version).group(2)
    logging.debug("pacman version in %s: %s" % (db.name, pkgver))
    return pkgver
'''Return archweb version'''
logging.debug("Get archweb version")
# if arch is specified
# "arch" and "repo" are comma separated lists, each with a built-in default
archs = value.get("arch", "x86_64,i686,any").split(",")
repos = value.get("repo",
"community-testing,community,testing,extra,core"
).split(",")
# retrieve config timeout
timeout = float(value["timeout"]) if "timeout" in value else None
# every repository is tried for every architecture, in list order
for arch in archs:
for repo in repos:
url = "http://www.archlinux.org/packages/%s/%s/%s/json" % (
repo, arch, name)
logging.debug("Archweb version is : %s" % v)
logging.debug("Archweb check failed: %s" % exp)
# reached when no arch/repo combination gave a version
raise VersionNotFound("No Archweb package found")
'''Return archlinux user repository version'''
logging.debug("Get AUR version")
try:
# retrieve config timeout
timeout = float(value["timeout"]) if "timeout" in value else None
url = "http://aur.archlinux.org/rpc.php?type=info&arg=%s" % name
# only replies announcing version 1 are handled
if "version" not in d or d["version"] != 1:
raise VersionNotFound("Unsupported AUR version")
if len(d["results"]) == 0:
raise VersionNotFound("No such package")
# rsplit() without maxsplit cuts on every dash: [0] is the text before
# the first dash
v = d["results"]["Version"].rsplit("-")[0]
logging.debug("AUR version is : %s" % v)
return v
except Exception as exp:
# every failure is reported as VersionNotFound with the original message
raise VersionNotFound("AUR check failed: %s" % exp)
# never reached: the try block returns and the except block raises
assert(False)
'''Return abs version'''
logging.debug("Get ABS version")
# Get ABS tree path
abspath = value.get("abs_path", "/var/abs")
# Map db and name
# every directory directly under the ABS tree is taken as a repository
repos = [d for d in os.listdir(abspath)
if os.path.isdir(os.path.join(abspath, d))]
# filter if repo is provided
if "repo" in value:
allowed_repos = value.get("repo").split(",")
# iterate over a copy since repos is modified inside the loop
for r in list(repos):
if r not in allowed_repos:
repos.remove(r)
# looking into db for package name
# the first repository providing a PKGBUILD with a pkgver wins
for repo in repos:
logging.debug("Looking into directory %s" % repo)
repopath = os.path.join(abspath, repo)
packages = [d for d in os.listdir(repopath)]
if name in packages:
pkgpath = os.path.join(repopath, name, "PKGBUILD")
if os.path.isfile(pkgpath):
# use bash to export vars.
# WARNING: CODE IS EXECUTED
pkgdict = parse_pkgbuild(pkgpath)
if "pkgver" in pkgdict:
v = pkgdict["pkgver"]
logging.debug("ABS version is : %s" % v)
return v
# no repository had a usable PKGBUILD for this package
raise VersionNotFound("No ABS package found")
def get_version_downstream_none(name, value):
    '''
    Downstream handler of the "none" mode
    Both arguments are ignored, the version is always the empty string
    '''
    no_version = ""
    return no_version
# print the name of every downstream mode, one per line
for mode in fnmatch.filter(dir(VersionController), "get_version_downstream_*"):
# 23 is the length of the "get_version_downstream_" prefix
print(mode[23:])
def print_versions(self, only_new=False, only_fresh=False):
    '''
    Print the versions of every entry yielded by compare()
    only_new and only_fresh are handed over to compare() unchanged
    '''
    for entry in self.compare(only_new, only_fresh):
        pkg, upstream, downstream = entry
        self.print_version(pkg, upstream, downstream)
def print_version(self, name, v1, v2=None):
    '''
    Print one line showing the two versions of a package
    v1 is shown as the upstream version. v2 is labelled with the
    "downstream" option of the package, or with "downstream" when name is
    not a known package or has no such option.
    Colors are only used when standard output is a terminal: the second
    version is yellow when missing, green when equal to v1, red otherwise.
    '''
    if sys.stdout.isatty():
        # color of the second version tells the comparison result
        if v2 is None:
            c_compare = '\033[1;33m'
        elif v1 == v2:
            c_compare = '\033[1;32m'
        else:
            c_compare = '\033[1;31m'
        c_blue, c_white = '\033[1;34m', '\033[1;37m'
        c_yellow, c_reset = '\033[1;33m', '\033[m'
    else:
        # no escape sequence outside of a terminal
        c_blue = c_white = c_yellow = c_compare = c_reset = ''
    # package name, then upstream version
    chunks = [
        "%s[%s%s%s]" % (c_blue, c_white, name, c_blue),
        " %sup: %s " % (c_yellow, v1),
    ]
    # second version, labelled with its origin
    origin = self._packages.get(name,{}).get("downstream", "downstream")
    chunks.append(" %s%s: %s" % (c_compare, origin, v2))
    chunks.append(c_reset)
    print("".join(chunks))
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
class VersionKey(object):
    '''
    Sorting key of a version string

    The string is cut into runs of digits and runs of letters, every other
    character being a separator. Keys are ordered by comparing these runs
    pairwise: two numbers compare as integers, two words compare as
    strings and a number is always higher than a word. When one key is a
    prefix of the other, the shorter one is the lowest.
    Equality is based on the whole version string.
    '''

    def __init__(self, vstring):
        self.vstring = vstring
        self.vlist = re.findall(r"([0-9]+|[a-zA-Z]+)", vstring)

    def __repr__(self):
        return "%s ('%s')" % (self.__class__.__name__, self.vstring)

    def __str__(self):
        return self.vstring

    def __hash__(self):
        # defining __eq__ removes the default hash; keep keys hashable and
        # consistent with __eq__
        return hash(self.vstring)

    def __eq__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return self.vstring == other.vstring

    def __ne__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return self.vstring != other.vstring

    def __lt__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        for a, b in zip(self.vlist, other.vlist):
            if a.isdigit() and b.isdigit():
                a, b = int(a), int(b)
            elif a.isdigit():
                # a number is higher than a word
                return False
            elif b.isdigit():
                return True
            if a != b:
                return a < b
        # all shared components are equal: only a strictly shorter key is
        # lower, so a key is never lower than itself
        return len(self.vlist) < len(other.vlist)

    def __gt__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return other.__lt__(self)

    def __le__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return not other.__lt__(self)

    def __ge__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return not self.__lt__(other)