Newer
Older
"""
pipdeps
"""
import argparse
import json
import distutils.version
import os

Marek Chrastina
committed
import tarfile
import tempfile
import zipfile
import packaging.specifiers
import packaging.version
def arg_parse():
"""
argument parser
"""
parser = argparse.ArgumentParser(
description="Pipdeps shows/upgrades outdated packages with respect to existing \
dependencies."
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-l', '--list',
action='store_true',

Marek Chrastina
committed
help="show upgradeable packages and versions")
group.add_argument('-u', '--upgrade',
action='store_true',

Marek Chrastina
committed
help="upgrade upgradeable packages")
group.add_argument('-s', '--show',
nargs='+',
help="show detailed info about upgradeable packages")
def get_pyver():
"""
return running python version
"""
return ".".join(map(str, sys.version_info[:3]))

Marek Chrastina
committed
def is_strict_version(version):
"""
Return true if version is strict, otherwise return false
"""
try:
distutils.version.StrictVersion(version)
except ValueError:
return False
return True

Marek Chrastina
committed
def version_conform_specifiers(version, specifiers):

Marek Chrastina
committed
check if version conforms specifiers

Marek Chrastina
committed
if not specifiers:

Marek Chrastina
committed
elif version is None:

Marek Chrastina
committed
ver = packaging.version.Version(version)
spec = packaging.specifiers.SpecifierSet(",".join(specifiers))
if spec.contains(ver):
return True
return False
def upgrade_package(package, versions):
"""
pip install --upgrade "<package><versions>"
"""
subprocess.check_call(
["pip", "install", "--upgrade", "%s==%s" % (package, "".join(versions))],
stderr=subprocess.STDOUT

Marek Chrastina
committed
def get_pip_list():
outdated_packages = subprocess.check_output(["pip", "list"])
return [line.split()[0] for line in outdated_packages.strip().split("\n")[2:]]

Marek Chrastina
committed
def file_download(url):
"""
Download file from url as temporary file
It returns file object
"""
tmp_file = tempfile.NamedTemporaryFile(delete=False)
rfile = urllib2.urlopen(url)
with tmp_file as output:
output.write(rfile.read())
return tmp_file
"""
pipdeptree --json-tree
"""
pipdeptree = subprocess.check_output(
["pipdeptree", "--json-tree"],
stderr=subprocess.STDOUT
)
return json.loads(pipdeptree.strip())

Marek Chrastina
committed
def get_json(url):
"""
Return url json
"""
return json.load(urllib2.urlopen(urllib2.Request(url)))
def json_search(jsonpipdeptree, package, key):
"""
find package dependencies in json tree
"""
if isinstance(jsonpipdeptree, dict):
keys = jsonpipdeptree.keys()
if 'package_name' in keys and key in keys:
if re.search(r'^%s$' % package, jsonpipdeptree['package_name'], re.IGNORECASE):
yield jsonpipdeptree[key]
for child_val in json_search(jsonpipdeptree['dependencies'], package, key):
yield child_val
elif isinstance(jsonpipdeptree, list):
for item in jsonpipdeptree:
for item_val in json_search(item, package, key):
yield item_val

Marek Chrastina
committed
def get_highest_version(package, data):
"""
Return upgradeable version if possible, otherwise return installed version
"""
try:
version = data[package]['upgradeable_version']
except KeyError:
version = data[package]['installed_version']
return version
def find_available_vers(package_name, pyver):
"""
Return descending list of available strict version
"""

Marek Chrastina
committed
data = get_json("https://pypi.python.org/pypi/%s/json" % (package_name,))
releases = data["releases"].keys()
for release in releases:

Marek Chrastina
committed
requires_python = []
for item in data["releases"][release]:
if item['requires_python'] is not None:
requires_python.append(item['requires_python'])
if is_strict_version(release) and version_conform_specifiers(pyver, requires_python):
versions.append(release)
return sorted(versions, key=distutils.version.StrictVersion, reverse=True)

Marek Chrastina
committed
def get_newer_vers(available_version, required_version, installed_version=None):

Marek Chrastina
committed
Return list of newer versions which conforms pipdeptree dependencies, otherwise return none.

Marek Chrastina
committed
if [rver for rver in required_version if re.search(r'(^==.*|^\d.*)', rver) is not None]:

Marek Chrastina
committed
result = []
av_version = list(available_version)
except IndexError:
break
aver = packaging.version.Version(version)
rver = packaging.specifiers.SpecifierSet(",".join(required_version))
if rver.contains(aver):

Marek Chrastina
committed
if installed_version is not None:
iver = packaging.version.Version(installed_version)
if aver == iver:
break
elif aver > iver:
result.append(version)
else:
result.append(version)
if result:
return sorted(result, key=distutils.version.StrictVersion, reverse=True)

Marek Chrastina
committed
def parse_requires_txt(package, version):

Marek Chrastina
committed
Return content of requires.txt until first [ appears

Marek Chrastina
committed
content = None
release_data = get_json("https://pypi.python.org/pypi/%s/%s/json" % (package, version,))
for item in release_data['releases'][version]:
if item['packagetype'] == 'sdist':
tmp_file = file_download(item['url'])
try:
tar_file = tarfile.open(tmp_file.name, 'r')
for member in tar_file.getmembers():
if 'requires.txt' in member.name:
content = tar_file.extractfile(member)
except tarfile.ReadError:
zip_file = zipfile.ZipFile(tmp_file.name, 'r')
for member in zip_file.namelist():
if 'requires.txt' in member:
content = zip_file.read(member)
if content is not None:
par = []
for line in content:
if '[' in line:
break
else:
par.append(line.strip())
content = "\n".join(par)

Marek Chrastina
committed
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
return content
def find_new_dependencies(package, version, package_list, pyver):
"""
Return package dependencies parsed from pypi json
"""
content = parse_requires_txt(package, version)
if content is not None:
for line in content.split("\n"):
try:
pkg = re.search(r'^([a-zA-Z0-9_.-]+)', line).group(0)
dep = line.replace(pkg, '').strip()
if not dep:
dep = None
if pkg in package_list:
yield (pkg, dep)
else:
for child in find_new_dependencies(
pkg,
get_newer_vers(find_available_vers(pkg, pyver), dep, None)[0],
package_list,
pyver
):
yield child
except AttributeError:
pass
def depless_vers(res):
"""
If there is no dependencies or versionless dependencies, return the upgradeable version,
otherwise return None
"""
depless = []
for ver, deps in res.iteritems():
if not deps:
depless.append(ver)
else:
if not [dep for dep in deps if dep[1] is not None]:
depless.append(ver)
if depless:
depless = sorted(depless, key=distutils.version.StrictVersion, reverse=True)[0]
else:
depless = None
return depless
def collect_packages(package_list, jsonpipdeptree, pyver=None):
"""
Collect data about packages as dictionary
"""
result = {}

Marek Chrastina
committed
[_ for _ in json_search(jsonpipdeptree, package, 'installed_version')])))
required_version = []
for dep in list(set(
[_ for _ in json_search(jsonpipdeptree, package, 'required_version')]
)):
if 'Any' not in dep:
required_version.append(dep)
available_version = find_available_vers(package, pyver)
newer_version = get_newer_vers(available_version, required_version, installed_version)
rev = {'installed_version': installed_version,
'required_version': required_version,
'available_version': available_version}

Marek Chrastina
committed
if newer_version is not None:
res = {}
for version in newer_version:
res[version] = [
_ for _ in find_new_dependencies(package, version, package_list, pyver)]
rev['newer_version'] = res
depless = depless_vers(res)
if depless:
rev['upgradeable_version'] = depless
result[package] = rev

Marek Chrastina
committed
def check_deps(deps, packages_data):

Marek Chrastina
committed
Return true, if all package dependencies conforms

Marek Chrastina
committed
ndeps = []
for item in deps:
if item[1] is not None:
ndeps.append(
version_conform_specifiers(
get_highest_version(item[0], packages_data),
packages_data[item[0]]['required_version']+[item[1]]
)
)
return all(ndeps)
def select_pkgs(packages_data, rkey):
"""
Return data packages having requested key
"""
result = {}
for pkg, pkg_data in packages_data.iteritems():
if rkey in pkg_data.keys():
result[pkg] = pkg_data
def print_list(upgradeable_pkgs):
"""
Provides list option
"""
if upgradeable_pkgs:
data = []
for pkg, pkg_data in sorted(upgradeable_pkgs.iteritems(), key=lambda x: x[0].lower()):
data.append([pkg, pkg_data['installed_version'], pkg_data['upgradeable_version']])
print tabulate.tabulate(
data,
['package', 'installed_version', 'upgradeable_version']
)
sys.exit(1)
else:
print "There is nothing to upgrade."
sys.exit(0)
def main():
"""
main function
"""
os.environ["PYTHONWARNINGS"] = "ignore:DEPRECATION"
arguments = arg_parse()

Marek Chrastina
committed
pkglist = get_pip_list()
jsonpipdeptree = get_jsonpipdeptree()
packages_data = collect_packages(pkglist, jsonpipdeptree, pyver=pyver)
for pkg, pkg_data in sorted(
select_pkgs(packages_data, 'newer_version').iteritems(), key=lambda x: x[0].lower()
):
pkg_keys = pkg_data.keys()
if 'newer_version' in pkg_keys and 'upgradeable_version' not in pkg_keys:
for ver, deps in sorted(
pkg_data['newer_version'].iteritems(),
key=lambda x: distutils.version.StrictVersion(x[0]),
reverse=True
):
ndeps = check_deps(deps, packages_data)
if ndeps:
packages_data[pkg]['upgradeable_version'] = ver
break
upgradeable_pkgs = select_pkgs(packages_data, 'upgradeable_version')

Marek Chrastina
committed
if arguments.list:
print_list(upgradeable_pkgs)
if arguments.show:
for pkg in arguments.show:
pprint.pprint({pkg: packages_data[pkg]})
sys.exit(0)

Marek Chrastina
committed
if arguments.upgrade:
if 'pip' in upgradeable_pkgs.keys():
upgrade_package('pip', upgradeable_pkgs['pip']['upgradeable_version'])
del upgradeable_pkgs['pip']
for pkg, pkg_data in sorted(upgradeable_pkgs.iteritems(), key=lambda x: x[0].lower()):
upgrade_package(pkg, pkg_data['upgradeable_version'])
print "Done."