Skip to content
controller.py 9.94 KiB
Newer Older
Seblu's avatar
Seblu committed
# coding: utf-8

# archversion - Archlinux Version Controller
# Copyright © 2012 Sébastien Luttringer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

'''Controller Module'''


from archversion import USER_AGENT
from archversion.error import ConfigFileError, InvalidConfigFile, VersionNotFound
from urllib.request import urlopen, Request
import distutils.version
import json
import logging
import re
import sys

class VersionController(object):
    '''
    Handle version detection of packages
    '''

    AUR_RPC = "http://aur.archlinux.org/rpc.php"

    def __init__(self, packages, cache):
        self.packages = packages
        # set cache
        if cache is None:
            cache = {}
        self.cache = cache
        # populate compare table
        # need to be done manually to avoid get_upstream to be in
        self.compare_table = {
            "archlinux": self.get_version_archlinux,
            "aur": self.get_version_aur,
            "cache": self.get_version_cache,
            "none": self.get_version_none
            }

    @staticmethod
    def get_version_upstream(name, value):
        '''Return upstream version'''
        logging.debug("Get upstream version")
        # check upstream param
        if "url" not in value:
            logging.error("No url specified for %s" % name)
            raise InvalidConfigFile("Missing url in config file")
        url = value["url"]
        regex = value.get("regex", "%s[-_]v?(%s)%s" % (
                    value.get("regex_name", name),
                    value.get("regex_version", "[-.\w]+"),
                    value.get("regex_ext",
                              "\.(?:tar(?:\.gz|\.bz2|\.xz)?|tgz|tbz2|zip)")))
        # retrieve config timeout
        timeout = float(value.get("timeout", None))
        # do the job
        try:
            logging.debug("Requesting url: %s" % url)
            logging.debug("Timeout is %f" % timeout)
            url_req = Request(url, headers={"User-Agent": USER_AGENT})
            url_fd = urlopen(url_req, timeout=timeout)
            logging.debug("Version regex: %s" % regex)
            v = re.findall(regex, url_fd.read().decode("utf-8"))
            if v is None:
                raise VersionNotFound("No regex match on upstream")
            # remove duplicity
            v = list(set(v))
            # list all found versions
            logging.debug("Found versions: %s" % v)
            v =  sorted(v, key=distutils.version.LooseVersion, reverse=True)[0]
            # list selected version
            logging.debug("Upstream version is : %s" % v)
            return v
        except Exception as exp:
            raise VersionNotFound("Upstream check failed: %s" % exp)
        assert(False)

    @staticmethod
    def get_version_archlinux(name, value):
        '''Return archlinux version'''
        logging.debug("Get archlinux version")
        # if arch is specified
        archs = value.get("arch", "x86_64,i686,any").split(",")
        # if archlinux repository is specified
        repos = value.get("repo",
                          "community-testing,community,testing,extra,core"
                          ).split(",")
        # retrieve config timeout
        timeout = float(value.get("timeout", None))
        for arch in archs:
            for repo in repos:
                url = "http://www.archlinux.org/packages/%s/%s/%s/json" % (
                    repo, arch, name)
                url_req = Request(url, headers={"User-Agent": USER_AGENT})
                logging.debug("Requesting url: %s" % url)
                logging.debug("Timeout is %f" % timeout)
                try:
                    url_fd = urlopen(url_req, timeout=timeout)
                    d = json.loads(url_fd.read().decode("utf-8"))
                    v = d["pkgver"]
                    logging.debug("Archlinux version is : %s" % v)
                    return v
                except Exception as exp:
                    logging.debug("Archlinux check failed: %s" % exp)
        raise VersionNotFound("No Archlinux package found")

    @staticmethod
    def get_version_aur(name, value):
        '''Return archlinux user repository version'''
        logging.debug("Get AUR version")
        try:
            # retrieve config timeout
            timeout = float(value.get("timeout", None))
            url = "%s?type=info&arg=%s" % (VersionController.AUR_RPC, name)
            url_req = Request(url, headers={"User-Agent": USER_AGENT})
            logging.debug("Requesting url: %s" % url)
            logging.debug("Timeout is %f" % timeout)
            url_fd = urlopen(url_req, timeout=timeout)
            d = json.loads(url_fd.read().decode("utf-8"))
            v = d["results"]["Version"].rsplit("-")[0]
            logging.debug("AUR version is : %s" % v)
            return v
        except Exception as exp:
            raise VersionNotFound("AUR check failed: %s" % exp)
        assert(False)

    def get_version_cache(self, name, value):
        '''Return cache version'''
        v_cache = self.cache.get(name, None)
        logging.debug("Cache version is : %s" % v_cache)
        return v_cache

    @staticmethod
    def get_version_none(name, value):
        '''Return cache version'''
        return None

    def check_versions(self, only_new=False, not_in_cache=False):
        '''Check versions against according to compare mode'''
        for name, value in self.packages.items():
            try:
                # get compare mode
                compare = value.get("compare", None)
                if compare is None:
                    raise InvalidConfigFile("No defined compare mode")
                if compare not in self.compare_table:
                    raise InvalidConfigFile("Invalid compare mode")
                # get upstream version
                v_upstream = self.get_version_upstream(name, value)
                # apply eval to upstream
                e_upstream = value.get("eval_upstream", None)
                if e_upstream is not None:
                    v_upstream = eval(e_upstream, {}, {"version": v_upstream})
                    logging.debug("eval_upstream produce version: %s" %
                                  v_upstream)
                # check upstream validity
                if v_upstream is None:
                    raise VersionNotFound("Upstream")
                # get cached version
                v_cache = self.cache.get(name, None)
                # only not in cache mode
                if not_in_cache and v_cache == v_upstream:
                    logging.debug("%s: skipped by not in cache mode" % name)
                    continue
                # get compared version
                v_compare = self.compare_table[compare](name, value)
                # apply eval to compared
                e_compare = value.get("eval_compare", None)
                if e_compare is not None:
                    v_compare = eval(e_compare, {}, {"version": v_compare})
                    logging.debug("eval_compare produce version: %s" %
                                  v_compare)
                # save version to cache after getting compared version
                # to avoid interfering with cache mode
                self.cache[name] = v_upstream
                # only new version mode
                if only_new and (v_compare is None or v_upstream == v_compare):
                    logging.debug("%s: skipped by only new mode" % name)
                    continue
                yield (name, v_upstream, v_compare)
            except VersionNotFound as exp:
                logging.warning("%s: Version not found: %s" % (name, exp))
            except ConfigFileError as exp:
                logging.warning("%s: Invalid configuration: %s" % (name, exp))

    def print_names(self):
        '''Print packages name'''
        for name in self.packages.keys():
            print(name)

    def print_cache(self):
        '''Print cache name and version'''
        for name, version in self.cache.items():
            print(name, ":", version)

    def print_modes(self):
        '''Print comparaison modes'''
        for name in sorted(self.compare_table.keys()):
            print(name)

    def print_versions(self, only_new=False, not_in_cache=False):
        '''Print versions'''
        for name, v_upstream, v_compare in self.check_versions(only_new,
                                                               not_in_cache):
            self.print_version(name, v_upstream, v_compare)

    def print_version(self, name, v1, v2=None):
        '''Handle printing of 2 versions'''
        # define used color
        c_blue =  c_white =  c_yellow =  c_compare =  c_reset = ''
        if sys.stdout.isatty():
            if v2 is None:   c_compare = '\033[1;33m'
            elif v1 == v2:   c_compare = '\033[1;32m'
            else:            c_compare = '\033[1;31m'
            c_blue = '\033[1;34m'
            c_white = '\033[1;37m'
            c_yellow = '\033[1;33m'
            c_reset = '\033[m'
        # print package name
        toprint = "%s[%s%s%s]" % (c_blue, c_white, name, c_blue)
        # print upstream
        toprint += " %sup: %s " % (c_yellow, v1)
        if v2 is not None:
            # print separator
            toprint += "%s|" % c_blue
            origin = self.packages.get(name,{}).get("compare", "other")
            toprint += " %s%s: %s" % (c_compare, origin, v2)
        toprint += c_reset
        print(toprint)

# vim:set ts=4 sw=4 et ai: