Skip to content
version.py 17.7 KiB
Newer Older
Seblu's avatar
Seblu committed
# coding: utf-8

# archversion - Archlinux Version Controller
# Copyright © 2012 Sébastien Luttringer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

'''Version Module'''
Seblu's avatar
Seblu committed
from archversion import HTTP_HEADERS, CONFIG_PACKAGES, CACHE_PACKAGES
Seblu's avatar
Seblu committed
from archversion.config import BaseConfigFile
from archversion.database import JsonDatabase
Seblu's avatar
Seblu committed
from archversion.error import InvalidConfigFile, VersionNotFound
Seblu's avatar
Seblu committed
from archversion.pacman import parse_pkgbuild, Pacman
from collections import OrderedDict
Seblu's avatar
Seblu committed
from time import time
Seblu's avatar
Seblu committed
from urllib.request import urlopen, Request
import fnmatch
Seblu's avatar
Seblu committed
import json
import logging
Seblu's avatar
Seblu committed
import os
Seblu's avatar
Seblu committed
import re
Seblu's avatar
Seblu committed
import subprocess
Seblu's avatar
Seblu committed
import sys

class VersionController(object):
    '''
    Handle version detection of packages
    '''

Seblu's avatar
Seblu committed
    def __init__(self):
        '''
        Load the packages configuration and the version cache

        The cache is expected to hold exactly the three sections
        "upstream", "downstream" and "compare"; when its keys differ
        from that set it is emptied and the three sections are recreated.
        '''
        # packages configuration
        self._packages = BaseConfigFile(CONFIG_PACKAGES)
        # cache database
        self._cache = JsonDatabase()
        self._cache.load(CACHE_PACKAGES)
        # reset the cache when its layout is not the expected one
        sections = ("upstream", "downstream", "compare")
        if set(self._cache.keys()) != set(sections):
            logging.debug("Invalid cache, purging it")
            self._cache.clear()
            for section in sections:
                self._cache[section] = {}
Seblu's avatar
Seblu committed

Seblu's avatar
Seblu committed
    @property
    def packages(self):
        '''Return list of packages augmented with aliases'''
        pkgs = []
        for name, data in self._packages.items():
            pkgs.append(name)
            pkgs += self.alias(data)
        return pkgs
Seblu's avatar
Seblu committed
    @property
    def versions(self):
        '''Return upstream versions of a package (use cache)'''
        ver = OrderedDict()
        for name, v_upstream, v_downstream in self.compare():
            ver[name] = (v_upstream, v_downstream)
        return ver
Seblu's avatar
Seblu committed
    def select(self, packages):
        '''
        Remove packages not listed in packages from the processing of
        controller future actions
        '''
        packages = set(packages)
Seblu's avatar
Seblu committed
        for name, data in OrderedDict(self._packages).items():
Seblu's avatar
Seblu committed
            names = set((name,)) | set(self.alias(data))
            if len(packages & names) == 0:
                self._packages.pop(name, None)

    def sort(self):
        '''
Seblu's avatar
Seblu committed
        Sort packages by name
        Make packages to be upgraded/displayed by alpha order
Seblu's avatar
Seblu committed
        '''
        self._packages = self.sort_dict(self._packages)
Seblu's avatar
Seblu committed
        # do not sort self._cache by recreating the cache object
        # destructor is used to save the cache content
Seblu's avatar
Seblu committed

    def sync(self):
        '''
        Synchronise local cache with external states
        Retrieve upstream and downstream versions and store them
        '''
        for name, value in self._packages.items():
            try:
                logging.debug("Syncing versions of package %s" % name)
                # get upstream version
                v_upstream = self.get_version_upstream(name, value)
                # apply eval to upstream
                e_upstream = value.get("eval_upstream", None)
                if e_upstream is not None:
                    v_upstream = eval(e_upstream, {"re": re}, {"version": v_upstream})
Seblu's avatar
Seblu committed
                    logging.debug("eval_upstream produce version: %s" % v_upstream)
                # save upstream version
                if self._cache["upstream"].get(name, {}).get("version", None) != v_upstream:
                    logging.debug("caching upstream version %s" % v_upstream)
                    self._cache["upstream"][name] = {"version": v_upstream, "epoch": int(time())}
                else:
                    logging.debug("already cached upstream version %s" % v_upstream)
                # get downstream mode
                mode = value.get("downstream", None)
                if mode is None:
                    logging.warning("%s: Invalid downstream mode: %s." % (name, mode))
                    continue
                # get downstream version
                v_downstream = self.get_version_downstream(name, value, mode)
                # apply eval to downstream
Seblu's avatar
Seblu committed
                e_downstream = value.get("eval_downstream", None)
                if e_downstream is not None:
                    v_downstream = eval(e_downstream, {"re": re}, {"version": v_downstream})
Seblu's avatar
Seblu committed
                    logging.debug("eval_downstream produce version: %s" % v_downstream)
                # save downstream version
                if self._cache["downstream"].get(name, {}).get("version", None) != v_downstream:
                    logging.debug("caching downstream version %s" % v_downstream)
                    self._cache["downstream"][name] = {"version": v_downstream, "epoch": int(time())}
                else:
                    logging.debug("already cached downstream version %s" % v_downstream)
            except Exception as exp:
                logging.error("Sync of %s: %s" % (name, exp))

    def compare(self, only_new=False, only_fresh=False):
        '''
        Compare versions according compare mode
        Return an iterator over all packages and their aliases with
        upstream and downstream versions.
        '''
        for name, value in self._packages.items():
            logging.debug("Comparing versions of package %s" % name)
            # get upstream in cache
            v_upstream = self._cache["upstream"].get(name, {}).get("version", None)
            if v_upstream is None:
                logging.warning("%s: Upstream version not found in cache" % name)
                continue
            # get downstream in cache
            v_downstream = self._cache["downstream"].get(name, {}).get("version", None)
            if v_downstream is None:
                logging.warning("%s: Downstream version not found in cache" % name)
                continue
            # only new version mode
            if only_new and v_upstream == v_downstream:
                logging.debug("%s: skipped by only new mode" % name)
                continue
            # only fresh version mode
            if only_fresh:
                last_cmp = self._cache["compare"].get(name, -1)
                last_up = self._cache["upstream"].get(name, {}).get("epoch", 0)
                last_down = self._cache["downstream"].get(name, {}).get("epoch", 0)
                if (last_cmp >= last_up and last_cmp >= last_down):
                    logging.debug("%s: skipped by only fresh mode" % name)
                    continue
            # save our compare in cache
            self._cache["compare"][name] = int(time())
            # gen main pacakge
            yield (name, v_upstream, v_downstream)
            # gen aliases package
            for alias in self.alias(value):
                yield (alias, v_upstream, v_downstream)

    @staticmethod
    def alias(pkg):
        '''Return the list of aliases of a package'''
        return [ al for al in pkg.get("alias", "").split(" ") if al != "" ]

    @staticmethod
    def sort_dict(larousse):
        '''Sort a dictionary into and OrderedDict'''
        return OrderedDict(sorted(larousse.items(), key=lambda t: t[0]))

Seblu's avatar
Seblu committed
    @staticmethod
    def get_version_upstream(name, value):
        '''
        Fetch the upstream version of a package

        name: package name, used to build the default version regex
        value: package configuration. Entries read:
            url            page to download (mandatory)
            regex          full regex whose matches are the versions
            regex_name     name part of the default regex (defaults to
                           the escaped package name)
            regex_version  version part of the default regex
            regex_ext      extension part of the default regex
            regex_exclude  versions matching it are dropped; an empty
                           string disables exclusion
            timeout        request timeout in seconds
            retry          number of additional attempts after a failure

        Return the highest version found, ordered with VersionKey.
        Raise InvalidConfigFile when no url is configured and
        VersionNotFound when every attempt failed.
        '''
        logging.debug("Get upstream version")
        # check upstream param
        if "url" not in value:
            logging.error("No url specified for %s" % name)
            raise InvalidConfigFile("Missing url in config file")
        url = value["url"]
        # the package name is escaped so that names holding regex
        # metacharacters (e.g. "gtk+") are matched literally
        regex = value.get("regex", "%s[-_]v?(%s)%s" % (
                    value.get("regex_name", re.escape(name)),
                    value.get("regex_version", r"[-.\w]+"),
                    value.get("regex_ext",
                              r"\.(?:tar(?:\.gz|\.bz2|\.xz)?|tgz|tbz2|zip)")))
        regex_exclude = value.get("regex_exclude", ".*(rc|beta|alpha|pre).*")
        # retrieve config timeout
        timeout = float(value["timeout"]) if "timeout" in value else None
        # do it retry time + 1, and always at least once
        ntry = max(int(value.get("retry", 0)), 0) + 1
        for n in range(1, ntry + 1):
            try:
                logging.debug("Requesting url: %s (try %d/%d)" % (url, n, ntry))
                logging.debug("Timeout is %s" % timeout)
                url_req = Request(url, headers=HTTP_HEADERS)
                # the response is closed as soon as its body is read
                with urlopen(url_req, timeout=timeout) as url_fd:
                    page = url_fd.read().decode("utf-8", "ignore")
                logging.debug("Version regex: %s" % regex)
                # a set removes duplicated versions
                found = set(re.findall(regex, page))
                if not found:
                    raise VersionNotFound("No regex match on upstream")
                logging.debug("Found versions: %s" % found)
                # exclude versions
                if regex_exclude != "":
                    logging.debug("Exclusion regex: %s" % regex_exclude)
                    found = set(x for x in found
                                if not re.search(regex_exclude, x))
                    logging.debug("Found versions after exclusion: %s" % found)
                    if not found:
                        raise VersionNotFound(
                            "All upstream versions match the exclusion regex")
                # latest version is the highest
                v = max(found, key=VersionKey)
                logging.debug("Upstream version is : %s" % v)
                return v
            except Exception as exp:
                # only the failure of the last attempt is reported
                if n == ntry:
                    raise VersionNotFound("Upstream check failed: %s" % exp) from exp
        # not reachable since ntry >= 1; kept as an explicit failure
        raise VersionNotFound("Upstream check failed")

Seblu's avatar
Seblu committed
    @staticmethod
Seblu's avatar
Seblu committed
    def get_version_downstream(name, value, mode):
        '''Return dowstream version'''
        try:
            return getattr(VersionController, "get_version_downstream_%s" % mode)(name, value)
        except AttributeError:
            raise InvalidConfigFile("Invalid dowstream mode")

    @staticmethod
    def get_version_downstream_pacman(name, value):
        '''
        Return the version of a package found through pacman

        name: package name looked up with Pacman.find_pkg
        value: package configuration; the optional "repo" entry is a
               comma separated list of repositories passed to find_pkg

        The package version is split into its optional epoch, its pkgver
        and its optional pkgrel; only pkgver is returned.
        Raise VersionNotFound when no package is found.
        '''
        logging.debug("Get pacman version")
        # Load pacman
        pacman = Pacman()
        # filter if repo is provided
        allowed_repos = value.get("repo").split(",") if "repo" in value else None
        # looking into db for package name
        db, pkg = pacman.find_pkg(name, allowed_repos)
        if pkg is not None:
            # raw string: "\d" and "\:" are not valid str escape sequences
            pkgver = re.match(r"^(?:(\d+)\:)?([^-:]*)(?:-(\d+))?",
                pkg.version).group(2)
            logging.debug("pacman version in %s: %s" % (db.name, pkgver))
            return pkgver
        raise VersionNotFound("No pacman package found")

Seblu's avatar
Seblu committed
    @staticmethod
    def get_version_downstream_archweb(name, value):
        '''
        Return the version of a package published on archweb

        name: package name
        value: package configuration. Entries read:
            arch     comma separated architectures to try
            repo     comma separated repositories to try
            timeout  request timeout in seconds

        Every (arch, repo) pair is requested in turn and the "pkgver" of
        the first successful JSON answer is returned. A failing request
        is logged at debug level and the next pair is tried.
        Raise VersionNotFound when no pair gives a version.
        '''
        logging.debug("Get archweb version")
        # if arch is specified
        archs = value.get("arch", "x86_64,i686,any").split(",")
        # if archweb repository is specified
        repos = value.get("repo",
                          "community-testing,community,testing,extra,core"
                          ).split(",")
        # retrieve config timeout
        timeout = float(value["timeout"]) if "timeout" in value else None
        for arch in archs:
            for repo in repos:
                url = "http://www.archlinux.org/packages/%s/%s/%s/json" % (
                    repo, arch, name)
                url_req = Request(url, headers=HTTP_HEADERS)
                logging.debug("Requesting url: %s" % url)
                logging.debug("Timeout is %s" % timeout)
                try:
                    # the response is closed as soon as its body is read
                    with urlopen(url_req, timeout=timeout) as url_fd:
                        d = json.loads(url_fd.read().decode("utf-8", "ignore"))
                    v = d["pkgver"]
                    logging.debug("Archweb version is : %s" % v)
                    return v
                except Exception as exp:
                    # best effort: a miss on this pair is expected
                    logging.debug("Archweb check failed: %s" % exp)
        raise VersionNotFound("No Archweb package found")
Seblu's avatar
Seblu committed

    @staticmethod
    def get_version_downstream_aur(name, value):
        '''
        Return the version of a package in the Arch User Repository

        name: package name
        value: package configuration; the optional "timeout" entry is
               the request timeout in seconds

        The JSON answer must carry "version" equal to 1 and a non empty
        "results". The returned value is the "Version" of the result
        without its trailing "-<release>" part and without any leading
        "<epoch>:" part, like the pacman mode does.
        Raise VersionNotFound on any failure.
        '''
        logging.debug("Get AUR version")
        try:
            # retrieve config timeout
            timeout = float(value["timeout"]) if "timeout" in value else None
            url = "http://aur.archlinux.org/rpc.php?type=info&arg=%s" % name
            url_req = Request(url, headers=HTTP_HEADERS)
            logging.debug("Requesting url: %s" % url)
            logging.debug("Timeout is %s" % timeout)
            # the response is closed as soon as its body is read
            with urlopen(url_req, timeout=timeout) as url_fd:
                d = json.loads(url_fd.read().decode("utf-8", "ignore"))
            if "version" not in d or d["version"] != 1:
                raise VersionNotFound("Unsupported AUR version")
            if len(d["results"]) == 0:
                raise VersionNotFound("No such package")
            # drop the release suffix, then the epoch prefix
            v = d["results"]["Version"].rsplit("-", 1)[0]
            v = v.split(":", 1)[-1]
            logging.debug("AUR version is : %s" % v)
            return v
        except Exception as exp:
            raise VersionNotFound("AUR check failed: %s" % exp) from exp

Seblu's avatar
Seblu committed
    @staticmethod
    def get_version_downstream_abs(name, value):
        '''
        Return the version of a package found in an ABS tree

        name: package name, expected as a directory inside a repository
        value: package configuration. Entries read:
            abs_path  root of the ABS tree (defaults to /var/abs)
            repo      comma separated repositories to restrict to

        Return the "pkgver" of the first PKGBUILD found for the package.
        Raise VersionNotFound when none is found.
        '''
        logging.debug("Get ABS version")
        # ABS tree root
        abspath = value.get("abs_path", "/var/abs")
        # every directory of the root is a repository
        repos = [entry for entry in os.listdir(abspath)
                 if os.path.isdir(os.path.join(abspath, entry))]
        # keep only the configured repositories, if any
        if "repo" in value:
            allowed_repos = value.get("repo").split(",")
            repos = [repo for repo in repos if repo in allowed_repos]
        # look for the package in each repository
        for repo in repos:
            logging.debug("Looking into directory %s" % repo)
            repopath = os.path.join(abspath, repo)
            if name not in os.listdir(repopath):
                continue
            pkgpath = os.path.join(repopath, name, "PKGBUILD")
            if not os.path.isfile(pkgpath):
                continue
            # WARNING (from the original note): the PKGBUILD is handed
            # to bash to export its variables, so its code is executed
            pkgdict = parse_pkgbuild(pkgpath)
            if "pkgver" not in pkgdict:
                continue
            v = pkgdict["pkgver"]
            logging.debug("ABS version is : %s" % v)
            return v
        raise VersionNotFound("No ABS package found")

Seblu's avatar
Seblu committed
    @staticmethod
Seblu's avatar
Seblu committed
    def get_version_downstream_none(name, value):
        '''Return none version'''
        return ""

    def print_names(self):
Seblu's avatar
Seblu committed
        '''Print packages name'''
Seblu's avatar
Seblu committed
        for name in self.packages:
            print(name)
Seblu's avatar
Seblu committed

    @staticmethod
    def print_modes():
Seblu's avatar
Seblu committed
        '''Print comparaison modes'''
        for mode in fnmatch.filter(dir(VersionController), "get_version_downstream_*"):
            print(mode[23:])
Seblu's avatar
Seblu committed

    def print_versions(self, only_new=False, only_fresh=False):
Seblu's avatar
Seblu committed
        '''Print versions'''
Seblu's avatar
Seblu committed
        for name, v_upstream, v_downstream in self.compare(only_new, only_fresh):
            self.print_version(name, v_upstream, v_downstream)
Seblu's avatar
Seblu committed

    def print_version(self, name, v1, v2=None):
        '''Handle printing of 2 versions'''
        # define used color
        c_blue =  c_white =  c_yellow =  c_compare =  c_reset = ''
        if sys.stdout.isatty():
            if v2 is None:   c_compare = '\033[1;33m'
            elif v1 == v2:   c_compare = '\033[1;32m'
            else:            c_compare = '\033[1;31m'
            c_blue = '\033[1;34m'
            c_white = '\033[1;37m'
            c_yellow = '\033[1;33m'
            c_reset = '\033[m'
        # print package name
        toprint = "%s[%s%s%s]" % (c_blue, c_white, name, c_blue)
        # print upstream
        toprint += " %sup: %s " % (c_yellow, v1)
        # print downstream
Seblu's avatar
Seblu committed
        if v2 != "":
Seblu's avatar
Seblu committed
            # print separator
            toprint += "%s|" % c_blue
Seblu's avatar
Seblu committed
            origin = self._packages.get(name,{}).get("downstream", "downstream")
Seblu's avatar
Seblu committed
            toprint += " %s%s: %s" % (c_compare, origin, v2)
        toprint += c_reset
        print(toprint)

Seblu's avatar
Seblu committed

class VersionKey(object):
    '''
    Sorting key of a version string

    The version string is split into runs of digits and runs of ASCII
    letters; every other character only acts as a separator. Ordering
    compares those components pairwise:
      - two numeric components are compared as integers
      - an alphabetic component sorts before a numeric one
      - two alphabetic components are compared as strings
    When all shared components are equal, the version with fewer
    components sorts first. Equality compares the raw version strings.
    '''

    def __init__(self, vstring):
        self.vstring = vstring
        self.vlist = re.findall("([0-9]+|[a-zA-Z]+)", vstring)

    def __repr__(self):
        return "%s ('%s')" % (self.__class__.__name__, self.vstring)

    def __str__(self):
        return self.vstring

    def __hash__(self):
        # defined together with __eq__ so instances stay hashable
        return hash(self.vstring)

    def __eq__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return self.vstring == other.vstring

    def __ne__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return self.vstring != other.vstring

    def __lt__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        for a, b in zip(self.vlist, other.vlist):
            a_num, b_num = a.isdigit(), b.isdigit()
            if a_num and b_num:
                a_int, b_int = int(a), int(b)
                if a_int != b_int:
                    return a_int < b_int
            elif a_num != b_num:
                # alphabetic components sort before numeric ones
                return b_num
            elif a != b:
                return a < b
        # all shared components are equal: strictly lower only when
        # other has more components (never lower than an equal version)
        return len(self.vlist) < len(other.vlist)

    def __gt__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return other.__lt__(self)

    def __le__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return not other.__lt__(self)

    def __ge__(self, other):
        if not isinstance(other, VersionKey):
            return NotImplemented
        return not self.__lt__(other)

Seblu's avatar
Seblu committed
# vim:set ts=4 sw=4 et ai: