# coding: utf-8

# archversion - Archlinux Version Controller
# Copyright © 2012 Sébastien Luttringer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

'''Version Module'''

from archversion import USER_AGENT
from archversion.pacman import parse_pkgbuild, Pacman
from archversion.error import InvalidConfigFile, VersionNotFound
from collections import OrderedDict
from urllib.request import urlopen, Request
import json
import logging
import os
import re
import subprocess
import sys

class VersionController(object):
    '''
    Handle version detection of packages

    For every configured package the upstream version is scraped from a
    web page and compared with a reference version obtained through one
    of the compare modes listed in compare_table.
    '''

    def __init__(self, packages, cache):
        '''Store the packages mapping and the version cache

        packages maps a package name to its configuration mapping.
        cache maps a package name to the last seen upstream version;
        None is replaced by a new empty dict.
        '''
        self.packages = packages
        # set cache
        if cache is None:
            cache = {}
        self.cache = cache
        # populate compare table
        # listed by hand so that get_version_upstream is not offered
        # as a compare mode
        self.compare_table = {
            "pacman": self.get_version_pacman,
            "archweb": self.get_version_archweb,
            "aur": self.get_version_aur,
            "abs": self.get_version_abs,
            "cache": self.get_version_cache,
            "none": self.get_version_none,
            }

    def reduce_packages(self, packages):
        '''Keep only the given packages list'''
        # iterate over a copy because entries are removed on the fly
        for pkg in list(self.packages):
            if pkg not in packages:
                self.packages.pop(pkg, None)

    def sort_packages(self):
        '''Sort package list by name'''
        self.packages = self.sort_dict(self.packages)

    def sort_cache(self):
        '''Sort cache entries by package name'''
        self.cache = self.sort_dict(self.cache)

    @staticmethod
    def sort_dict(larousse):
        '''Sort a dictionary into an OrderedDict (ordered by key)'''
        return OrderedDict(sorted(larousse.items(), key=lambda t: t[0]))

    @staticmethod
    def _get_timeout(value):
        '''Return the configured timeout as a float, None when unset

        A missing or empty "timeout" option yields None, which urlopen
        treats as "no timeout". A value which is not a number raises
        InvalidConfigFile.
        '''
        timeout = value.get("timeout", None)
        if timeout is None or timeout == "":
            return None
        try:
            return float(timeout)
        except (TypeError, ValueError) as exp:
            raise InvalidConfigFile("Invalid timeout: %s" % timeout) from exp

    @staticmethod
    def get_version_upstream(name, value):
        '''Return upstream version

        Download value["url"] and return the highest version (according
        to VersionKey) captured by the version regex. The request is
        done up to retry + 1 times.
        Raise InvalidConfigFile when no url is configured and
        VersionNotFound when every try failed.
        '''
        logging.debug("Get upstream version")
        # check upstream param
        if "url" not in value:
            logging.error("No url specified for %s", name)
            raise InvalidConfigFile("Missing url in config file")
        url = value["url"]
        # raw strings keep the regex escapes away from the python parser
        regex = value.get("regex", "%s[-_]v?(%s)%s" % (
                    value.get("regex_name", name),
                    value.get("regex_version", r"[-.\w]+"),
                    value.get("regex_ext",
                              r"\.(?:tar(?:\.gz|\.bz2|\.xz)?|tgz|tbz2|zip)")))
        # retrieve config timeout
        timeout = VersionController._get_timeout(value)
        # do it retry time + 1, a negative retry still gives one try
        ntry = max(int(value.get("retry", 0)), 0) + 1
        # do the job
        for n in range(1, ntry + 1):
            try:
                logging.debug("Requesting url: %s (try %d/%d)", url, n, ntry)
                logging.debug("Timeout is %s", timeout)
                url_req = Request(url, headers={"User-Agent": USER_AGENT})
                # context manager closes the connection in every case
                with urlopen(url_req, timeout=timeout) as url_fd:
                    content = url_fd.read().decode("utf-8")
                logging.debug("Version regex: %s", regex)
                v = re.findall(regex, content)
                if not v:
                    raise VersionNotFound("No regex match on upstream")
                # remove duplicity
                v = list(set(v))
                # list all found versions
                logging.debug("Found versions: %s", v)
                v = max(v, key=VersionKey)
                # list selected version
                logging.debug("Upstream version is : %s", v)
                return v
            except Exception as exp:
                # only the last failure is reported to the caller
                if n == ntry:
                    raise VersionNotFound(
                        "Upstream check failed: %s" % exp) from exp
                logging.debug("Upstream check failed, retrying: %s", exp)
        # not reachable as ntry >= 1, kept as an explicit failure
        raise VersionNotFound("Upstream check failed")

    @staticmethod
    def get_version_pacman(name, value):
        '''Return pacman version

        The optional "repo" option is a comma separated list of
        repositories handed to Pacman.find_pkg.
        Raise VersionNotFound when no package is found.
        '''
        logging.debug("Get pacman version")
        # Load pacman
        pacman = Pacman()
        # filter if repo is provided
        allowed_repos = value.get("repo").split(",") if "repo" in value else None
        # looking into db for package name
        db, pkg = pacman.find_pkg(name, allowed_repos)
        if pkg is not None:
            # keep what is before the first "-" (drop the release part)
            v = pkg.version.split("-", 1)[0]
            logging.debug("pacman version in %s: %s", db.name, v)
            return v
        raise VersionNotFound("No pacman package found")

    @staticmethod
    def get_version_archweb(name, value):
        '''Return archweb version

        Query the archweb json page for each arch/repo pair and return
        the pkgver of the first successful answer.
        Raise VersionNotFound when no request succeeds.
        '''
        logging.debug("Get archweb version")
        # if arch is specified
        archs = value.get("arch", "x86_64,i686,any").split(",")
        # if archweb repository is specified
        repos = value.get("repo",
                          "community-testing,community,testing,extra,core"
                          ).split(",")
        # retrieve config timeout
        timeout = VersionController._get_timeout(value)
        for arch in archs:
            for repo in repos:
                url = "http://www.archlinux.org/packages/%s/%s/%s/json" % (
                    repo, arch, name)
                url_req = Request(url, headers={"User-Agent": USER_AGENT})
                logging.debug("Requesting url: %s", url)
                logging.debug("Timeout is %s", timeout)
                try:
                    with urlopen(url_req, timeout=timeout) as url_fd:
                        d = json.loads(url_fd.read().decode("utf-8"))
                    v = d["pkgver"]
                    logging.debug("Archweb version is : %s", v)
                    return v
                except Exception as exp:
                    # a failure only means: try the next arch/repo pair
                    logging.debug("Archweb check failed: %s", exp)
        raise VersionNotFound("No Archweb package found")

    @staticmethod
    def get_version_aur(name, value):
        '''Return archlinux user repository version

        Raise VersionNotFound on any failure (network, json, timeout
        value, missing key).
        '''
        logging.debug("Get AUR version")
        try:
            # retrieve config timeout
            timeout = VersionController._get_timeout(value)
            url = "http://aur.archlinux.org/rpc.php?type=info&arg=%s" % name
            url_req = Request(url, headers={"User-Agent": USER_AGENT})
            logging.debug("Requesting url: %s", url)
            logging.debug("Timeout is %s", timeout)
            with urlopen(url_req, timeout=timeout) as url_fd:
                d = json.loads(url_fd.read().decode("utf-8"))
            # keep what is before the first "-" (drop the release part)
            v = d["results"]["Version"].split("-", 1)[0]
            logging.debug("AUR version is : %s", v)
            return v
        except Exception as exp:
            raise VersionNotFound("AUR check failed: %s" % exp) from exp

    @staticmethod
    def get_version_abs(name, value):
        '''Return abs version

        Look for <abs_path>/<repo>/<name>/PKGBUILD in every repository
        directory (restricted to the "repo" option when given) and
        return the pkgver of the first PKGBUILD defining one.
        Raise VersionNotFound when nothing matches or when the ABS tree
        cannot be read.
        '''
        logging.debug("Get ABS version")
        # Get ABS tree path
        abspath = value.get("abs_path", "/var/abs")
        # Map db and name
        try:
            repos = [d for d in os.listdir(abspath)
                     if os.path.isdir(os.path.join(abspath, d))]
        except OSError as exp:
            # a missing or unreadable tree must not abort the whole check
            raise VersionNotFound("Unable to read ABS tree: %s" % exp) from exp
        # filter if repo is provided
        if "repo" in value:
            allowed_repos = value.get("repo").split(",")
            repos = [r for r in repos if r in allowed_repos]
        # looking into db for package name
        for repo in repos:
            logging.debug("Looking into directory %s", repo)
            pkgpath = os.path.join(abspath, repo, name, "PKGBUILD")
            if not os.path.isfile(pkgpath):
                continue
            # use bash to export vars.
            # WARNING: CODE IS EXECUTED
            pkgdict = parse_pkgbuild(pkgpath)
            if "pkgver" in pkgdict:
                v = pkgdict["pkgver"]
                logging.debug("ABS version is : %s", v)
                return v
        raise VersionNotFound("No ABS package found")

    def get_version_cache(self, name, value):
        '''Return cache version, None when the package is not cached'''
        v_cache = self.cache.get(name, None)
        logging.debug("Cache version is : %s", v_cache)
        return v_cache

    @staticmethod
    def get_version_none(name, value):
        '''Return None, compare mode for packages without reference'''
        return None

    def check_versions(self, only_new=False, not_in_cache=False):
        '''Check versions according to compare mode

        Generator yielding (name, upstream version, compared version)
        for each package.
        only_new skips packages whose compared version is None or equal
        to the upstream one.
        not_in_cache skips packages whose upstream version is already
        the cached one.
        VersionNotFound and InvalidConfigFile are logged as warnings and
        the package is skipped.
        '''
        for name, value in self.packages.items():
            try:
                # get compare mode
                compare = value.get("compare", None)
                if compare is None:
                    raise InvalidConfigFile("No defined compare mode")
                if compare not in self.compare_table:
                    raise InvalidConfigFile("Invalid compare mode")
                # get upstream version
                v_upstream = self.get_version_upstream(name, value)
                # apply eval to upstream
                # SECURITY: eval runs python code taken from the config
                # file, never load a configuration you do not trust
                e_upstream = value.get("eval_upstream", None)
                if e_upstream is not None:
                    v_upstream = eval(e_upstream, {}, {"version": v_upstream})
                    logging.debug("eval_upstream produce version: %s",
                                  v_upstream)
                # check upstream validity
                if v_upstream is None:
                    raise VersionNotFound("Upstream")
                # get cached version
                v_cache = self.cache.get(name, None)
                # only not in cache mode
                if not_in_cache and v_cache == v_upstream:
                    logging.debug("%s: skipped by not in cache mode", name)
                    continue
                # get compared version
                v_compare = self.compare_table[compare](name, value)
                # apply eval to compared
                # SECURITY: same remark as eval_upstream
                e_compare = value.get("eval_compare", None)
                if e_compare is not None:
                    v_compare = eval(e_compare, {}, {"version": v_compare})
                    logging.debug("eval_compare produce version: %s",
                                  v_compare)
                # save version to cache after getting compared version
                # to avoid interfering with cache mode
                self.cache[name] = v_upstream
                # only new version mode
                if only_new and (v_compare is None or v_upstream == v_compare):
                    logging.debug("%s: skipped by only new mode", name)
                    continue
                yield (name, v_upstream, v_compare)
            except VersionNotFound as exp:
                logging.warning("%s: Version not found: %s", name, exp)
            except InvalidConfigFile as exp:
                logging.warning("%s: Invalid configuration: %s", name, exp)

    def print_names(self):
        '''Print packages name'''
        for name in self.packages.keys():
            print(name)

    def print_cache(self):
        '''Print cache name and version'''
        for name, version in self.cache.items():
            print(name, ":", version)

    def print_modes(self):
        '''Print comparison modes'''
        for name in sorted(self.compare_table):
            print(name)

    def print_versions(self, only_new=False, not_in_cache=False):
        '''Print versions of every package returned by check_versions'''
        for name, v_upstream, v_compare in self.check_versions(only_new,
                                                               not_in_cache):
            self.print_version(name, v_upstream, v_compare)

    def print_version(self, name, v1, v2=None):
        '''Handle printing of 2 versions

        v1 is the upstream version, v2 the compared one (not printed
        when None). Colors are only used when stdout is a terminal.
        '''
        # define used color
        c_blue = c_white = c_yellow = c_compare = c_reset = ''
        if sys.stdout.isatty():
            if v2 is None:
                c_compare = '\033[1;33m'
            elif v1 == v2:
                c_compare = '\033[1;32m'
            else:
                c_compare = '\033[1;31m'
            c_blue = '\033[1;34m'
            c_white = '\033[1;37m'
            c_yellow = '\033[1;33m'
            c_reset = '\033[m'
        # print package name
        toprint = "%s[%s%s%s]" % (c_blue, c_white, name, c_blue)
        # print upstream
        toprint += " %sup: %s " % (c_yellow, v1)
        if v2 is not None:
            # print separator
            toprint += "%s|" % c_blue
            origin = self.packages.get(name, {}).get("compare", "other")
            toprint += " %s%s: %s" % (c_compare, origin, v2)
        toprint += c_reset
        print(toprint)


class VersionKey(object):
    '''Sorting key of a version string

    The version string is split into tokens which are either a run of
    digits or a run of ascii letters, every other character is a
    separator. Tokens are compared pairwise from left to right:
    - two numeric tokens are compared as integers
    - two alphabetic tokens are compared as strings
    - a numeric token is greater than an alphabetic one
    When one token list is a prefix of the other, the shorter is lower.

    Equality and hash use the raw version string, ordering uses the
    token list.
    '''

    def __init__(self, vstring):
        self.vstring = vstring
        self.vlist = re.findall("([0-9]+|[a-zA-Z]+)", vstring)

    def __repr__(self):
        return "%s ('%s')" % (self.__class__.__name__, self.vstring)

    def __str__(self):
        return self.vstring

    def __eq__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return self.vstring == other.vstring

    def __ne__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return self.vstring != other.vstring

    def __hash__(self):
        # defining __eq__ disables the default hash, restore one which
        # is consistent with it
        return hash(self.vstring)

    def __lt__(self, other):
        '''Return True only when self is strictly lower than other'''
        if not isinstance(other, VersionKey):
            return NotImplemented
        for a, b in zip(self.vlist, other.vlist):
            if a == b:
                continue
            a_num, b_num = a.isdigit(), b.isdigit()
            if a_num and b_num:
                a, b = int(a), int(b)
                if a != b:
                    return a < b
                # same number written differently ("01" and "1")
            elif a_num != b_num:
                # numeric token wins: lower only if other is the numeric
                return b_num
            else:
                return a < b
        # all common tokens are equal: the shorter list is the lower,
        # equal lists are not lower than each other
        return len(self.vlist) < len(other.vlist)

    def __gt__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return other.__lt__(self)

    def __le__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return not other.__lt__(self)

    def __ge__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return not self.__lt__(other)

# vim:set ts=4 sw=4 et ai: