#!/usr/bin/python3
# coding: utf-8

# agetpkg - Archive Get Package
# Copyright © 2015 Sébastien Luttringer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

'''Archlinux Archive Get Package'''

import sys
from argparse import ArgumentParser
from calendar import timegm
from collections import OrderedDict
from email.utils import parsedate
from logging import getLogger, error, warn, warning, debug, DEBUG
from lzma import open as xzopen
from os import stat, uname, getcwd, chdir, geteuid, environ, makedirs
from os.path import basename, exists, expanduser, join
from pprint import pprint
from re import match, compile as recompile
from shlex import quote
from shutil import copyfileobj, which as shutil_which
from subprocess import call
from tempfile import TemporaryDirectory
from time import mktime, time
from urllib.request import urlopen, Request

try:
    from xdg.BaseDirectory import save_cache_path
except ImportError:
    def save_cache_path(*resource):
        """Return (creating it if needed) the cache directory for *resource*.

        Stand-in used only when pyxdg is not installed. It resolves the
        directory under $XDG_CACHE_HOME, or ~/.cache when that variable is
        unset or empty.
        NOTE(review): meant to mirror pyxdg's save_cache_path -- confirm.
        """
        base = environ.get("XDG_CACHE_HOME") or join(expanduser("~"), ".cache")
        path = join(base, *resource)
        makedirs(path, exist_ok=True)
        return path

# magics
NAME = "agetpkg"
VERSION = "0"
ARCHIVE_URL = "http://ala.seblu.net/packages/.all/"
INDEX_FILENAME = "index.0.xz"
PKG_EXT = ".pkg.tar.xz"
SIG_EXT = ".sig"

# regex is not strict, but we are not validating something here
# groups: name, full version, epoch (optional), version, release, arch
_PKG_FILENAME_REGEX = recompile(
    r"^([\w@._+-]+)-((?:(\d+):)?([^-]+)-([^-]+))-(\w+)")

# one or more package numbers separated by single spaces
_SELECTION_REGEX = recompile(r"^(\d+ )*\d+$")


class Error(Exception):
    """Error reported to the user by main().

    It derives from Exception (not BaseException) so that the
    ``except Exception`` fallbacks in this module (Url.exists,
    Archive.update_index) actually catch it.

    The ERR_* class attributes are process exit codes.
    """
    ERR_USAGE = 1
    ERR_FATAL = 2
    ERR_ABORT = 3
    ERR_UNKNOWN = 4


class Url(object):
    """Remote Resource"""

    # headers sent with every request
    HTTP_HEADERS = {
        "User-Agent": "%s v%s" % (NAME, VERSION),
    }

    def __init__(self, url, timeout):
        """Remember the resource *url* and the network *timeout* to use."""
        self.url = url
        self.timeout = timeout

    def __str__(self):
        return self.url

    @property
    def exists(self):
        """Return True when the headers of the URL can be fetched."""
        try:
            self.headers
            return True
        except Exception:
            return False

    def _header(self, name):
        """Return the value of header *name*, ignoring the case of its name.

        Raise KeyError when the header is absent.
        """
        wanted = name.lower()
        for key, value in self.headers.items():
            if key.lower() == wanted:
                return value
        raise KeyError(name)

    @property
    def size(self):
        """Return the remote file size"""
        try:
            return int(self._header("Content-Length"))
        except Error:
            # the headers could not be fetched: keep that message as is
            raise
        except Exception as exp:
            raise Error("Failed to get size of %s: %s" % (self.url, exp)) from exp

    @property
    def lastmod(self):
        """Return the Last-Modified date in seconds since epoch"""
        try:
            # parsedate() yields a GMT time tuple: it must be converted with
            # timegm(), mktime() would read it as local time.
            return int(timegm(parsedate(self._header("Last-Modified"))))
        except Error:
            raise
        except Exception as exp:
            raise Error("Failed to get last modification date of %s: %s"
                        % (self.url, exp)) from exp

    @property
    def headers(self):
        """Return a dict with url headers

        The HEAD request is sent on first access only; the result is kept
        in self._headers. Raise Error when the request fails.
        """
        if not hasattr(self, "_headers"):
            try:
                debug("Request headers on URL: %s" % self.url)
                url_req = Request(self.url, method="HEAD", headers=self.HTTP_HEADERS)
                with urlopen(url_req, timeout=self.timeout) as remote_fd:
                    self._headers = dict(remote_fd.headers.items())
            except Exception as exp:
                raise Error("Failed to get headers at %s: %s" % (self, exp)) from exp
        return self._headers

    def download(self, destination):
        """Download URL to destination

        Raise Error when the request or the local write fails.
        """
        debug("Downloading from : %s" % self.url)
        debug("               to : %s" % destination)
        try:
            url_req = Request(self.url, headers=self.HTTP_HEADERS)
            # both handles are closed whatever happens during the copy
            with urlopen(url_req, timeout=self.timeout) as remote_fd, \
                    open(destination, "wb") as local_fd:
                copyfileobj(remote_fd, local_fd)
        except Exception as exp:
            raise Error("Failed to download %s: %s" % (self, exp)) from exp


class Package(Url):
    """Abstract a multi versioned package"""

    def __init__(self, url, timeout):
        """Build a package from the *url* of its file.

        name, full_version, epoch, version, release and arch are parsed
        from the file name. Raise Error when the name cannot be parsed.
        """
        # NOTE(review): unlike in the base class, self.url is a Url object
        # here, not a string, and Url.__init__ is not called -- confirm the
        # inherited members are meant to be used on a Package.
        self.url = Url(url, timeout)
        self.sigurl = Url(url + SIG_EXT, timeout)
        self.timeout = timeout
        self.filename = basename(url)
        self.sigfilename = self.filename + SIG_EXT
        m = _PKG_FILENAME_REGEX.match(self.filename)
        if m is None:
            raise Error("Unable to parse package info from filename %s" % self.filename)
        (self.name, self.full_version, self.epoch, self.version, self.release,
         self.arch) = m.groups()
        # no epoch means 0 (man PKGBUILD)
        # (a parsed epoch is kept as the matched string)
        if self.epoch is None:
            self.epoch = 0

    def __str__(self):
        return self.filename

    def __getitem__(self, key):
        """Expose attributes as mapping keys, for "%(name)s" formatting."""
        try:
            return getattr(self, key)
        except AttributeError:
            raise KeyError(key) from None

    @property
    def size(self):
        """Return package Content-Length (size in bytes)"""
        return self.url.size

    @property
    def lastmod(self):
        """Return package Last-Modified date (in seconds since epoch)"""
        return self.url.lastmod

    def get(self, sign=True, force=False):
        """Download the package locally

        The package (and its signature when *sign* is set and a signature
        exists remotely) is written in the current directory. Unless *force*
        is set, raise Error instead of overwriting an existing file.
        """
        if not force:
            if exists(self.filename):
                raise Error("Local file %s already exists" % self.filename)
            if sign and exists(self.sigfilename):
                raise Error("Local file %s already exists" % self.sigfilename)
        self.url.download(self.filename)
        if sign and self.sigurl.exists:
            self.sigurl.download(self.sigfilename)


class Archive(object):
    """Abstract access to the package Archive"""

    def __init__(self, url, timeout, update=1):
        """Init the Archive interface
        url of the archive (flat style)
        update = update the local index cache (0: never, 1: when needed, 2: always)
        timeout = the socket timeout for network requests
        """
        # endswith() also rejects an empty url, where url[-1] would fail
        if not url.endswith("/"):
            raise Error("Archive URL must end with a /")
        self.url = url
        self.remote_index = Url(self.url + INDEX_FILENAME, timeout)
        self.local_index = join(save_cache_path(NAME), INDEX_FILENAME)
        self.timeout = timeout
        if update > 0:
            self.update_index(update == 2)
        self._load_index()

    def _load_index(self):
        """Load the local index file into self._index.

        The index is read as one package key per line; each key maps to
        the Package built from <archive url><key><PKG_EXT>.
        """
        debug("Loading index from %s" % self.local_index)
        index = OrderedDict()
        with xzopen(self.local_index, "rb") as fd:
            for line in fd:
                key = line.decode().rstrip()
                # a blank line is not a package
                if not key:
                    continue
                index[key] = Package("%s%s%s" % (self.url, key, PKG_EXT), self.timeout)
        self._index = index
        debug("Index loaded: %s packages" % len(self._index))

    def update_index(self, force=False):
        """Update index remotely when needed

        The index is downloaded when *force* is set, when the remote or the
        local information cannot be read, when sizes differ or when the
        remote index is newer than the local one.
        """
        debug("Check remote index for upgrade")
        if force:
            return self.remote_index.download(self.local_index)
        # get remote info
        try:
            remote_size = self.remote_index.size
            remote_lastmod = self.remote_index.lastmod
        except Exception as exp:
            debug("Failed to get remote index size/lastmod: %s" % exp)
            return self.remote_index.download(self.local_index)
        # get local info
        try:
            local_st = stat(self.local_index)
        except Exception as exp:
            debug("Failed to get local stat: %s" % exp)
            return self.remote_index.download(self.local_index)
        # compare size
        if remote_size != local_st.st_size:
            debug("Size differ between remote and local index (%s vs %s)" %
                  (remote_size, local_st.st_size))
            return self.remote_index.download(self.local_index)
        # compare date
        elif remote_lastmod > local_st.st_mtime:
            debug("Remote index is newer than local, updating it (%s vs %s)" %
                  (remote_lastmod, local_st.st_mtime))
            return self.remote_index.download(self.local_index)
        debug("Remote and local indexes seems equal. No update.")

    def search(self, name_pattern, version_pattern, release_pattern, arch_list=None):
        """Search for a package

        name_pattern, version_pattern and release_pattern are regex searched
        in the package name, version and release; a None version or release
        pattern matches everything. An empty or None arch_list matches every
        architecture. Return the list of matching packages, in index order.
        """
        name_regex = recompile(name_pattern)
        version_regex = recompile(version_pattern) if version_pattern is not None else None
        release_regex = recompile(release_pattern) if release_pattern is not None else None
        res = []
        for pkg in self._index.values():
            if not name_regex.search(pkg.name):
                continue
            # check against arch
            if arch_list and pkg.arch not in arch_list:
                continue
            # check against version
            if version_regex is not None and not version_regex.search(pkg.version):
                continue
            # check against release
            if release_regex is not None and not release_regex.search(pkg.release):
                continue
            res.append(pkg)
        return res


def which(binary):
    """lookup if bin exists into PATH

    Return True only when *binary* resolves to an executable file.
    """
    return shutil_which(binary) is not None


def pacman(args, asroot=True):
    """execute pacman (optionally as root)

    args is the list of pacman arguments. When asroot is set and the
    effective user is not root, the command is run through sudo, or su
    when sudo is not found.
    """
    cmd = ["pacman"] + list(args)
    # add sudo or su if not root
    if asroot and geteuid() != 0:
        if which("sudo"):
            cmd = ["sudo"] + cmd
        elif which("su"):
            # su hands one string to a shell: quote each argument, and pass
            # it as the value of -c ("-c=..." would prefix it with "=")
            cmd = ["su", "root", "-c", " ".join(quote(arg) for arg in cmd)]
        else:
            error("Unable to execute as root: %s" % " ".join(cmd))
    debug("calling: %s" % cmd)
    call(cmd, close_fds=True)


def list_packages(packages, long=False):
    """display a list of packages on stdout

    The long format adds size, last modification date and url; size and
    date are read from the remote headers of each package.
    """
    if long:
        pattern = "%(name)s %(full_version)s %(arch)s %(size)s %(lastmod)s %(url)s"
    else:
        pattern = "%(name)s %(full_version)s %(arch)s"
    for package in packages:
        print(pattern % package)


def select_packages(packages, select_all=False):
    """select a package in a list

    Generator over the selected packages: the only package when there is
    one, every package when select_all is set, otherwise the ones picked
    interactively by number on stdin.
    """
    # nothing to select from
    if not packages:
        return
    # shortcut to one package
    if len(packages) == 1:
        yield packages[0]
        return
    if select_all:
        for pkg in packages:
            yield pkg
        return
    # display a list of packages to select
    pad = len("%d" % (len(packages) - 1))
    for i, pkg in enumerate(packages):
        print("{:{pad}} {}".format(i, pkg, pad=pad))
    # ask until the answer is a list of numbers
    selection = ""
    while not _SELECTION_REGEX.match(selection):
        selection = input("Select packages (* for all): ").strip()
        if selection == "":
            return
        # shortcut to select all packages
        if selection == "*":
            for pkg in packages:
                yield pkg
            return
    # parse selection
    for num in (int(x) for x in selection.split(" ")):
        if num < len(packages):
            yield packages[num]
        else:
            warning("No package n°%s" % num)


def get_packages(packages, select_all=False):
    """download packages"""
    for pkg in select_packages(packages, select_all):
        pkg.get()


def install_packages(packages, select_all=False):
    """install packages in one shot to allow deps to work

    Selected packages are downloaded into a temporary directory and handed
    to a single "pacman -U" call.
    """
    packages = list(select_packages(packages, select_all))
    # nothing selected: do not call pacman without target
    if not packages:
        return
    with TemporaryDirectory() as tmpdir:
        cwd = getcwd()
        chdir(tmpdir)
        try:
            for pkg in packages:
                pkg.get()
            pacman(["-U"] + [pkg.filename for pkg in packages])
        finally:
            # leave the temporary directory before it gets removed
            chdir(cwd)


def parse_argv(argv=None):
    '''Parse command line arguments

    argv is the list of arguments to parse; None means sys.argv[1:].
    Return the argparse namespace.
    '''
    local_arch = uname().machine
    p_main = ArgumentParser()
    # update index options
    g_update = p_main.add_mutually_exclusive_group()
    g_update.add_argument("-u", "--force-update",
                          action="store_const", dest="update", const=2, default=1,
                          help="force index update")
    g_update.add_argument("-U", "--no-update",
                          action="store_const", dest="update", const=0,
                          help="disable index update")
    # action mode options
    g_action = p_main.add_mutually_exclusive_group()
    g_action.add_argument("-g", "--get", action="store_const", dest="mode",
                          const="get", help="get matching packages (default mode)")
    g_action.add_argument("-l", "--list", action="store_const", dest="mode",
                          const="list", help="only list matching packages")
    g_action.add_argument("-i", "--install", action="store_const", dest="mode",
                          const="install", help="install matching packages")
    # common options
    p_main.add_argument("-a", "--all", action="store_true",
                        help="select all packages without prompting")
    p_main.add_argument("-A", "--arch", nargs="*", default=[local_arch, "any"],
                        help="filter by architectures (default: %s and any. empty means all)" % local_arch)
    p_main.add_argument("-v", "--verbose", action="store_true",
                        help="display more information")
    p_main.add_argument("--url", help="archive URL, default: %s" % ARCHIVE_URL,
                        default=environ.get("ARCHIVE_URL", ARCHIVE_URL))
    # without a type, a value given on the command line would stay a string
    p_main.add_argument("-t", "--timeout", default=10, type=float,
                        help="connection timeout (default: 10s)")
    p_main.add_argument("--version", action="version",
                        version="%(prog)s version " + VERSION)
    p_main.add_argument("--debug", action="store_true",
                        help="debug mode")
    # positional args
    p_main.add_argument("package",
                        help="regex to match a package name")
    p_main.add_argument("version", nargs="?",
                        help="regex to match a package version")
    p_main.add_argument("release", nargs="?",
                        help="regex to match a package release")
    return p_main.parse_args(argv)


def main():
    '''Program entry point

    Exit with Error.ERR_ABORT on keyboard interrupt, Error.ERR_FATAL on an
    Error and Error.ERR_UNKNOWN on any other exception (which is re-raised
    instead when the debug level is set).
    '''
    try:
        # parse command line
        args = parse_argv()
        # set global debug mode
        if args.debug:
            getLogger().setLevel(DEBUG)
        # init archive interface
        archive = Archive(args.url, args.timeout, args.update)
        # select target packages
        packages = archive.search(args.package, args.version, args.release, args.arch)
        if not packages:
            print("No match found.")
            sys.exit(0)
        if args.mode == "list":
            list_packages(packages, long=args.verbose)
        elif args.mode == "install":
            install_packages(packages, args.all)
        else:
            get_packages(packages, args.all)
    except KeyboardInterrupt:
        sys.exit(Error.ERR_ABORT)
    except Error as exp:
        error(exp)
        sys.exit(Error.ERR_FATAL)
    except Exception as exp:
        error("Unknown error. Please report it with --debug")
        error("at https://github.com/seblu/agetpkg/issues.")
        error(exp)
        if getLogger().getEffectiveLevel() == DEBUG:
            raise
        sys.exit(Error.ERR_UNKNOWN)


if __name__ == '__main__':
    main()

# vim:set ts=4 sw=4 et ai: