Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# coding: utf-8
# archversion - Archlinux Version Controller
# Copyright © 2012 Sébastien Luttringer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
'''Controller Module'''
from archversion import USER_AGENT
from archversion.error import ConfigFileError, InvalidConfigFile, VersionNotFound
from urllib.request import urlopen, Request
import distutils.version
import json
import logging
import re
import sys
class VersionController(object):
'''
Handle version detection of packages
'''
AUR_RPC = "http://aur.archlinux.org/rpc.php"
def __init__(self, packages, cache):
self.packages = packages
# set cache
if cache is None:
cache = {}
self.cache = cache
# populate compare table
# need to be done manually to avoid get_upstream to be in
self.compare_table = {
"archlinux": self.get_version_archlinux,
"aur": self.get_version_aur,
"cache": self.get_version_cache,
"none": self.get_version_none
}
@staticmethod
def get_version_upstream(name, value):
'''Return upstream version'''
logging.debug("Get upstream version")
# check upstream param
if "url" not in value:
logging.error("No url specified for %s" % name)
raise InvalidConfigFile("Missing url in config file")
url = value["url"]
regex = value.get("regex", "%s[-_]v?(%s)%s" % (
value.get("regex_name", name),
value.get("regex_version", "[-.\w]+"),
value.get("regex_ext",
"\.(?:tar(?:\.gz|\.bz2|\.xz)?|tgz|tbz2|zip)")))
# retrieve config timeout
timeout = float(value.get("timeout", None))
# do the job
try:
logging.debug("Requesting url: %s" % url)
logging.debug("Timeout is %f" % timeout)
url_req = Request(url, headers={"User-Agent": USER_AGENT})
url_fd = urlopen(url_req, timeout=timeout)
logging.debug("Version regex: %s" % regex)
v = re.findall(regex, url_fd.read().decode("utf-8"))
if v is None:
raise VersionNotFound("No regex match on upstream")
# remove duplicity
v = list(set(v))
# list all found versions
logging.debug("Found versions: %s" % v)
v = sorted(v, key=distutils.version.LooseVersion, reverse=True)[0]
# list selected version
logging.debug("Upstream version is : %s" % v)
return v
except Exception as exp:
raise VersionNotFound("Upstream check failed: %s" % exp)
assert(False)
@staticmethod
def get_version_archlinux(name, value):
'''Return archlinux version'''
logging.debug("Get archlinux version")
# if arch is specified
archs = value.get("arch", "x86_64,i686,any").split(",")
# if archlinux repository is specified
repos = value.get("repo",
"community-testing,community,testing,extra,core"
).split(",")
# retrieve config timeout
timeout = float(value.get("timeout", None))
for arch in archs:
for repo in repos:
url = "http://www.archlinux.org/packages/%s/%s/%s/json" % (
repo, arch, name)
url_req = Request(url, headers={"User-Agent": USER_AGENT})
logging.debug("Requesting url: %s" % url)
logging.debug("Timeout is %f" % timeout)
try:
url_fd = urlopen(url_req, timeout=timeout)
d = json.loads(url_fd.read().decode("utf-8"))
v = d["pkgver"]
logging.debug("Archlinux version is : %s" % v)
return v
except Exception as exp:
logging.debug("Archlinux check failed: %s" % exp)
raise VersionNotFound("No Archlinux package found")
@staticmethod
def get_version_aur(name, value):
'''Return archlinux user repository version'''
logging.debug("Get AUR version")
try:
# retrieve config timeout
timeout = float(value.get("timeout", None))
url = "%s?type=info&arg=%s" % (VersionController.AUR_RPC, name)
url_req = Request(url, headers={"User-Agent": USER_AGENT})
logging.debug("Requesting url: %s" % url)
logging.debug("Timeout is %f" % timeout)
url_fd = urlopen(url_req, timeout=timeout)
d = json.loads(url_fd.read().decode("utf-8"))
v = d["results"]["Version"].rsplit("-")[0]
logging.debug("AUR version is : %s" % v)
return v
except Exception as exp:
raise VersionNotFound("AUR check failed: %s" % exp)
assert(False)
def get_version_cache(self, name, value):
'''Return cache version'''
v_cache = self.cache.get(name, None)
logging.debug("Cache version is : %s" % v_cache)
return v_cache
@staticmethod
def get_version_none(name, value):
'''Return cache version'''
return None
def check_versions(self, only_new=False, not_in_cache=False):
'''Check versions against according to compare mode'''
for name, value in self.packages.items():
try:
# get compare mode
compare = value.get("compare", None)
if compare is None:
raise InvalidConfigFile("No defined compare mode")
if compare not in self.compare_table:
raise InvalidConfigFile("Invalid compare mode")
# get upstream version
v_upstream = self.get_version_upstream(name, value)
# apply eval to upstream
e_upstream = value.get("eval_upstream", None)
if e_upstream is not None:
v_upstream = eval(e_upstream, {}, {"version": v_upstream})
logging.debug("eval_upstream produce version: %s" %
v_upstream)
# check upstream validity
if v_upstream is None:
raise VersionNotFound("Upstream")
# get cached version
v_cache = self.cache.get(name, None)
# only not in cache mode
if not_in_cache and v_cache == v_upstream:
logging.debug("%s: skipped by not in cache mode" % name)
continue
# get compared version
v_compare = self.compare_table[compare](name, value)
# apply eval to compared
e_compare = value.get("eval_compare", None)
if e_compare is not None:
v_compare = eval(e_compare, {}, {"version": v_compare})
logging.debug("eval_compare produce version: %s" %
v_compare)
# save version to cache after getting compared version
# to avoid interfering with cache mode
self.cache[name] = v_upstream
# only new version mode
if only_new and (v_compare is None or v_upstream == v_compare):
logging.debug("%s: skipped by only new mode" % name)
continue
yield (name, v_upstream, v_compare)
except VersionNotFound as exp:
logging.warning("%s: Version not found: %s" % (name, exp))
except ConfigFileError as exp:
logging.warning("%s: Invalid configuration: %s" % (name, exp))
def print_names(self):
'''Print packages name'''
for name in self.packages.keys():
print(name)
def print_cache(self):
'''Print cache name and version'''
for name, version in self.cache.items():
print(name, ":", version)
def print_modes(self):
'''Print comparaison modes'''
for name in sorted(self.compare_table.keys()):
print(name)
def print_versions(self, only_new=False, not_in_cache=False):
'''Print versions'''
for name, v_upstream, v_compare in self.check_versions(only_new,
not_in_cache):
self.print_version(name, v_upstream, v_compare)
def print_version(self, name, v1, v2=None):
'''Handle printing of 2 versions'''
# define used color
c_blue = c_white = c_yellow = c_compare = c_reset = ''
if sys.stdout.isatty():
if v2 is None: c_compare = '\033[1;33m'
elif v1 == v2: c_compare = '\033[1;32m'
else: c_compare = '\033[1;31m'
c_blue = '\033[1;34m'
c_white = '\033[1;37m'
c_yellow = '\033[1;33m'
c_reset = '\033[m'
# print package name
toprint = "%s[%s%s%s]" % (c_blue, c_white, name, c_blue)
# print upstream
toprint += " %sup: %s " % (c_yellow, v1)
if v2 is not None:
# print separator
toprint += "%s|" % c_blue
origin = self.packages.get(name,{}).get("compare", "other")
toprint += " %s%s: %s" % (c_compare, origin, v2)
toprint += c_reset
print(toprint)
# vim:set ts=4 sw=4 et ai: