Skip to content
Snippets Groups Projects
Commit e935e833 authored by Marek Chrastina's avatar Marek Chrastina
Browse files

Serious bugs in pipdeptree, do not use it anymore

parent 6db51879
No related branches found
No related tags found
No related merge requests found
Pipeline #9059 failed
...@@ -2,22 +2,30 @@ ...@@ -2,22 +2,30 @@
pipdeps pipdeps
""" """
import argparse import argparse
import itertools
import json import json
import distutils.version
import os import os
import platform
import pprint import pprint
import re import re
import subprocess import subprocess
import sys import sys
import urllib2
import tarfile import tarfile
import tempfile import tempfile
import urllib2
import zipfile import zipfile
import wheel.metadata
import tabulate import tabulate
import packaging.specifiers import packaging.specifiers
import packaging.version import packaging.version
import pip._internal.utils.misc
import wheel.metadata
# https://www.python.org/dev/peps/pep-0508/#environment-markers
PY_VER = ".".join(map(str, sys.version_info[:2]))
SYS_PLAT = sys.platform
PLAT_PY_IMPL = platform.python_implementation()
def arg_parse(): def arg_parse():
""" """
...@@ -35,117 +43,267 @@ def arg_parse(): ...@@ -35,117 +43,267 @@ def arg_parse():
action='store_true', action='store_true',
help="upgrade upgradeable packages") help="upgrade upgradeable packages")
group.add_argument('-s', '--show', group.add_argument('-s', '--show',
nargs='+', nargs='*',
help="show detailed info about upgradeable packages") help="show detailed info about upgradeable packages")
return parser.parse_args() return parser.parse_args()
def get_pyver(): def upgrade_package(data):
""" """
return running python version pip install --upgrade "<package>==<versions>"
""" """
return ".".join(map(str, sys.version_info[:3])) to_upgrade = []
for package, version in data:
to_upgrade.append("%s==%s" % (package, version))
subprocess.check_call(
["pip", "install", "--upgrade", " ".join(to_upgrade)],
stderr=subprocess.STDOUT
)
def is_strict_version(version): def get_json(url):
""" """
Return true if version is strict, otherwise return false Return url json
""" """
try: return json.load(urllib2.urlopen(urllib2.Request(url)))
distutils.version.StrictVersion(version)
except ValueError: def file_download(url):
return False """
Download file from url as temporary file
It returns file object
"""
tmp_file = tempfile.NamedTemporaryFile(delete=False)
rfile = urllib2.urlopen(url)
with tmp_file as output:
output.write(rfile.read())
return tmp_file
def merge_two_dicts(x, y):
"""
Return merge of two dictionaries
"""
z = x.copy()
z.update(y)
return z
def is_version(version):
"""
Return true if version satisfy regex, otherwise return false
"""
if re.compile(r'^(\d+) \. (\d+) (\. (\d+))? (\. (\d+))?$', re.VERBOSE).search(version) or \
re.compile(r'^(\d+) \. (\d+) (\. (\d+))? (rc(\d+))?$', re.VERBOSE).search(version):
return True return True
return False
def version_conform_specifiers(version, specifiers): def is_in_specifiers(version, specifiers):
""" """
check if version conforms specifiers Return true if version satisfy specifiers, otherwise return false
""" """
if not specifiers: if not specifiers:
return True return True
elif version is None: elif version is None:
return True return True
else: else:
ver = packaging.version.Version(version) # https://github.com/pypa/packaging/pull/92
spec = packaging.specifiers.SpecifierSet(",".join(specifiers)) ver = packaging.version.LegacyVersion(version)
if spec.contains(ver): specifiers = [
packaging.specifiers.LegacySpecifier(s.strip()) for s in specifiers if s.strip()]
return all(s.contains(ver) for s in specifiers)
def is_in_conditions(condition):
"""
Return true if condition satisfy sys_platform and python_version and
platform_python_implementation, otherwise return false
"""
if not condition:
return True return True
return False return eval(
condition.replace("sys_platform", '"%s"' % SYS_PLAT) \
.replace("python_version", '"%s"' % PY_VER) \
.replace("platform_python_implementation", '"%s"' % PLAT_PY_IMPL))
def upgrade_package(package, versions): def is_in_extra(extra, req_extra):
""" """
pip install --upgrade "<package><versions>" Return true if extra satisfy, otherwise return false
""" """
subprocess.check_call( if extra is None or extra in req_extra:
["pip", "install", "--upgrade", "%s==%s" % (package, "".join(versions))], return True
stderr=subprocess.STDOUT return False
)
def get_pip_list(): def specifiers_intersection(specifiers):
""" """
pip list Return intersection of specifiers, otherwise return None
""" """
outdated_packages = subprocess.check_output(["pip", "list"]) if not specifiers:
return [line.split()[0] for line in outdated_packages.strip().split("\n")[2:]] return []
specifiers = [packaging.specifiers.LegacySpecifier(s.strip()) for s in specifiers if s.strip()]
left_boarder = [s for s in specifiers if s.operator in ['>', '>=']]
if left_boarder:
max_left = sorted([s.version for s in left_boarder],
key=packaging.specifiers.LegacyVersion, reverse=True)[0]
max_left_op = [s.operator for s in left_boarder if s.version == max_left]
if '>' in max_left_op:
max_left_op = '>'
else:
max_left_op = '>='
right_boarder = [s for s in specifiers if s.operator in ['<', '<=']]
if right_boarder:
min_right = sorted([s.version for s in right_boarder],
key=packaging.specifiers.LegacyVersion)[0]
min_right_op = [s.operator for s in right_boarder if s.version == min_right]
if '<' in min_right_op:
min_right_op = '<'
else:
min_right_op = '<='
equals = [s for s in specifiers if s.operator in ['==']]
if equals:
cmp_v = list(set([s.version for s in equals]))[0]
if all([packaging.version.LegacyVersion(cmp_v) == packaging.version.LegacyVersion(item) \
for item in list(set([s.version for s in equals]))]):
equals = cmp_v
else:
return None
notequals = [s for s in specifiers if s.operator in ['!=']]
notequals = list(set([s.version for s in notequals]))
boarders = []
if left_boarder and right_boarder:
if packaging.version.LegacyVersion(max_left) > packaging.version.LegacyVersion(min_right):
return None
elif packaging.version.LegacyVersion(max_left) == packaging.version.LegacyVersion(min_right): # pylint: disable=line-too-long
if max_left_op in ['>='] and min_right_op in ['<=']:
max_left_op = '=='
right_boarder = None
else:
return None
if left_boarder:
boarders.append("%s%s" % (max_left_op, max_left))
if right_boarder:
boarders.append("%s%s" % (min_right_op, min_right))
if boarders and notequals:
for item in notequals:
if is_in_specifiers(item, boarders):
boarders.append("!=%s" % item)
elif not boarders and notequals:
for item in notequals:
boarders.append("!=%s" % item)
if boarders and equals:
if is_in_specifiers(equals, boarders):
return ["==%s" % equals]
else:
return None
elif not boarders and equals:
return ["==%s" % equals]
return boarders
def file_download(url): def select_upkgs(data, rkey):
""" """
Download file from url as temporary file Return data packages having requested key
It returns file object
""" """
tmp_file = tempfile.NamedTemporaryFile(delete=False) result = []
rfile = urllib2.urlopen(url) for pkg, pkg_data in data.iteritems():
with tmp_file as output: if rkey in pkg_data.keys():
output.write(rfile.read()) result.append(pkg)
return tmp_file return result
def get_jsonpipdeptree(): def print_list(data):
""" """
pipdeptree --json-tree Print upgradeable versions
""" """
pipdeptree = subprocess.check_output( upkgs = select_upkgs(data, 'upgradeable_version')
["pipdeptree", "--json-tree"], if upkgs:
stderr=subprocess.STDOUT tab_data = []
for pkg in sorted(upkgs):
tab_data.append([pkg, data[pkg]['installed_version'], data[pkg]['upgradeable_version']])
print tabulate.tabulate(
tab_data,
['package', 'installed_version', 'upgradeable_version']
) )
return json.loads(pipdeptree.strip()) return 1
else:
print "There is nothing to upgrade."
return 0
def get_json(url): def pkginfo(data, req_extra=[], repair=False):
""" """
Return url json Return parsed pkginfo
""" """
return json.load(urllib2.urlopen(urllib2.Request(url))) extra_match = re.compile(
"""^(?P<package>.*?)(;\s*(?P<condition>.*?)(extra == '(?P<extra>.*?)')?)$""").search(data)
if extra_match:
groupdict = extra_match.groupdict()
condition = groupdict['condition']
extra = groupdict['extra']
package = groupdict['package']
if condition.endswith(' and '):
condition = condition[:-5]
mysearch = re.compile(r'(extra == .*)').search(condition)
if mysearch:
extra = mysearch.group(1)
condition = condition.replace(extra, '')
if not condition:
condition = None
extra = re.compile(r'extra == (.*)').search(extra).group(1).replace('"', "")
else:
condition, extra = None, None
package = data
if not is_in_conditions(condition):
return None
pkg_name, pkg_extra, pkg_ver = re.compile(
r'([\w\.\-]*)(\[\w*\])?(.*)').search(package).groups()
if pkg_extra:
pkg_extra = pkg_extra.replace("[", "").replace("]", "").lower()
pkg_ver = pkg_ver.replace("(", "").replace(")", "").strip()
if not pkg_ver:
pkg_ver = []
else:
if repair:
try:
pkg_ver = re.compile(r'^(\d.*)$').search(pkg_ver).group(1)
except AttributeError:
pass
pkg_ver = pkg_ver.split(",")
if not is_in_extra(extra, req_extra):
return None
return (pkg_name.lower(), pkg_ver, pkg_extra)
def json_search(jsonpipdeptree, package, key): def insert_extras(data):
""" """
find package dependencies in json tree Insert extras
""" """
if isinstance(jsonpipdeptree, dict): for key in data.keys():
keys = jsonpipdeptree.keys() extra = []
if 'package_name' in keys and key in keys: for pkg, pkg_data in data.iteritems():
if re.search(r'^%s$' % package, jsonpipdeptree['package_name'], re.IGNORECASE): for dep in pkg_data['requires']:
yield jsonpipdeptree[key] if dep[0] == key:
for child_val in json_search(jsonpipdeptree['dependencies'], package, key): if dep[2]:
yield child_val extra.append(dep[2])
elif isinstance(jsonpipdeptree, list): data[key]['extras'] = extra
for item in jsonpipdeptree: if extra:
for item_val in json_search(item, package, key): for pkg in pip._internal.utils.misc.get_installed_distributions():
yield item_val pkg_name, pkg_ver, pkg_extra = pkginfo(str(pkg))
if pkg_name == key:
data[key]['requires'] += [pkginfo(str(dep), repair=True, req_extra=extra) \
for dep in pkg.requires(extras=extra)]
return data
def get_highest_version(package, data): def insert_availables(data):
""" """
Return upgradeable version if possible, otherwise return installed version Insert available versions
""" """
for pkg, pkg_data in data.iteritems():
if 'available_version' in pkg_data.keys():
continue
try: try:
version = data[package]['upgradeable_version'] data[pkg]['available_version'] = get_available_vers(pkg)
except KeyError: except urllib2.HTTPError:
version = data[package]['installed_version'] data[pkg]['available_version'] = []
return version return data
def find_available_vers(package_name, pyver): def get_available_vers(package):
""" """
Return descending list of available strict version Return descending list of public available strict version
""" """
versions = [] versions = []
try: try:
data = get_json("https://pypi.python.org/pypi/%s/json" % (package_name,)) data = get_json("https://pypi.python.org/pypi/%s/json" % (package))
except urllib2.HTTPError, err: except urllib2.HTTPError, err:
print "%s %s" % (err, err.url) print "%s %s" % (err, err.url)
raise urllib2.HTTPError(err.url, err.code, None, err.hdrs, err.fp) raise urllib2.HTTPError(err.url, err.code, None, err.hdrs, err.fp)
...@@ -154,41 +312,109 @@ def find_available_vers(package_name, pyver): ...@@ -154,41 +312,109 @@ def find_available_vers(package_name, pyver):
requires_python = [] requires_python = []
for item in data["releases"][release]: for item in data["releases"][release]:
if item['requires_python'] is not None: if item['requires_python'] is not None:
requires_python.append(item['requires_python']) for reqpyt in item['requires_python'].split(","):
if is_strict_version(release) and version_conform_specifiers(pyver, requires_python): requires_python.append(reqpyt.strip())
if requires_python:
requires_python = list(set(requires_python))
if is_version(release) and is_in_specifiers(PY_VER, requires_python):
versions.append(release) versions.append(release)
return sorted(versions, key=distutils.version.StrictVersion, reverse=True) return sorted(versions, key=packaging.specifiers.LegacyVersion, reverse=True)
def get_newer_vers(available_version, required_version, installed_version=None): def select_news(available_version, installed_version=None):
""" """
Return list of newer versions which conforms pipdeptree dependencies, otherwise return none. Select versions newer than installed version, if it is known
""" """
if required_version is None: if installed_version is None:
result = [aver for aver in list(available_version)] return sorted(available_version, key=packaging.specifiers.LegacyVersion, reverse=True)
return sorted(result, key=distutils.version.StrictVersion, reverse=True)
if [rver for rver in required_version if re.search(r'(^==.*|^\d.*)', rver) is not None]:
return None
result = []
av_version = list(available_version)
while True:
try:
version = av_version.pop(0)
except IndexError:
break
aver = packaging.version.Version(version)
rver = packaging.specifiers.SpecifierSet(",".join(required_version))
if rver.contains(aver):
if installed_version is not None:
iver = packaging.version.Version(installed_version) iver = packaging.version.Version(installed_version)
if aver == iver: return sorted([aver for aver in available_version if packaging.version.Version(aver) > iver],
break key=packaging.specifiers.LegacyVersion, reverse=True)
elif aver > iver:
result.append(version) def insert_news(data):
else: """
result.append(version) Insert new versions
if result: """
return sorted(result, key=distutils.version.StrictVersion, reverse=True) for pkg, pkg_data in data.iteritems():
return None if 'new_version' in pkg_data.keys():
continue
try:
new_version = select_news(pkg_data['available_version'], pkg_data['installed_version'])
except KeyError:
new_version = select_news(pkg_data['available_version'])
if new_version:
res = {}
for version in new_version:
content = parse_metadata(get_metadata(pkg, version), pkg_data['extras'])
res[version] = content
if res:
pkg_data['new_version'] = res
return data
def new_packages(data):
"""
Return new packages as dictionary
"""
out = {}
arr = []
pkg_list = data.keys()
for pkg, pkg_data in data.iteritems():
try:
for ver, ver_data in pkg_data['new_version'].iteritems():
for dep in ver_data:
if dep[0] not in pkg_list:
arr.append(dep)
except KeyError:
pass
for item in list(set([_[0] for _ in arr])):
extras = []
for pkg, req, extra in arr:
if pkg == item and extra is not None:
extras.append(extra)
out[item] = {'extras': extras}
return out
def check_new_extras(data):
"""
Check if there are new extras
"""
extra_pkgs = []
pkg_list = data.keys()
for pkg, pkg_data in data.iteritems():
try:
for ver, ver_data in pkg_data['new_version'].iteritems():
for dep in ver_data:
if dep[0] in pkg_list and dep[2] is not None:
extra_pkgs.append(dep)
except KeyError:
pass
for pkg, req, extra in extra_pkgs:
if extra not in data[pkg]['extras']:
raise Exception('There are new extras!')
def check_extras(data):
"""
Check if there are extras in upgradeable packages
"""
upkgs = select_upkgs(data, 'upgradeable_version')
for package in select_upkgs(data, 'upgradeable_version'):
if data[package]['extras']:
raise Exception('There are extras in upgradeable packages!')
def check_co_branches(data):
"""
Check if there branches with intersection of packages
"""
co_branches = []
package_branches = get_branches(data)
for branch in package_branches.keys():
for pkg, reqs in package_branches.iteritems():
if pkg == branch:
continue
if len(package_branches[branch]+reqs) != len(list(set(package_branches[branch]+reqs))):
co_branches.append(branch)
co_branches = list(set(co_branches))
if co_branches:
raise Exception('There are branches with intersection of packages!')
def write_metadata(tmp_file): def write_metadata(tmp_file):
""" """
...@@ -249,7 +475,7 @@ def get_metadata(package, version): ...@@ -249,7 +475,7 @@ def get_metadata(package, version):
break break
return metadata return metadata
def parse_metadata(metadata, pyver): def parse_metadata(metadata, extra):
""" """
Return dependencies parsed from metadata Return dependencies parsed from metadata
""" """
...@@ -257,157 +483,459 @@ def parse_metadata(metadata, pyver): ...@@ -257,157 +483,459 @@ def parse_metadata(metadata, pyver):
if 'Metadata-Version' in line.decode('utf-8'): if 'Metadata-Version' in line.decode('utf-8'):
metadata_version = line.replace('Metadata-Version:', '').strip() metadata_version = line.replace('Metadata-Version:', '').strip()
break break
if packaging.version.Version(metadata_version) >= packaging.version.Version('2.0'): arr = []
if metadata_version and \
packaging.version.Version(metadata_version) >= packaging.version.Version('2.0'):
arr = []
lines = [line.replace('Requires-Dist:', '').strip() \
for line in metadata if re.search(r'^Requires-Dist:', line)]
for line in lines:
data = pkginfo(str(line), req_extra=extra, repair=True)
if data:
arr.append(pkginfo(str(line), req_extra=extra, repair=True))
return arr
def pvector(package, data):
"""
Return vector of package versions
"""
out = [] out = []
for dep in [ if 'new_version' not in data[package].keys():
line.replace('Requires-Dist:', '').strip() \ out.append((package, data[package]['installed_version']))
for line in metadata if re.search(r'^Requires-Dist:', line)]:
if ';' in dep:
dep = dep.split(';')
if 'python_version' in dep[1]:
if packaging.specifiers.SpecifierSet(
dep[1].replace('python_version', '').replace('"', '').strip()) \
.contains(packaging.version.Version(pyver)):
dep = dep[0]
else: else:
continue if 'upgradeable_version' in data[package].keys():
out.append((package, data[package]['upgradeable_version']))
else: else:
continue if 'installed_version' in data[package].keys():
dep = dep.split() out.append((package, data[package]['installed_version']))
try: for ver in sorted(data[package]['new_version'].keys(),
pkg = re.search(r'(.*)(\[.*\])', dep[0]).group(1) key=packaging.specifiers.LegacyVersion):
except AttributeError: if 'upgradeable_version' in data[package].keys():
pkg = dep[0] if packaging.specifiers.LegacyVersion(ver) > \
try: packaging.specifiers.LegacyVersion(data[package]['upgradeable_version']):
pkg = re.search(r'(^[\w\.\-]*)(.*)', dep[0]).group(1) out.append((package, ver))
dep.append(re.search(r'(^[\w\.\-]*)(.*)', dep[0]).group(2)) else:
except AttributeError: out.append((package, ver))
pkg = dep[0]
try:
ver = dep[1].replace('(', '').replace(')', '').replace(';', '')
except IndexError:
ver = None
out.append((pkg, ver))
return out return out
def find_new_dependencies(package, version, package_list, pyver): def single_multi(data):
""" """
Return package dependencies parsed from pypi json Return list of packages with new versions and list of packages without new versions
""" """
content = parse_metadata(get_metadata(package, version), pyver) pkg_list, single, multi = [], [], []
for pkg, ver in content: for pkg, pkg_data in data.iteritems():
try: if 'requires' in pkg_data.keys():
if pkg in package_list: pkg_list.append(pkg)
yield (pkg, ver) for pkg in pkg_list:
vec = pvector(pkg, data)
if len(vec) == 1:
single.append(*vec)
elif len(vec) > 1:
multi.append(vec)
single = list(set([item[0] for item in single]))
multi = list(set([item[0] for pkg_data in multi for item in pkg_data]))
return single, multi
def incompatible(data, to_delete):
"""
Move new version to incompatible
"""
if not to_delete:
return data
for package, version in to_delete:
if 'incompatible_version' not in data[package].keys():
data[package]['incompatible_version'] = {}
data[package]['incompatible_version'][version] = data[package]['new_version'][version]
del data[package]['new_version'][version]
if not data[package]['new_version']:
del data[package]['new_version']
return data
def del_hards(data):
"""
Return list of packages and their versions that does not satisfy
requirements of packages without new version
"""
package_no_news, package_with_news = single_multi(data)
deps = []
for package in package_no_news:
if 'requires' in data[package].keys():
if 'upgradeable_version' in data[package].keys():
uver = data[package]['upgradeable_version']
deps += [dep for dep in data[package]['new_version'][uver] if dep[1] or dep[2]]
else: else:
deps += [dep for dep in data[package]['requires'] if dep[1] or dep[2]]
hard_requirements = {}
for item in list(set([pkg[0] for pkg in deps])):
reqs, extras = [], []
for pkg, req, extra in deps:
if pkg == item:
reqs += req
if extra:
extras += extra
hard_requirements[item] = {'installed_version': data[item]['installed_version'],
'requirements': list(set(reqs)),
'extras': list(set(extras))}
to_delete = []
for pkg in package_with_news+not_installed(data):
for ver, ver_data in data[pkg]['new_version'].iteritems():
for dep, req, extra in ver_data:
if dep in hard_requirements.keys():
if specifiers_intersection(req+hard_requirements[dep]['requirements']) is None:
to_delete.append((pkg, ver))
return to_delete
def del_no_news(data):
"""
Return list of packages and their versions that does not satisfy packages without new version
"""
to_delete = []
package_no_news, package_with_news = single_multi(data)
for package in package_with_news+not_installed(data):
deps = []
for pkg, pkg_data in data.iteritems():
if pkg != package and 'requires' in data[pkg].keys() and pkg in package_no_news:
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
deps += [item for item in data[pkg]['new_version'][uver] if item[0] == package]
else:
deps += [item for item in data[pkg]['requires'] if item[0] == package]
specifiers = specifiers_intersection([i for i in itertools.chain(*[dep[1] for dep in deps])])
versions = [pkg[1] for pkg in pvector(package, data)]
incver = sorted([version \
for version in versions if not is_in_specifiers(version, specifiers)],
key=packaging.specifiers.LegacyVersion, reverse=True)
for version in versions:
if version in incver:
to_delete.append((package,version))
return to_delete
def del_one_ver(data):
"""
If all packages requirements lead to one specific version, return list of thath packages
"""
to_delete = []
package_no_news, package_with_news = single_multi(data)
for package in package_with_news+not_installed(data):
deps = []
for pkg, pkg_data in data.iteritems():
uver = None
if pkg != package and 'requires' in data[pkg].keys():
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
for req in data[pkg]['new_version'][uver]:
if req[0] == package:
deps.append(req)
else:
for req in data[pkg]['requires']:
if req[0] == package:
deps.append(req)
if 'new_version' in data[pkg].keys():
for ver, ver_data in data[pkg]['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= packaging.specifiers.LegacyVersion(uver):
continue
for req in ver_data:
if req[0] == package:
deps.append(req)
specifiers = specifiers_intersection([i for i in itertools.chain(*[dep[1] for dep in deps])])
if specifiers:
if len(specifiers) ==1 and '==' in specifiers[0]:
versions = [pkg[1] for pkg in pvector(package, data)]
versions.remove(specifiers[0].replace('==', ''))
for version in versions:
to_delete.append((package, version))
return to_delete
def get_deps(data, package):
"""
Return package deep requirements
"""
try: try:
for child in find_new_dependencies( content = data[package]
pkg, except KeyError:
get_newer_vers(find_available_vers(pkg, pyver), ver, None)[0], content = []
package_list, for pkg in content:
pyver yield pkg
): for child in get_deps(data, pkg):
yield child yield child
except TypeError:
pass
except AttributeError:
pass
def depless_vers(res): def not_installed(data):
"""
Return not installed packages
""" """
If there is no dependencies or versionless dependencies, return the upgradeable version, not_installed = []
otherwise return None for pkg, pkg_data in data.iteritems():
if 'requires' not in pkg_data.keys():
not_installed.append(pkg)
return not_installed
def get_no_news_req(data):
""" """
depless = [] Return requirements of packages without new versions
for ver, deps in res.iteritems(): """
if not deps: reqs = {}
depless.append(ver) package_no_news, package_with_news = single_multi(data)
for package in package_no_news:
version = pvector(package, data)[0][1]
reqs = save_version(reqs, data, package, version)
return reqs
def save_version(r_data, p_data, pkg, ver):
"""
Save the highest package version
"""
if 'installed_version' in p_data[pkg].keys() and p_data[pkg]['installed_version'] == ver:
r_data[pkg] = p_data[pkg]['requires']
else: else:
if not [dep for dep in deps if dep[1] is not None]: r_data[pkg] = p_data[pkg]['new_version'][ver]
depless.append(ver) return r_data
if depless:
depless = sorted(depless, key=distutils.version.StrictVersion, reverse=True)[0] def add_reqs(reqs, data, pkg=None, onlypkg=False):
"""
Append requirements
"""
for dep, req, extra in data:
if pkg and dep != pkg:
continue
if onlypkg:
if isinstance(onlypkg, basestring):
reqs.append(onlypkg)
else: else:
depless = None reqs.append(dep)
return depless else:
reqs.append((dep, req, extra))
def collect_packages(package_list, jsonpipdeptree, pyver=None):
"""
Collect data about packages as dictionary
"""
result = {}
for package in package_list:
installed_version = "".join(list(set(
[_ for _ in json_search(jsonpipdeptree, package, 'installed_version')])))
required_version = []
for dep in list(set(
[_ for _ in json_search(jsonpipdeptree, package, 'required_version')]
)):
if 'Any' not in dep:
required_version.append(dep)
try:
available_version = find_available_vers(package, pyver)
except urllib2.HTTPError:
available_version = [installed_version]
newer_version = get_newer_vers(available_version, required_version, installed_version)
rev = {'installed_version': installed_version,
'required_version': required_version,
'available_version': available_version}
if newer_version is not None:
res = {}
for version in newer_version:
res[version] = [
_ for _ in find_new_dependencies(package, version, package_list, pyver)]
rev['newer_version'] = res
depless = depless_vers(res) def save_ic(out, package, incompatible=None, compatible=None):
if depless: """
rev['upgradeable_version'] = depless Save compatible/incompatible version
"""
if package not in out.keys():
out[package] = {'incompatible': [], 'compatible': None}
if incompatible:
out[package]['incompatible'].append(incompatible)
if compatible:
out[package]['compatible'] = compatible
return out
result[package] = rev def phase_one(data):
return result """
First phase of resolving upgra
"""
no_requires_no_deps = []
out, no_requires, only_no_news_requires, no_requires_deps = {}, {}, {}, {}
package_no_news, package_with_news = single_multi(data)
for pkg in package_with_news:
dep_in_no_news_vers = []
dep_vers = []
uver = None
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
if 'new_version' in data[pkg].keys():
for ver, ver_data in data[pkg]['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= packaging.specifiers.LegacyVersion(uver):
continue
if ver_data:
reqs = [dep[0] for dep in ver_data]
reqs_in_no_news = [dep for dep in reqs if dep in package_no_news]
reqs_not_in_no_news = [dep for dep in reqs if dep not in package_no_news]
if not reqs_not_in_no_news:
dep_in_no_news_vers.append(ver)
else:
dep_vers.append(ver)
if dep_vers:
no_requires[pkg] = dep_vers
if dep_in_no_news_vers:
only_no_news_requires[pkg] = dep_in_no_news_vers
for package, version in no_requires.iteritems():
reqs = []
for pkg, pkg_data in data.iteritems():
uver = None
if pkg != package and 'requires' in data[pkg].keys():
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
add_reqs(reqs, data[pkg]['new_version'][uver], pkg=package, onlypkg=pkg)
else:
add_reqs(reqs, data[pkg]['requires'], pkg=package, onlypkg=pkg)
if 'new_version' in data[pkg].keys():
for ver, ver_data in data[pkg]['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= packaging.specifiers.LegacyVersion(uver):
continue
add_reqs(reqs, ver_data, pkg=package, onlypkg=pkg)
if reqs:
no_requires_deps[package] = list(set(reqs))
else:
out = save_ic(out, package, compatible=sorted(no_requires[package], key=packaging.specifiers.LegacyVersion, reverse=True)[0])
for package, dep in no_requires_deps.iteritems():
if all([pkg in package_no_news for pkg in dep]):
reqs = []
for pkg in dep:
if 'upgradeable_version' in data[pkg].keys():
add_reqs(reqs, data[pkg]['new_version'][data[pkg]['upgradeable_version']], pkg=package)
else:
add_reqs(reqs, data[pkg]['requires'], pkg=package)
specifiers = specifiers_intersection([i for i in itertools.chain(*[req[1] for req in reqs])])
versions = no_requires[package]
compatible = sorted([version for version in versions if is_in_specifiers(version, specifiers)], key=packaging.specifiers.LegacyVersion, reverse=True)
for version in versions:
if version not in compatible:
out = save_ic(out, package, incompatible=version)
if compatible:
out = save_ic(out, package, compatible=compatible[0])
def check_deps(deps, packages_data): for package, versions in only_no_news_requires.iteritems():
reqs = []
for pkg, pkg_data in data.iteritems():
uver = None
if pkg != package and 'requires' in data[pkg].keys():
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
add_reqs(reqs, data[pkg]['new_version'][uver], pkg=package, onlypkg=pkg)
else:
add_reqs(reqs, data[pkg]['requires'], pkg=package, onlypkg=pkg)
if 'new_version' in data[pkg].keys():
for ver, ver_data in data[pkg]['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= packaging.specifiers.LegacyVersion(uver):
continue
add_reqs(reqs, ver_data, pkg=package, onlypkg=pkg)
if all([item in package_no_news for item in list(set(reqs))]):
out = save_ic(out, package, compatible=sorted(versions, key=packaging.specifiers.LegacyVersion, reverse=True)[0])
return out
def get_branches(data):
""" """
Return true, if all package dependencies conforms Return branches
""" """
ndeps = [] branches = []
for item in deps: all_package_req, package_reqs, package_branches = {}, {}, {}
if item[1] is not None: package_no_news, package_with_news = single_multi(data)
ndeps.append( packages_new = not_installed(data)
version_conform_specifiers( for package in data:
get_highest_version(item[0], packages_data), reqs = []
packages_data[item[0]]['required_version']+[item[1]] uver = None
) if 'upgradeable_version' in data[package].keys():
) uver = data[package]['upgradeable_version']
return all(ndeps) add_reqs(reqs, data[package]['new_version'][uver], onlypkg=True)
elif 'requires' in data[package].keys():
add_reqs(reqs, data[package]['requires'], onlypkg=True)
if 'new_version' in data[package].keys():
for ver, ver_data in data[package]['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= packaging.specifiers.LegacyVersion(uver):
continue
add_reqs(reqs, ver_data, onlypkg=True)
all_package_req[package] = list(set(reqs))
for package in package_with_news+packages_new:
package_reqs[package] = list(set([i for i in get_deps(all_package_req, package)]))
for package in package_with_news+packages_new:
res = []
for pkg, deps in package_reqs.iteritems():
if pkg == package:
continue
if package in deps:
res.append(pkg)
if not res:
branches.append(package)
for branch in branches:
package_branches[branch] = [i for i in package_reqs[branch] if i in package_with_news or i in packages_new]
return package_branches
def select_pkgs(packages_data, rkey): def cross_packages(data):
""" """
Return data packages having requested key Return cross packages
""" """
result = {} cross_branches = []
for pkg, pkg_data in packages_data.iteritems(): out, final_result, pkg_reqs = {}, {}, {}
if rkey in pkg_data.keys(): no_news_req = get_no_news_req(data)
result[pkg] = pkg_data package_branches = get_branches(data)
return result package_no_news, package_with_news = single_multi(data)
for package in package_with_news+not_installed(data):
res = []
for pkg, reqs in package_branches.iteritems():
if package in reqs:
res.append(pkg)
if len(res) > 1:
cross_branches.append(package)
for package in package_with_news+not_installed(data):
if package not in cross_branches:
version = pvector(package, data)[0][1]
pkg_reqs = save_version(pkg_reqs, data, package, version)
ff = merge_two_dicts(pkg_reqs, no_news_req)
for package in cross_branches:
reqs = []
for pkg, pkg_data in ff.iteritems():
add_reqs(reqs, pkg_data, pkg=package)
specifiers = specifiers_intersection([i for i in itertools.chain(*[req[1] for req in reqs])])
versions = [pkg[1] for pkg in pvector(package, data)]
compatible = sorted([version for version in versions if is_in_specifiers(version, specifiers)], key=packaging.specifiers.LegacyVersion, reverse=True)
if compatible:
out = save_ic(out, package, compatible=compatible[0])
return out
def print_list(upgradeable_pkgs):
def ibranch(data, fix=False):
""" """
Provides list option Return upgradeable versions of independent branch
""" """
if upgradeable_pkgs: co_branches = []
data = [] out, final_result = {}, {}
for pkg, pkg_data in sorted(upgradeable_pkgs.iteritems(), key=lambda x: x[0].lower()): no_news_req = get_no_news_req(data)
data.append([pkg, pkg_data['installed_version'], pkg_data['upgradeable_version']]) package_branches = get_branches(data)
print tabulate.tabulate( package_no_news, package_with_news = single_multi(data)
data, for branch in package_branches.keys():
['package', 'installed_version', 'upgradeable_version'] for pkg, reqs in package_branches.iteritems():
) if pkg == branch:
sys.exit(1) continue
if len(package_branches[branch]+reqs) != len(list(set(package_branches[branch]+reqs))):
co_branches.append(branch)
co_branches = list(set(co_branches))
for branch in package_branches.keys():
if branch in co_branches:
continue
if fix:
version = pvector(branch, data)[0][1]
pkg_reqs = save_version({}, data, branch, version)
ff = merge_two_dicts(pkg_reqs, no_news_req)
packages = [pvector(pkg, data)[:2] for pkg in package_branches[branch] if pkg in package_with_news]
else: else:
print "There is nothing to upgrade." ff = no_news_req.copy()
sys.exit(0) packages = [pvector(branch, data)]+[pvector(pkg, data) for pkg in package_branches[branch] if pkg in package_with_news]
for comb in list(itertools.product(*packages)):
pkg_reqs = {}
for package, version in comb:
pkg_reqs = save_version(pkg_reqs, data, package, version)
ff = merge_two_dicts(ff, pkg_reqs)
for package, version in comb:
reqs = []
for pkg, pkg_data in ff.iteritems():
add_reqs(reqs, pkg_data, pkg=package)
specifiers = specifiers_intersection([i for i in itertools.chain(*[req[1] for req in reqs])])
final_result[comb] = is_in_specifiers(version, specifiers)
for comb in final_result.keys():
if final_result[comb]:
sumary = 0
for package, version in comb:
sumary += pvector(package, data).index((package, version))
final_result[comb] = sumary
high = 0
for comb, summary in final_result.iteritems():
if summary > high:
high = summary
if high >0:
res = []
for comb, summary in final_result.iteritems():
if summary == high:
res.append(comb)
for package,version in res[0]:
if data[package]['installed_version'] != version:
out = save_ic(out, package, compatible=version)
return out
def main(): def main():
""" """
...@@ -415,38 +943,82 @@ def main(): ...@@ -415,38 +943,82 @@ def main():
""" """
os.environ["PYTHONWARNINGS"] = "ignore:DEPRECATION" os.environ["PYTHONWARNINGS"] = "ignore:DEPRECATION"
arguments = arg_parse() arguments = arg_parse()
pyver = get_pyver() packages_data = {}
pkglist = get_pip_list() for pkg in pip._internal.utils.misc.get_installed_distributions():
jsonpipdeptree = get_jsonpipdeptree() pkg_name, pkg_ver, pkg_extra = pkginfo(str(pkg))
packages_data = collect_packages(pkglist, jsonpipdeptree, pyver=pyver) rev = {'installed_version': pkg_ver,
for pkg, pkg_data in sorted( 'requires': [pkginfo(str(dep), repair=True) for dep in pkg.requires()]}
select_pkgs(packages_data, 'newer_version').iteritems(), key=lambda x: x[0].lower() packages_data[pkg_name] = rev
): packages_data = insert_extras(packages_data)
pkg_keys = pkg_data.keys() packages_data = insert_availables(packages_data)
if 'newer_version' in pkg_keys and 'upgradeable_version' not in pkg_keys: packages_data = insert_news(packages_data)
for ver, deps in sorted(
pkg_data['newer_version'].iteritems(), while True:
key=lambda x: distutils.version.StrictVersion(x[0]), new_packages_data = new_packages(packages_data)
reverse=True if not new_packages_data:
):
ndeps = check_deps(deps, packages_data)
if ndeps:
packages_data[pkg]['upgradeable_version'] = ver
break break
upgradeable_pkgs = select_pkgs(packages_data, 'upgradeable_version') new_packages_data = insert_availables(new_packages_data)
new_packages_data = insert_news(new_packages_data)
packages_data = merge_two_dicts(packages_data, new_packages_data)
check_new_extras(packages_data)
while True:
to_delete_hards = del_hards(packages_data)
packages_data = incompatible(packages_data, to_delete_hards)
to_delete_no_news = del_no_news(packages_data)
packages_data = incompatible(packages_data, to_delete_no_news)
to_delete_one_ver = del_one_ver(packages_data)
packages_data = incompatible(packages_data, to_delete_one_ver)
phase_one_packages = phase_one(packages_data)
for package, data in phase_one_packages.iteritems():
if data['compatible']:
packages_data[package]['upgradeable_version'] = data['compatible']
if data['incompatible']:
for version in data['incompatible']:
if data['compatible'] and version not in data['compatible']:
packages_data = incompatible(packages_data, [(package, version)])
elif not data['compatible']:
packages_data = incompatible(packages_data, [(package, version)])
cross_pkgs = cross_packages(packages_data)
for package, data in cross_pkgs.iteritems():
if data['compatible']:
packages_data[package]['upgradeable_version'] = data['compatible']
if data['incompatible']:
for version in data['incompatible']:
if data['compatible'] and version not in data['compatible']:
packages_data = incompatible(packages_data, (package, version))
i_branch = ibranch(packages_data, fix=True)
for package, data in i_branch.iteritems():
if data['compatible']:
packages_data[package]['upgradeable_version'] = data['compatible']
if not to_delete_hards and not to_delete_no_news and not to_delete_one_ver and not phase_one_packages and not cross_pkgs and not i_branch:
break
i_branch = ibranch(packages_data)
for package, data in i_branch.iteritems():
if data['compatible']:
packages_data[package]['upgradeable_version'] = data['compatible']
check_co_branches(packages_data)
check_extras(packages_data)
if arguments.list: if arguments.list:
print_list(upgradeable_pkgs) sys.exit(print_list(packages_data))
if arguments.show: if arguments.show is not None:
for pkg in arguments.show: if arguments.show: pkgs = arguments.show
else: pkgs = packages_data
for pkg in pkgs:
pprint.pprint({pkg: packages_data[pkg]}) pprint.pprint({pkg: packages_data[pkg]})
sys.exit(0) sys.exit(0)
if arguments.upgrade: if arguments.upgrade:
if 'pip' in upgradeable_pkgs.keys(): upkgs = select_upkgs(data, upgradeable_version)
upgrade_package('pip', upgradeable_pkgs['pip']['upgradeable_version']) to_upgrade =[]
del upgradeable_pkgs['pip'] for pkg in sorted(upkgs):
for pkg, pkg_data in sorted(upgradeable_pkgs.iteritems(), key=lambda x: x[0].lower()): to_upgrade.append((pkg, packages_data[pkg]['upgradeable_version']))
upgrade_package(pkg, pkg_data['upgradeable_version']) upgrade_package(to_upgrade)
print "Done." print "Done."
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -28,8 +28,8 @@ setup( ...@@ -28,8 +28,8 @@ setup(
setup_requires=['mustache', 'pystache', 'setuptools-git-version', 'setuptools-markdown'], setup_requires=['mustache', 'pystache', 'setuptools-git-version', 'setuptools-markdown'],
install_requires=[ install_requires=[
'packaging', 'packaging',
'pipdeptree',
'tabulate', 'tabulate',
'wheel',
], ],
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment