Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
agetpkg
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Environments
Analytics
Analytics
CI / CD
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Jobs
Commits
Open sidebar
archlinux
agetpkg
Commits
ae274d32
Commit
ae274d32
authored
Oct 14, 2015
by
Seblu
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Initial commit
With a working PoC.
parents
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
340 additions
and
0 deletions
+340
-0
PKGBUILD
PKGBUILD
+17
-0
agetpkg
agetpkg
+323
-0
No files found.
PKGBUILD
0 → 100644
View file @
ae274d32
# Maintainer: Sébastien Luttringer
pkgname
=
agetpkg-git
pkgver
=
"
$(
git log
--pretty
=
format:
''
|wc
-l
)
"
pkgrel
=
1
pkgdesc
=
'Archlinux Archive Get Package (Git version)'
arch
=(
'any'
)
url
=
'https://github.com/seblu/agetpkg'
license
=(
'GPL2'
)
depends
=(
'python'
'python-xdg'
)
package
()
{
cd
"
$startdir
"
install
-Dm755
agetpkg
"
$pkgdir
/usr/bin/agetpkg"
}
# vim:set ts=2 sw=2 et:
agetpkg
0 → 100755
View file @
ae274d32
#!/usr/bin/python3
# coding: utf-8
# agetpkg - Archive Get Package
# Copyright © 2015 Sébastien Luttringer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
'''Archlinux Archive Get Package'''
from
argparse
import
ArgumentParser
from
collections
import
OrderedDict
from
email.utils
import
parsedate
from
logging
import
getLogger
,
error
,
debug
,
DEBUG
from
lzma
import
open
as
xzopen
from
os
import
stat
,
uname
from
os.path
import
basename
,
exists
,
join
from
pprint
import
pprint
from
re
import
match
,
compile
as
recompile
from
shutil
import
copyfileobj
from
time
import
mktime
,
time
from
urllib.request
import
urlopen
,
Request
from
xdg.BaseDirectory
import
save_cache_path
# magics
NAME
=
"agetpkg"
VERSION
=
"0"
ARCHIVE_URL
=
"http://ala.seblu.net/packages/.all/"
INDEX_FILENAME
=
"index.0.xz"
PKG_EXT
=
".pkg.tar.xz"
SIG_EXT
=
".sig"
class
Error
(
BaseException
):
"""Error"""
ERR_USAGE
=
1
ERR_FATAL
=
2
ERR_ABORT
=
3
ERR_UNKNOWN
=
4
class
Url
(
object
):
"""Remote Ressource"""
HTTP_HEADERS
=
{
"User-Agent"
:
"%s v%s"
%
(
NAME
,
VERSION
),
}
def
__init__
(
self
,
url
,
timeout
):
self
.
url
=
url
self
.
timeout
=
timeout
def
__str__
(
self
):
return
self
.
url
@
property
def
exists
(
self
):
try
:
self
.
headers
return
True
except
Exception
:
return
False
@
property
def
size
(
self
):
"""Return the remote file size"""
try
:
return
int
(
self
.
headers
[
"Content-Length"
])
except
Exception
as
exp
:
raise
Error
(
"Failed to get size of %s: %s"
%
(
self
.
url
,
exp
))
@
property
def
lastmod
(
self
):
try
:
return
int
(
mktime
(
parsedate
(
self
.
headers
[
"Last-Modified"
])))
except
Exception
as
exp
:
raise
Error
(
"Failed to get last modification date of %s: %s"
%
(
self
.
url
,
exp
))
@
property
def
headers
(
self
):
"""Return a dict with url headers"""
if
not
hasattr
(
self
,
"_headers"
):
try
:
debug
(
"Request headers on URL: %s"
%
self
.
url
)
url_req
=
Request
(
self
.
url
,
method
=
"HEAD"
,
headers
=
self
.
HTTP_HEADERS
)
remote_fd
=
urlopen
(
url_req
,
timeout
=
self
.
timeout
)
self
.
_headers
=
dict
(
remote_fd
.
getheaders
())
except
Exception
as
exp
:
raise
Error
(
"Failed to get headers at %s: %s"
%
(
self
,
exp
))
return
getattr
(
self
,
"_headers"
)
def
download
(
self
,
destination
):
"""Download URL to destination"""
debug
(
"Downloading from : %s"
%
self
.
url
)
debug
(
" to : %s"
%
destination
)
try
:
url_req
=
Request
(
self
.
url
,
headers
=
self
.
HTTP_HEADERS
)
remote_fd
=
urlopen
(
url_req
,
timeout
=
self
.
timeout
)
local_fd
=
open
(
destination
,
"wb"
)
copyfileobj
(
remote_fd
,
local_fd
)
except
Exception
as
exp
:
raise
Error
(
"Failed to download %s: %s"
%
(
self
,
exp
))
class
Package
(
Url
):
"""Abstract a multi versionned package"""
def
__init__
(
self
,
url
,
timeout
):
self
.
url
=
Url
(
url
,
timeout
)
self
.
sigurl
=
Url
(
url
+
SIG_EXT
,
timeout
)
self
.
timeout
=
timeout
self
.
filename
=
basename
(
url
)
self
.
sigfilename
=
self
.
filename
+
SIG_EXT
# regex is not strict, but we are not validating something here
m
=
match
(
"^([\w@._+-]+)-((?:(\d+):)?([^-]+)-([^-]+))-(\w+)"
,
self
.
filename
)
if
m
is
None
:
raise
Error
(
"Unable to parse package info from filename %s"
%
self
.
filename
)
(
self
.
name
,
self
.
full_version
,
self
.
epoch
,
self
.
version
,
self
.
release
,
self
.
arch
)
=
m
.
groups
()
# no epoch means 0 (man PKGBUILD)
if
self
.
epoch
is
None
:
self
.
epoch
=
0
def
__str__
(
self
):
return
self
.
filename
def
__getitem__
(
self
,
key
):
try
:
return
getattr
(
self
,
key
)
except
AttributeError
:
raise
KeyError
()
@
property
def
size
(
self
):
"""Return package Content-Length (size in bytes)"""
return
self
.
url
.
size
@
property
def
lastmod
(
self
):
"""Return package Last-Modified date (in seconds since epoch)"""
return
self
.
url
.
lastmod
def
get
(
self
,
sign
=
True
,
force
=
False
):
"""Download the package locally"""
if
not
force
:
if
exists
(
self
.
filename
):
raise
Error
(
"Local file %s already exists"
%
self
.
filename
)
if
sign
and
exists
(
self
.
sigfilename
):
raise
Error
(
"Local file %s already exists"
%
self
.
sigfilename
)
self
.
url
.
download
(
self
.
filename
)
if
sign
and
self
.
sigurl
.
exists
:
self
.
sigurl
.
download
(
self
.
sigfilename
)
class
Archive
(
object
):
"""Abstract access to the package Archive"""
def
__init__
(
self
,
url
,
timeout
,
update
=
1
):
"""Init the Archive interface
url of the archive (flat style)
update = update the local index cache (0: never, 1: when needed, 2: always)
timeout = the socket timeout for network requests
"""
if
url
[
-
1
]
!=
"/"
:
raise
Error
(
"Archive URL must end with a /"
)
self
.
url
=
url
self
.
remote_index
=
Url
(
self
.
url
+
INDEX_FILENAME
,
timeout
)
self
.
local_index
=
join
(
save_cache_path
(
NAME
),
INDEX_FILENAME
)
self
.
timeout
=
timeout
if
update
>
0
:
self
.
update_index
(
update
==
2
)
self
.
_load_index
()
def
_load_index
(
self
):
debug
(
"Loading index from %s"
%
self
.
local_index
)
fd
=
xzopen
(
self
.
local_index
,
"rb"
)
self
.
_index
=
OrderedDict
()
for
line
in
fd
.
readlines
():
key
=
line
.
decode
().
rstrip
()
self
.
_index
[
key
]
=
Package
(
"%s%s%s"
%
(
self
.
url
,
key
,
PKG_EXT
),
self
.
timeout
)
debug
(
"Index loaded: %s packages"
%
len
(
self
.
_index
))
def
update_index
(
self
,
force
=
False
):
"""Update index remotely when needed"""
debug
(
"Check remote index for upgrade"
)
if
force
:
return
self
.
remote_index
.
download
(
self
.
local_index
)
# get remote info
try
:
remote_size
=
self
.
remote_index
.
size
remote_lastmod
=
self
.
remote_index
.
lastmod
except
Exception
as
exp
:
debug
(
"Failed to get remote index size/lastmod: %s"
%
exp
)
return
self
.
remote_index
.
download
(
self
.
local_index
)
# get local info
try
:
local_st
=
stat
(
self
.
local_index
)
except
Exception
as
exp
:
debug
(
"Failed to get local stat: %s"
%
exp
)
return
self
.
remote_index
.
download
(
self
.
local_index
)
# compare size
if
remote_size
!=
local_st
.
st_size
:
debug
(
"Size differ between remote and local index (%s vs %s)"
%
(
remote_size
,
local_st
.
st_size
))
return
self
.
remote_index
.
download
(
self
.
local_index
)
# compare date
elif
remote_lastmod
>
local_st
.
st_mtime
:
debug
(
"Remote index is newer than local, updating it (%s vs %s)"
%
(
remote_lastmod
,
local_st
.
st_mtime
))
return
self
.
remote_index
.
download
(
self
.
local_index
)
debug
(
"Remote and local indexes seems equal. No update."
)
def
search
(
self
,
name_pattern
,
version_pattern
,
arch_list
=
None
):
"""Search for a package """
name_regex
=
recompile
(
name_pattern
)
version_regex
=
recompile
(
version_pattern
)
if
version_pattern
is
not
None
else
None
res
=
list
()
for
pkg
in
self
.
_index
.
values
():
if
name_regex
.
search
(
pkg
.
name
):
# check against arch
if
arch_list
is
not
None
and
len
(
arch_list
)
>
0
:
if
pkg
.
arch
not
in
arch_list
:
continue
# check against version
if
version_regex
is
not
None
:
if
not
version_regex
.
search
(
pkg
.
full_version
):
continue
res
+=
[
pkg
]
return
res
def
list_packages
(
packages
,
long
=
False
):
"""display a list of packages on stdout"""
if
long
:
pattern
=
"%(name)s %(full_version)s %(arch)s %(size)s %(lastmod)s %(url)s"
else
:
pattern
=
"%(name)s %(full_version)s %(arch)s"
for
package
in
packages
:
print
(
pattern
%
package
)
def
get_packages
(
packages
):
"""retrieve packages"""
if
len
(
packages
)
==
1
:
packages
[
0
].
get
()
else
:
index
=
dict
(
enumerate
(
packages
))
for
i
,
pkg
in
index
.
items
():
print
(
i
,
pkg
)
n
=
int
(
input
(
"Which number? "
))
index
[
n
].
get
()
def
parse_argv
():
'''Parse command line arguments'''
local_arch
=
uname
().
machine
p_main
=
ArgumentParser
()
p_main
.
add_argument
(
"--version"
,
action
=
"version"
,
version
=
"%(prog)s version "
+
VERSION
)
p_main
.
add_argument
(
"--debug"
,
action
=
"store_true"
,
help
=
"debug mode"
)
g_update
=
p_main
.
add_mutually_exclusive_group
()
g_update
.
add_argument
(
"-u"
,
"--force-update"
,
action
=
"store_const"
,
dest
=
"update"
,
const
=
2
,
default
=
1
,
help
=
"force index update"
)
g_update
.
add_argument
(
"-U"
,
"--no-update"
,
action
=
"store_const"
,
dest
=
"update"
,
const
=
0
,
help
=
"disable index update"
)
p_main
.
add_argument
(
"-l"
,
"--list"
,
action
=
"store_const"
,
dest
=
"mode"
,
const
=
"list"
,
help
=
"only list matching packages"
)
p_main
.
add_argument
(
"-g"
,
"--get"
,
action
=
"store_const"
,
dest
=
"mode"
,
const
=
"get"
,
help
=
"get matching packages (default)"
)
p_main
.
add_argument
(
"-a"
,
"--arch"
,
nargs
=
"*"
,
default
=
[
local_arch
,
"any"
],
help
=
"filter by architectures (default is %s and any. empty means all)"
%
local_arch
)
p_main
.
add_argument
(
"-v"
,
"--verbose"
,
action
=
"store_true"
,
help
=
"display more information"
)
p_main
.
add_argument
(
"--url"
,
help
=
"archive URL"
,
default
=
ARCHIVE_URL
)
p_main
.
add_argument
(
"-t"
,
"--timeout"
,
default
=
5
,
help
=
"connection timeout (10s)"
)
p_main
.
add_argument
(
"package"
,
help
=
"regex to match a package name"
)
p_main
.
add_argument
(
"version"
,
nargs
=
"?"
,
help
=
"regex to match a package version"
)
return
p_main
.
parse_args
()
def
main
():
'''Program entry point'''
try
:
# parse command line
args
=
parse_argv
()
# set global debug mode
if
args
.
debug
:
getLogger
().
setLevel
(
DEBUG
)
# init archive interface
archive
=
Archive
(
args
.
url
,
args
.
timeout
,
args
.
update
)
# select target pacakges
packages
=
archive
.
search
(
args
.
package
,
args
.
version
,
args
.
arch
)
if
len
(
packages
)
==
0
:
print
(
"No match found."
)
exit
(
0
)
if
args
.
mode
==
"list"
:
list_packages
(
packages
,
long
=
args
.
verbose
)
else
:
get_packages
(
packages
)
except
KeyboardInterrupt
:
exit
(
Error
.
ERR_ABORT
)
except
Error
as
exp
:
error
(
exp
)
exit
(
Error
.
ERR_FATAL
)
except
Exception
as
exp
:
error
(
"Unknown error. Please report it with --debug."
)
error
(
exp
)
if
getLogger
().
getEffectiveLevel
()
==
DEBUG
:
raise
exit
(
Error
.
ERR_UNKNOWN
)
if
__name__
==
'__main__'
:
main
()
# vim:set ts=4 sw=4 et ai:
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment