Skip to content
Snippets Groups Projects

requires.txt is not mandatory for metadata generator

Merged Marek Chrastina requested to merge extras into master
Compare and
2 files
+ 904
261
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 903
260
@@ -2,22 +2,33 @@
@@ -2,22 +2,33 @@
pipdeps
pipdeps
"""
"""
import argparse
import argparse
 
import collections
 
import itertools
import json
import json
import distutils.version
import os
import os
 
import platform
import pprint
import pprint
import re
import re
import subprocess
import subprocess
import sys
import sys
import urllib2
import tarfile
import tarfile
import tempfile
import tempfile
 
import urllib2
import zipfile
import zipfile
import wheel.metadata
import tabulate
import tabulate
import packaging.specifiers
import packaging.specifiers
import packaging.version
import packaging.version
 
import pip._internal.utils.misc
 
import wheel.metadata
 
 
# https://www.python.org/dev/peps/pep-0508/#environment-markers
 
PY_VER = ".".join(map(str, sys.version_info[:2]))
 
SYS_PLAT = sys.platform
 
PLAT_PY_IMPL = platform.python_implementation()
 
 
SBoarder = collections.namedtuple("SBoarder", ["boarders", "extrem", "extrem_op"])
 
def arg_parse():
def arg_parse():
"""
"""
@@ -35,117 +46,306 @@ def arg_parse():
@@ -35,117 +46,306 @@ def arg_parse():
action='store_true',
action='store_true',
help="upgrade upgradeable packages")
help="upgrade upgradeable packages")
group.add_argument('-s', '--show',
group.add_argument('-s', '--show',
nargs='+',
nargs='*',
help="show detailed info about upgradeable packages")
help="show detailed info about upgradeable packages")
return parser.parse_args()
return parser.parse_args()
def get_pyver():
def upgrade_package(data):
"""
"""
return running python version
pip install --upgrade "<package>==<versions>"
"""
"""
return ".".join(map(str, sys.version_info[:3]))
to_upgrade = []
 
for package, version in data:
 
to_upgrade.append("%s==%s" % (package, version))
 
subprocess.check_call(
 
["pip", "install", "--upgrade", " ".join(to_upgrade)],
 
stderr=subprocess.STDOUT
 
)
def is_strict_version(version):
def get_json(url):
"""
"""
Return true if version is strict, otherwise return false
Return url json
"""
"""
try:
return json.load(urllib2.urlopen(urllib2.Request(url)))
distutils.version.StrictVersion(version)
except ValueError:
def file_download(url):
return False
"""
return True
Download file from url as temporary file
 
It returns file object
 
"""
 
tmp_file = tempfile.NamedTemporaryFile(delete=False)
 
rfile = urllib2.urlopen(url)
 
with tmp_file as output:
 
output.write(rfile.read())
 
return tmp_file
 
 
def merge_two_dicts(in_x, in_y):
 
"""
 
Return merge of two dictionaries
 
"""
 
out = in_x.copy()
 
out.update(in_y)
 
return out
def version_conform_specifiers(version, specifiers):
def is_version(version):
"""
"""
check if version conforms specifiers
Return true if version satisfy regex, otherwise return false
 
"""
 
if re.compile(r'^(\d+) \. (\d+) (\. (\d+))? (\. (\d+))?$', re.VERBOSE).search(version) or \
 
re.compile(r'^(\d+) \. (\d+) (\. (\d+))? (rc(\d+))?$', re.VERBOSE).search(version):
 
return True
 
return False
 
 
def is_in_specifiers(version, specifiers):
 
"""
 
Return true if version satisfy specifiers, otherwise return false
"""
"""
if not specifiers:
if not specifiers:
return True
return True
elif version is None:
elif version is None:
return True
return True
else:
else:
ver = packaging.version.Version(version)
# https://github.com/pypa/packaging/pull/92
spec = packaging.specifiers.SpecifierSet(",".join(specifiers))
ver = packaging.version.LegacyVersion(version)
if spec.contains(ver):
specifiers = [
return True
packaging.specifiers.LegacySpecifier(s.strip()) for s in specifiers if s.strip()]
 
return all(s.contains(ver) for s in specifiers)
 
 
def is_in_conditions(condition):
 
"""
 
Return true if condition satisfy sys_platform and python_version and
 
platform_python_implementation, otherwise return false
 
"""
 
if not condition:
 
return True
 
# pylint: disable=eval-used
 
return eval(
 
condition.replace("sys_platform", '"%s"' % SYS_PLAT) \
 
.replace("python_version", '"%s"' % PY_VER) \
 
.replace("platform_python_implementation", '"%s"' % PLAT_PY_IMPL))
 
 
def is_in_extra(extra, req_extra):
 
"""
 
Return true if extra satisfy, otherwise return false
 
"""
 
if extra is None or extra in req_extra:
 
return True
return False
return False
def upgrade_package(package, versions):
# pylint: disable=too-many-branches
 
def specifier_boarders(specifiers):
"""
"""
pip install --upgrade "<package><versions>"
Return specifier boarders, equals and notequals
"""
"""
subprocess.check_call(
left = SBoarder([s for s in specifiers if s.operator in ['>', '>=']], None, None)
["pip", "install", "--upgrade", "%s==%s" % (package, "".join(versions))],
right = SBoarder([s for s in specifiers if s.operator in ['<', '<=']], None, None)
stderr=subprocess.STDOUT
if left.boarders:
)
left = left._replace(extrem=sorted([s.version for s in left.boarders],
 
key=packaging.specifiers.LegacyVersion,
 
reverse=True)[0])
 
left = left._replace(extrem_op=[s.operator for s in left.boarders \
 
if s.version == left.extrem])
 
if '>' in left.extrem_op:
 
left = left._replace(extrem_op='>')
 
else:
 
left = left._replace(extrem_op='>=')
 
if right.boarders:
 
right = right._replace(extrem=sorted([s.version for s in right.boarders],
 
key=packaging.specifiers.LegacyVersion)[0])
 
right = right._replace(extrem_op=[s.operator for s in right.boarders \
 
if s.version == right.extrem])
 
if '<' in right.extrem_op:
 
right = right._replace(extrem_op='<')
 
else:
 
right = right._replace(extrem_op='<=')
 
if left.boarders and right.boarders:
 
if packaging.version.LegacyVersion(left.extrem) > \
 
packaging.version.LegacyVersion(right.extrem):
 
left, right = None, None
 
elif packaging.version.LegacyVersion(left.extrem) == \
 
packaging.version.LegacyVersion(right.extrem):
 
if left.extrem_op in ['>='] and right.extrem_op in ['<=']:
 
left = left._replace(extrem_op='==')
 
right = right._replace(boarders=None)
 
else:
 
left, right = None, None
 
equals = [s for s in specifiers if s.operator in ['==']]
 
if equals:
 
cmp_v = list(set([s.version for s in equals]))[0]
 
if all([packaging.version.LegacyVersion(cmp_v) == packaging.version.LegacyVersion(item) \
 
for item in list(set([s.version for s in equals]))]):
 
equals = cmp_v
 
else:
 
equals = None
 
notequals = [s for s in specifiers if s.operator in ['!=']]
 
notequals = list(set([s.version for s in notequals]))
 
return left, right, equals, notequals
def get_pip_list():
def specifiers_intersection(specifiers):
"""
"""
pip list
Return intersection of specifiers, otherwise return None
"""
"""
outdated_packages = subprocess.check_output(["pip", "list"])
if not specifiers:
return [line.split()[0] for line in outdated_packages.strip().split("\n")[2:]]
return []
 
specifiers = [packaging.specifiers.LegacySpecifier(s.strip()) for s in specifiers if s.strip()]
 
left, right, equals, notequals = specifier_boarders(specifiers)
 
if (left is None and right is None) or equals is None:
 
return None
 
boarders = []
 
for item in [left, right]:
 
if item.boarders:
 
boarders.append("%s%s" % (item.extrem_op, item.extrem))
 
if boarders and notequals:
 
for item in notequals:
 
if is_in_specifiers(item, boarders):
 
boarders.append("!=%s" % item)
 
elif not boarders and notequals:
 
for item in notequals:
 
boarders.append("!=%s" % item)
 
if boarders and equals:
 
if not is_in_specifiers(equals, boarders):
 
return None
 
boarders = ["==%s" % equals]
 
elif not boarders and equals:
 
boarders = ["==%s" % equals]
 
return boarders
def file_download(url):
def select_upkgs(data, rkey):
"""
"""
Download file from url as temporary file
Return data packages having requested key
It returns file object
"""
"""
tmp_file = tempfile.NamedTemporaryFile(delete=False)
result = []
rfile = urllib2.urlopen(url)
for pkg, pkg_data in data.iteritems():
with tmp_file as output:
if rkey in pkg_data.keys():
output.write(rfile.read())
result.append(pkg)
return tmp_file
return result
def get_jsonpipdeptree():
def print_list(data):
"""
"""
pipdeptree --json-tree
Print upgradeable versions
"""
"""
pipdeptree = subprocess.check_output(
upkgs = select_upkgs(data, 'upgradeable_version')
["pipdeptree", "--json-tree"],
if not upkgs:
stderr=subprocess.STDOUT
print "There is nothing to upgrade."
 
return 0
 
tab_data = []
 
for pkg in sorted(upkgs):
 
tab_data.append([pkg, data[pkg]['installed_version'], data[pkg]['upgradeable_version']])
 
print tabulate.tabulate(
 
tab_data,
 
['package', 'installed_version', 'upgradeable_version']
)
)
return json.loads(pipdeptree.strip())
return 1
def get_json(url):
def get_pkg_data():
"""
"""
Return url json
Return package data
"""
"""
return json.load(urllib2.urlopen(urllib2.Request(url)))
packages_data = {}
 
# pylint: disable=protected-access
 
for pkg in pip._internal.utils.misc.get_installed_distributions():
 
pkg_name, pkg_ver, _pkg_extra = pkginfo(str(pkg))
 
rev = {'installed_version': pkg_ver,
 
'requires': [pkginfo(str(dep), repair=True) for dep in pkg.requires()]}
 
packages_data[pkg_name] = rev
 
packages_data = insert_extras(packages_data)
 
packages_data = insert_availables(packages_data)
 
packages_data = insert_news(packages_data)
 
 
while True:
 
new_packages_data = new_packages(packages_data)
 
if not new_packages_data:
 
break
 
new_packages_data = insert_availables(new_packages_data)
 
new_packages_data = insert_news(new_packages_data)
 
packages_data = merge_two_dicts(packages_data, new_packages_data)
 
check_new_extras(packages_data)
 
return packages_data
def json_search(jsonpipdeptree, package, key):
def pkginfo(data, req_extra=None, repair=False):
"""
"""
find package dependencies in json tree
Return parsed pkginfo
"""
"""
if isinstance(jsonpipdeptree, dict):
extra_match = re.compile(
keys = jsonpipdeptree.keys()
r"""^(?P<package>.*?)(;\s*(?P<condition>.*?)(extra == '(?P<extra>.*?)')?)$""").search(data)
if 'package_name' in keys and key in keys:
if extra_match:
if re.search(r'^%s$' % package, jsonpipdeptree['package_name'], re.IGNORECASE):
groupdict = extra_match.groupdict()
yield jsonpipdeptree[key]
condition = groupdict['condition']
for child_val in json_search(jsonpipdeptree['dependencies'], package, key):
extra = groupdict['extra']
yield child_val
package = groupdict['package']
elif isinstance(jsonpipdeptree, list):
if condition.endswith(' and '):
for item in jsonpipdeptree:
condition = condition[:-5]
for item_val in json_search(item, package, key):
mysearch = re.compile(r'(extra == .*)').search(condition)
yield item_val
if mysearch:
 
extra = mysearch.group(1)
 
condition = condition.replace(extra, '')
 
if not condition:
 
condition = None
 
extra = re.compile(r'extra == (.*)').search(extra).group(1).replace('"', "")
 
else:
 
condition, extra = None, None
 
package = data
 
if not is_in_conditions(condition):
 
return None
 
pkg_name, pkg_extra, pkg_ver = re.compile(
 
r'([\w\.\-]*)(\[\w*\])?(.*)').search(package).groups()
 
if pkg_extra:
 
pkg_extra = pkg_extra.replace("[", "").replace("]", "").lower()
 
pkg_ver = pkg_ver.replace("(", "").replace(")", "").strip()
 
if not pkg_ver:
 
pkg_ver = []
 
else:
 
if repair:
 
try:
 
pkg_ver = re.compile(r'^(\d.*)$').search(pkg_ver).group(1)
 
except AttributeError:
 
pass
 
pkg_ver = pkg_ver.split(",")
 
if not is_in_extra(extra, req_extra):
 
return None
 
return (pkg_name.lower(), pkg_ver, pkg_extra)
def get_highest_version(package, data):
def insert_extras(data):
"""
"""
Return upgradeable version if possible, otherwise return installed version
Insert extras
"""
"""
try:
for key in data.keys():
version = data[package]['upgradeable_version']
extra = []
except KeyError:
for pkg, pkg_data in data.iteritems():
version = data[package]['installed_version']
for dep in pkg_data['requires']:
return version
if dep[0] == key:
 
if dep[2]:
 
extra.append(dep[2])
 
data[key]['extras'] = extra
 
if extra:
 
# pylint: disable=protected-access
 
for pkg in pip._internal.utils.misc.get_installed_distributions():
 
pkg_name, _pkg_ver, _pkg_extra = pkginfo(str(pkg))
 
if pkg_name == key:
 
data[key]['requires'] += [pkginfo(str(dep), repair=True, req_extra=extra) \
 
for dep in pkg.requires(extras=extra)]
 
return data
def find_available_vers(package_name, pyver):
def insert_availables(data):
 
"""
 
Insert available versions
"""
"""
Return descending list of available strict version
for pkg, pkg_data in data.iteritems():
 
if 'available_version' in pkg_data.keys():
 
continue
 
try:
 
data[pkg]['available_version'] = get_available_vers(pkg)
 
except urllib2.HTTPError:
 
data[pkg]['available_version'] = []
 
return data
 
 
def get_available_vers(package):
 
"""
 
Return descending list of public available strict version
"""
"""
versions = []
versions = []
try:
try:
data = get_json("https://pypi.python.org/pypi/%s/json" % (package_name,))
data = get_json("https://pypi.python.org/pypi/%s/json" % (package))
except urllib2.HTTPError, err:
except urllib2.HTTPError, err:
print "%s %s" % (err, err.url)
print "%s %s" % (err, err.url)
raise urllib2.HTTPError(err.url, err.code, None, err.hdrs, err.fp)
raise urllib2.HTTPError(err.url, err.code, None, err.hdrs, err.fp)
@@ -154,41 +354,101 @@ def find_available_vers(package_name, pyver):
@@ -154,41 +354,101 @@ def find_available_vers(package_name, pyver):
requires_python = []
requires_python = []
for item in data["releases"][release]:
for item in data["releases"][release]:
if item['requires_python'] is not None:
if item['requires_python'] is not None:
requires_python.append(item['requires_python'])
for reqpyt in item['requires_python'].split(","):
if is_strict_version(release) and version_conform_specifiers(pyver, requires_python):
requires_python.append(reqpyt.strip())
 
if requires_python:
 
requires_python = list(set(requires_python))
 
if is_version(release) and is_in_specifiers(PY_VER, requires_python):
versions.append(release)
versions.append(release)
return sorted(versions, key=distutils.version.StrictVersion, reverse=True)
return sorted(versions, key=packaging.specifiers.LegacyVersion, reverse=True)
def get_newer_vers(available_version, required_version, installed_version=None):
def select_news(available_version, installed_version=None):
"""
"""
Return list of newer versions which conforms pipdeptree dependencies, otherwise return none.
Select versions newer than installed version, if it is known
"""
"""
if required_version is None:
if installed_version is None:
result = [aver for aver in list(available_version)]
return sorted(available_version, key=packaging.specifiers.LegacyVersion, reverse=True)
return sorted(result, key=distutils.version.StrictVersion, reverse=True)
iver = packaging.version.Version(installed_version)
if [rver for rver in required_version if re.search(r'(^==.*|^\d.*)', rver) is not None]:
return sorted([aver for aver in available_version if packaging.version.Version(aver) > iver],
return None
key=packaging.specifiers.LegacyVersion, reverse=True)
result = []
av_version = list(available_version)
def insert_news(data):
while True:
"""
 
Insert new versions
 
"""
 
for pkg, pkg_data in data.iteritems():
 
if 'new_version' in pkg_data.keys():
 
continue
try:
try:
version = av_version.pop(0)
new_version = select_news(pkg_data['available_version'], pkg_data['installed_version'])
except IndexError:
except KeyError:
break
new_version = select_news(pkg_data['available_version'])
aver = packaging.version.Version(version)
if new_version:
rver = packaging.specifiers.SpecifierSet(",".join(required_version))
res = {}
if rver.contains(aver):
for version in new_version:
if installed_version is not None:
content = parse_metadata(get_metadata(pkg, version), pkg_data['extras'])
iver = packaging.version.Version(installed_version)
res[version] = content
if aver == iver:
if res:
break
pkg_data['new_version'] = res
elif aver > iver:
return data
result.append(version)
else:
def new_packages(data):
result.append(version)
"""
if result:
Return new packages as dictionary
return sorted(result, key=distutils.version.StrictVersion, reverse=True)
"""
return None
out = {}
 
arr = []
 
pkg_list = data.keys()
 
for pkg, pkg_data in data.iteritems():
 
try:
 
for _ver, ver_data in pkg_data['new_version'].iteritems():
 
for dep in ver_data:
 
if dep[0] not in pkg_list:
 
arr.append(dep)
 
except KeyError:
 
pass
 
for item in list(set([i[0] for i in arr])):
 
extras = []
 
for pkg, _req, extra in arr:
 
if pkg == item and extra is not None:
 
extras.append(extra)
 
out[item] = {'extras': extras}
 
return out
 
 
def check_new_extras(data):
 
"""
 
Check if there are new extras
 
"""
 
extra_pkgs = []
 
pkg_list = data.keys()
 
for pkg, pkg_data in data.iteritems():
 
try:
 
for _ver, ver_data in pkg_data['new_version'].iteritems():
 
for dep in ver_data:
 
if dep[0] in pkg_list and dep[2] is not None:
 
extra_pkgs.append(dep)
 
except KeyError:
 
pass
 
for pkg, _req, extra in extra_pkgs:
 
if extra not in data[pkg]['extras']:
 
raise Exception('There are new extras!')
 
 
def check_extras(data):
 
"""
 
Check if there are extras in upgradeable packages
 
"""
 
for package in select_upkgs(data, 'upgradeable_version'):
 
if data[package]['extras']:
 
raise Exception('There are extras in upgradeable packages!')
 
 
def check_co_branches(data):
 
"""
 
Check if there branches with intersection of packages
 
"""
 
branches = get_branches(data)
 
co_branches = get_co_branches(branches)
 
if co_branches:
 
raise Exception('There are branches with intersection of packages!')
def write_metadata(tmp_file):
def write_metadata(tmp_file):
"""
"""
@@ -223,7 +483,7 @@ def get_metadata(package, version):
@@ -223,7 +483,7 @@ def get_metadata(package, version):
if item['packagetype'] == 'sdist':
if item['packagetype'] == 'sdist':
tmp_file = file_download(item['url'])
tmp_file = file_download(item['url'])
write_metadata(tmp_file)
write_metadata(tmp_file)
if os.path.isfile('/tmp/requires.txt') and os.path.isfile('/tmp/PKG-INFO'):
if os.path.isfile('/tmp/PKG-INFO'):
metadata = [
metadata = [
line.decode('utf-8') \
line.decode('utf-8') \
for line in wheel.metadata.pkginfo_to_metadata('/tmp', '/tmp/PKG-INFO') \
for line in wheel.metadata.pkginfo_to_metadata('/tmp', '/tmp/PKG-INFO') \
@@ -249,7 +509,7 @@ def get_metadata(package, version):
@@ -249,7 +509,7 @@ def get_metadata(package, version):
break
break
return metadata
return metadata
def parse_metadata(metadata, pyver):
def parse_metadata(metadata, extra):
"""
"""
Return dependencies parsed from metadata
Return dependencies parsed from metadata
"""
"""
@@ -257,157 +517,546 @@ def parse_metadata(metadata, pyver):
@@ -257,157 +517,546 @@ def parse_metadata(metadata, pyver):
if 'Metadata-Version' in line.decode('utf-8'):
if 'Metadata-Version' in line.decode('utf-8'):
metadata_version = line.replace('Metadata-Version:', '').strip()
metadata_version = line.replace('Metadata-Version:', '').strip()
break
break
if packaging.version.Version(metadata_version) >= packaging.version.Version('2.0'):
arr = []
out = []
if metadata_version and \
for dep in [
packaging.version.Version(metadata_version) >= packaging.version.Version('2.0'):
line.replace('Requires-Dist:', '').strip() \
arr = []
for line in metadata if re.search(r'^Requires-Dist:', line)]:
lines = [line.replace('Requires-Dist:', '').strip() \
if ';' in dep:
for line in metadata if re.search(r'^Requires-Dist:', line)]
dep = dep.split(';')
for line in lines:
if 'python_version' in dep[1]:
data = pkginfo(str(line), req_extra=extra, repair=True)
if packaging.specifiers.SpecifierSet(
if data:
dep[1].replace('python_version', '').replace('"', '').strip()) \
arr.append(pkginfo(str(line), req_extra=extra, repair=True))
.contains(packaging.version.Version(pyver)):
return arr
dep = dep[0]
else:
continue
else:
continue
dep = dep.split()
try:
pkg = re.search(r'(.*)(\[.*\])', dep[0]).group(1)
except AttributeError:
pkg = dep[0]
try:
pkg = re.search(r'(^[\w\.\-]*)(.*)', dep[0]).group(1)
dep.append(re.search(r'(^[\w\.\-]*)(.*)', dep[0]).group(2))
except AttributeError:
pkg = dep[0]
try:
ver = dep[1].replace('(', '').replace(')', '').replace(';', '')
except IndexError:
ver = None
out.append((pkg, ver))
return out
def find_new_dependencies(package, version, package_list, pyver):
def pvector(package, data):
"""
"""
Return package dependencies parsed from pypi json
Return vector of package versions
"""
"""
content = parse_metadata(get_metadata(package, version), pyver)
out = []
for pkg, ver in content:
if 'new_version' not in data[package].keys():
try:
out.append((package, data[package]['installed_version']))
if pkg in package_list:
else:
yield (pkg, ver)
if 'upgradeable_version' in data[package].keys():
 
out.append((package, data[package]['upgradeable_version']))
 
else:
 
if 'installed_version' in data[package].keys():
 
out.append((package, data[package]['installed_version']))
 
for ver in sorted(data[package]['new_version'].keys(),
 
key=packaging.specifiers.LegacyVersion):
 
if 'upgradeable_version' in data[package].keys():
 
if packaging.specifiers.LegacyVersion(ver) > \
 
packaging.specifiers.LegacyVersion(data[package]['upgradeable_version']):
 
out.append((package, ver))
else:
else:
try:
out.append((package, ver))
for child in find_new_dependencies(
return out
pkg,
get_newer_vers(find_available_vers(pkg, pyver), ver, None)[0],
package_list,
pyver
):
yield child
except TypeError:
pass
except AttributeError:
pass
def depless_vers(res):
def single_multi(data):
"""
"""
If there is no dependencies or versionless dependencies, return the upgradeable version,
Return list of packages with new versions and list of packages without new versions
otherwise return None
"""
"""
depless = []
pkg_list, single, multi = [], [], []
for ver, deps in res.iteritems():
for pkg, pkg_data in data.iteritems():
if not deps:
if 'requires' in pkg_data.keys():
depless.append(ver)
pkg_list.append(pkg)
else:
for pkg in pkg_list:
if not [dep for dep in deps if dep[1] is not None]:
vec = pvector(pkg, data)
depless.append(ver)
if len(vec) == 1:
if depless:
single.append(*vec)
depless = sorted(depless, key=distutils.version.StrictVersion, reverse=True)[0]
elif len(vec) > 1:
 
multi.append(vec)
 
single = list(set([item[0] for item in single]))
 
multi = list(set([item[0] for pkg_data in multi for item in pkg_data]))
 
return single, multi
 
 
def move_incompatible(data, to_delete):
 
"""
 
Move new version to incompatible
 
"""
 
if not to_delete:
 
return data
 
for package, version in to_delete:
 
if 'incompatible_version' not in data[package].keys():
 
data[package]['incompatible_version'] = {}
 
data[package]['incompatible_version'][version] = data[package]['new_version'][version]
 
del data[package]['new_version'][version]
 
if not data[package]['new_version']:
 
del data[package]['new_version']
 
return data
 
 
def get_compatible(versions, reqs, inverse=False):
 
"""
 
Return compatible versions
 
"""
 
specifiers = specifiers_intersection([i for i in itertools.chain(*[req[1] for req in reqs])])
 
if inverse:
 
v_versions = [version for version in versions if not is_in_specifiers(version, specifiers)]
else:
else:
depless = None
v_versions = [version for version in versions if is_in_specifiers(version, specifiers)]
return depless
return sorted(v_versions, key=packaging.specifiers.LegacyVersion, reverse=True)
def collect_packages(package_list, jsonpipdeptree, pyver=None):
"""
Collect data about packages as dictionary
"""
result = {}
for package in package_list:
installed_version = "".join(list(set(
[_ for _ in json_search(jsonpipdeptree, package, 'installed_version')])))
required_version = []
for dep in list(set(
[_ for _ in json_search(jsonpipdeptree, package, 'required_version')]
)):
if 'Any' not in dep:
required_version.append(dep)
try:
available_version = find_available_vers(package, pyver)
except urllib2.HTTPError:
available_version = [installed_version]
newer_version = get_newer_vers(available_version, required_version, installed_version)
rev = {'installed_version': installed_version,
'required_version': required_version,
'available_version': available_version}
if newer_version is not None:
res = {}
for version in newer_version:
res[version] = [
_ for _ in find_new_dependencies(package, version, package_list, pyver)]
rev['newer_version'] = res
depless = depless_vers(res)
def del_hards(data):
if depless:
"""
rev['upgradeable_version'] = depless
Return list of packages and their versions that does not satisfy
 
requirements of packages without new version
 
"""
 
package_no_news, package_with_news = single_multi(data)
 
deps = get_simple_reqs(data, None, package_no_news)
 
hard_requirements = {}
 
for item in list(set([pkg[0] for pkg in deps])):
 
reqs, extras = [], []
 
for pkg, req, extra in deps:
 
if pkg == item:
 
reqs += req
 
if extra:
 
extras += extra
 
hard_requirements[item] = {'installed_version': data[item]['installed_version'],
 
'requirements': list(set(reqs)),
 
'extras': list(set(extras))}
 
to_delete = []
 
for pkg in package_with_news+not_installed(data):
 
for ver, ver_data in data[pkg]['new_version'].iteritems():
 
for dep, req, extra in ver_data:
 
if dep in hard_requirements.keys():
 
if specifiers_intersection(req+hard_requirements[dep]['requirements']) is None:
 
to_delete.append((pkg, ver))
 
return to_delete
result[package] = rev
def del_no_news(data):
return result
"""
 
Return list of packages and their versions that does not satisfy packages without new version
 
"""
 
to_delete = []
 
package_no_news, package_with_news = single_multi(data)
 
for package in package_with_news+not_installed(data):
 
reqs = get_simple_reqs(data, None, package_no_news)
 
versions = [pkg[1] for pkg in pvector(package, data)]
 
incver = get_compatible(versions, reqs, inverse=True)
 
for version in versions:
 
if version in incver:
 
to_delete.append((package, version))
 
return to_delete
def check_deps(deps, packages_data):
def del_one_ver(data):
"""
"""
Return true, if all package dependencies conforms
If all packages requirements lead to one specific version, return list of that packages
"""
"""
ndeps = []
to_delete = []
for item in deps:
_package_no_news, package_with_news = single_multi(data)
if item[1] is not None:
for package in package_with_news+not_installed(data):
ndeps.append(
reqs = get_reqs(data, package=package)
version_conform_specifiers(
specifiers = specifiers_intersection(
get_highest_version(item[0], packages_data),
[i for i in itertools.chain(*[req[1] for req in reqs])])
packages_data[item[0]]['required_version']+[item[1]]
if specifiers:
)
if len(specifiers) == 1 and '==' in specifiers[0]:
)
versions = [pkg[1] for pkg in pvector(package, data)]
return all(ndeps)
versions.remove(specifiers[0].replace('==', ''))
 
for version in versions:
 
to_delete.append((package, version))
 
return to_delete
def select_pkgs(packages_data, rkey):
def del_notinstalled(data):
"""
"""
Return data packages having requested key
If no package requires notinstalled packages, return list of that packages
"""
"""
result = {}
to_delete = []
for pkg, pkg_data in packages_data.iteritems():
for package in not_installed(data):
if rkey in pkg_data.keys():
reqs = get_reqs(data, package=package)
result[pkg] = pkg_data
if not reqs:
return result
for pkg, version in pvector(package, data):
 
to_delete.append((pkg, version))
 
return to_delete
 
 
def get_deps(data, package):
 
"""
 
Return package deep requirements
 
"""
 
try:
 
content = data[package]
 
except KeyError:
 
content = []
 
for pkg in content:
 
yield pkg
 
for child in get_deps(data, pkg):
 
yield child
 
 
def not_installed(data):
 
"""
 
Return not installed packages
 
"""
 
not_i = []
 
for pkg, pkg_data in data.iteritems():
 
if 'requires' not in pkg_data.keys() and 'new_version' in pkg_data.keys():
 
not_i.append(pkg)
 
return not_i
 
 
def get_no_news_req(data):
 
"""
 
Return requirements of packages without new versions
 
"""
 
reqs = {}
 
package_no_news, _package_with_news = single_multi(data)
 
for package in package_no_news:
 
version = pvector(package, data)[0][1]
 
reqs = save_version(reqs, data, package, version)
 
return reqs
def print_list(upgradeable_pkgs):
def save_version(r_data, p_data, pkg, ver):
"""
"""
Provides list option
Save the highest package version
"""
"""
if upgradeable_pkgs:
if 'installed_version' in p_data[pkg].keys() and p_data[pkg]['installed_version'] == ver:
data = []
r_data[pkg] = p_data[pkg]['requires']
for pkg, pkg_data in sorted(upgradeable_pkgs.iteritems(), key=lambda x: x[0].lower()):
data.append([pkg, pkg_data['installed_version'], pkg_data['upgradeable_version']])
print tabulate.tabulate(
data,
['package', 'installed_version', 'upgradeable_version']
)
sys.exit(1)
else:
else:
print "There is nothing to upgrade."
r_data[pkg] = p_data[pkg]['new_version'][ver]
sys.exit(0)
return r_data
 
 
def add_reqs(reqs, data, pkg=None, addpkg=None):
 
"""
 
Append requirements
 
"""
 
for dep, req, extra in data:
 
if pkg and dep != pkg:
 
continue
 
if addpkg:
 
reqs.append(addpkg)
 
else:
 
reqs.append((dep, req, extra))
 
 
def save_ic(out, package, incompatible=None, compatible=None):
 
"""
 
Save compatible/incompatible version
 
"""
 
if package not in out.keys():
 
out[package] = {'incompatible': [], 'compatible': None}
 
if incompatible:
 
out[package]['incompatible'].append(incompatible)
 
if compatible:
 
out[package]['compatible'] = compatible
 
return out
 
 
def get_reqs(data, package=None, data2=None, addpkg=False):
 
"""
 
Get requirements
 
"""
 
if package:
 
reqs = []
 
if data2:
 
for pkg in data2:
 
if 'upgradeable_version' in data[pkg].keys():
 
uver = data[pkg]['upgradeable_version']
 
add_reqs(reqs, data[pkg]['new_version'][uver], pkg=package)
 
else:
 
add_reqs(reqs, data[pkg]['requires'], pkg=package)
 
else:
 
for pkg, pkg_data in data.iteritems():
 
uver = None
 
if pkg != package:
 
if 'upgradeable_version' in data[pkg].keys():
 
uver = data[pkg]['upgradeable_version']
 
if addpkg:
 
add_reqs(reqs, data[pkg]['new_version'][uver], pkg=package, addpkg=pkg)
 
else:
 
add_reqs(reqs, data[pkg]['new_version'][uver], pkg=package)
 
elif 'requires' in data[pkg].keys():
 
if addpkg:
 
add_reqs(reqs, data[pkg]['requires'], pkg=package, addpkg=pkg)
 
else:
 
add_reqs(reqs, data[pkg]['requires'], pkg=package)
 
if 'new_version' in data[pkg].keys():
 
for ver, ver_data in data[pkg]['new_version'].iteritems():
 
if uver:
 
if packaging.specifiers.LegacyVersion(ver) <= \
 
packaging.specifiers.LegacyVersion(uver):
 
continue
 
if addpkg:
 
add_reqs(reqs, ver_data, pkg=package, addpkg=pkg)
 
else:
 
add_reqs(reqs, ver_data, pkg=package)
 
return reqs
 
else:
 
out = {}
 
for pkg, pkg_data in data.iteritems():
 
reqs = []
 
uver = None
 
if 'upgradeable_version' in pkg_data.keys():
 
uver = pkg_data['upgradeable_version']
 
add_reqs(reqs, pkg_data['new_version'][uver])
 
elif 'requires' in pkg_data.keys():
 
add_reqs(reqs, pkg_data['requires'])
 
if 'new_version' in pkg_data.keys():
 
for ver, ver_data in pkg_data['new_version'].iteritems():
 
if uver:
 
if packaging.specifiers.LegacyVersion(ver) <= \
 
packaging.specifiers.LegacyVersion(uver):
 
continue
 
add_reqs(reqs, ver_data)
 
out[pkg] = list(set([req[0] for req in reqs]))
 
return out
 
 
def get_simple_reqs(data, package_with_news, package_no_news):
 
"""
 
Return no_requires, only_no_news_requires or requirements
 
"""
 
if not package_with_news:
 
reqs = []
 
for pkg in package_no_news:
 
if 'requires' in data[pkg].keys():
 
if 'upgradeable_version' in data[pkg].keys():
 
uver = data[pkg]['upgradeable_version']
 
reqs += [req for req in data[pkg]['new_version'][uver] if req[1] or req[2]]
 
else:
 
reqs += [req for req in data[pkg]['requires'] if req[1] or req[2]]
 
return reqs
 
no_requires, only_no_news_requires, = {}, {}
 
for pkg in package_with_news:
 
dep_in_no_news_vers = []
 
dep_vers = []
 
uver = None
 
if 'upgradeable_version' in data[pkg].keys():
 
uver = data[pkg]['upgradeable_version']
 
if 'new_version' in data[pkg].keys():
 
for ver, ver_data in data[pkg]['new_version'].iteritems():
 
if uver:
 
if packaging.specifiers.LegacyVersion(ver) <= \
 
packaging.specifiers.LegacyVersion(uver):
 
continue
 
if ver_data:
 
reqs = [req[0] for req in ver_data]
 
reqs_not_in_no_news = [req for req in reqs if req not in package_no_news]
 
if not reqs_not_in_no_news:
 
dep_in_no_news_vers.append(ver)
 
else:
 
dep_vers.append(ver)
 
if dep_vers:
 
no_requires[pkg] = dep_vers
 
if dep_in_no_news_vers:
 
only_no_news_requires[pkg] = dep_in_no_news_vers
 
return no_requires, only_no_news_requires
 
 
def phase_one(data):
    """
    Partial resolve upgrades

    One resolution pass over the candidate set, in three sub-passes:

    1. packages from ``no_requires``: when get_reqs() yields nothing for
       them, pick the highest candidate version right away; otherwise
       defer them together with the deduplicated get_reqs() result;
    2. deferred packages whose entire get_reqs() result lies in the
       no-news set: intersect their candidates with the collected
       constraints and record compatible/incompatible versions;
    3. packages from ``only_no_news_requires``: when their get_reqs()
       result lies entirely in the no-news set, pick the highest of the
       listed versions.

    Returns the accumulator built by save_ic(), presumably mapping
    package -> {'compatible': ..., 'incompatible': ...} -- confirm
    against save_ic().
    """
    out, no_requires_deps = {}, {}
    package_no_news, package_with_news = single_multi(data)
    no_requires, only_no_news_requires = get_simple_reqs(data, package_with_news, package_no_news)

    # Sub-pass 1.  NOTE: `version` is actually the whole list of
    # candidate versions and is unused inside the loop body.
    for package, version in no_requires.iteritems():
        reqs = get_reqs(data, package=package, addpkg=True)
        if reqs:
            no_requires_deps[package] = list(set(reqs))
        else:
            # Highest candidate under legacy version ordering.
            out = save_ic(out, package,
                          compatible=sorted(no_requires[package],
                                            key=packaging.specifiers.LegacyVersion,
                                            reverse=True)[0])

    # Sub-pass 2: deferred packages whose requirers are all stable.
    for package, dep in no_requires_deps.iteritems():
        if all([pkg in package_no_news for pkg in dep]):
            reqs = get_reqs(data, package=package, data2=dep)
            compatible = get_compatible(no_requires[package], reqs)
            for version in no_requires[package]:
                if version not in compatible:
                    out = save_ic(out, package, incompatible=version)
            if compatible:
                # Assumes get_compatible() returns newest-first -- TODO
                # confirm.
                out = save_ic(out, package, compatible=compatible[0])

    # Sub-pass 3: candidates that depend only on no-news packages.
    for package, versions in only_no_news_requires.iteritems():
        reqs = get_reqs(data, package=package, addpkg=True)
        if all([item in package_no_news for item in list(set(reqs))]):
            out = save_ic(out, package,
                          compatible=sorted(versions,
                                            key=packaging.specifiers.LegacyVersion,
                                            reverse=True)[0])
    return out
 
 
def get_branches(data):
    """
    Return branches

    A branch root is a package (with news, or not installed) on which no
    other such package depends.  Returns a dict mapping each root to the
    subset of its get_deps() names that are themselves candidates.
    """
    _no_news, with_news = single_multi(data)
    with_news = with_news + not_installed(data)
    info = get_reqs(data)
    # Deduplicated dependency names reported by get_deps() for every
    # candidate package (depth/transitivity depends on get_deps -- TODO
    # confirm).
    deps_of = {}
    for name in with_news:
        deps_of[name] = list(set(get_deps(info, name)))
    # Roots: candidates that appear in nobody else's dependency list.
    roots = [
        name for name in with_news
        if not any(name in deps
                   for other, deps in deps_of.items() if other != name)
    ]
    return {root: [dep for dep in deps_of[root] if dep in with_news]
            for root in roots}
 
 
def get_co_branches(branches):
    """
    Return corelated branches

    A branch counts as correlated when concatenating its requirement
    list with another branch's list produces duplicates, i.e. the two
    branches share at least one requirement (or a list carries an
    internal duplicate).
    """
    correlated = set()
    for root in branches:
        own = branches[root]
        for other, other_reqs in branches.items():
            if other == root:
                continue
            combined = own + other_reqs
            # Any duplicate in the concatenation marks this root.
            if len(combined) != len(set(combined)):
                correlated.add(root)
    return list(correlated)
 
 
def cross_packages(data):
    """
    Return cross packages

    A "cross" package appears in the requirement lists of more than one
    branch root, so its upgradeable version must satisfy constraints
    coming from every branch at once.  Returns the save_ic() accumulator
    of resolved cross packages.
    """
    out = {}
    branch_map = get_branches(data)
    _no_news, with_news = single_multi(data)
    with_news = with_news + not_installed(data)
    # Classify: candidates referenced from more than one branch.
    crossing = []
    for name in with_news:
        owners = [root for root, reqs in branch_map.items() if name in reqs]
        if len(owners) > 1:
            crossing.append(name)
    # Pin every non-cross candidate to its first pvector() version
    # (assumed newest -- TODO confirm) and collect the implied
    # requirements, merged with the no-news requirements.
    pinned_reqs = {}
    for name in with_news:
        if name in crossing:
            continue
        newest = pvector(name, data)[0][1]
        pinned_reqs = save_version(pinned_reqs, data, name, newest)
    merged = merge_two_dicts(pinned_reqs, get_no_news_req(data))
    # Resolve each cross package against ALL collected requirements.
    for name in crossing:
        collected = []
        for _owner, owner_data in merged.items():
            add_reqs(collected, owner_data, pkg=name)
        candidates = [entry[1] for entry in pvector(name, data)]
        compatible = get_compatible(candidates, collected)
        if compatible:
            out = save_ic(out, name, compatible=compatible[0])
    return out
 
 
def get_comb_summary(data, packages, common_reqs):
    """
    Return combination summary

    For every combination of candidate (package, version) pairs drawn
    from *packages*, decide whether the combination satisfies all
    collected version specifiers, then score the valid combinations by
    summing each member's index in its pvector() list.

    Returns {combination: score}, where score is False for invalid
    combinations and the positional sum for valid ones.
    """
    out = {}
    for comb in itertools.product(*packages):
        # Requirements implied by pinning every member of this combination,
        # merged on top of the caller-supplied common requirements.
        pkg_reqs = {}
        for package, version in comb:
            pkg_reqs = save_version(pkg_reqs, data, package, version)
        pkg_reqs = merge_two_dicts(common_reqs, pkg_reqs)
        # A combination is valid only if EVERY member's version fits the
        # intersection of all specifiers targeting it.  (Bug fix: the
        # previous code assigned out[comb] inside this loop, so only the
        # last member's check survived.)
        valid = True
        for package, version in comb:
            reqs = []
            for _pkg, pkg_data in pkg_reqs.iteritems():
                add_reqs(reqs, pkg_data, pkg=package)
            specifiers = specifiers_intersection(
                [i for i in itertools.chain(*[req[1] for req in reqs])])
            if not is_in_specifiers(version, specifiers):
                valid = False
                break
        out[comb] = valid
    # Score valid combinations by positional sum in pvector().
    for comb in out:
        if out[comb]:
            summary = 0
            for package, version in comb:
                summary += pvector(package, data).index((package, version))
            out[comb] = summary
    return out
 
 
def u_comb(data, packages, common_reqs):
    """
    Return combination upgrade

    Pick the combination with the highest get_comb_summary() score and
    return the first (package, version) member that differs from the
    installed version; None when no combination scores above zero.

    NOTE(review): a valid all-newest combination also scores 0 and is
    indistinguishable here from "no valid combination" -- confirm that
    this is intended.
    """
    summaries = get_comb_summary(data, packages, common_reqs)
    best = 0
    for score in summaries.values():
        if score > best:
            best = score
    if best == 0:
        return None
    winners = [comb for comb, score in summaries.items() if score == best]
    for name, version in winners[0]:
        if data[name]['installed_version'] != version:
            return (name, version)
 
 
def ibranch(data, fix=False):
    """
    Return upgradeable versions of independent branch

    For every branch root that is NOT correlated with another branch,
    run the combination search u_comb() and record the winning
    (package, version) via save_ic().

    :param fix: when True, pin the root to its first pvector() version
        (assumed newest -- TODO confirm) and combine only the first two
        pvector() entries of each dependency; when False, include the
        root's full candidate vector in the combination search.
    """
    out = {}
    no_news_req = get_no_news_req(data)
    branches = get_branches(data)
    _package_no_news, package_with_news = single_multi(data)
    co_branches = get_co_branches(branches)
    for branch in branches:
        # Correlated branches share requirements; skip them here.
        if branch in co_branches:
            continue
        if fix:
            version = pvector(branch, data)[0][1]
            pkg_reqs = save_version({}, data, branch, version)
            common_reqs = merge_two_dicts(pkg_reqs, no_news_req)
            # Only the top two candidates of each dependency with news.
            packages = [pvector(pkg, data)[:2] \
                       for pkg in branches[branch] if pkg in package_with_news]
        else:
            common_reqs = no_news_req.copy()
            packages = [pvector(branch, data)]+[pvector(pkg, data) \
                       for pkg in branches[branch] if pkg in package_with_news]
        version = u_comb(data, packages, common_reqs)
        if version:
            out = save_ic(out, version[0], compatible=version[1])
    return out
 
 
def p_upgrade(data, pkg, compatible=None, incompatible=None):
    """
    Partial upgrade

    Record *compatible* as the package's upgradeable version and move
    every *incompatible* version out of the candidate set, keeping the
    compatible one.
    """
    if compatible:
        data[pkg]['upgradeable_version'] = compatible
    if incompatible:
        for bad in incompatible:
            # NOTE(review): when `compatible` is a version string this is
            # a substring test, not equality ("1.0" in "1.0.2" is True)
            # -- confirm whether `!=` was intended.
            if not compatible or bad not in compatible:
                data = move_incompatible(data, [(pkg, bad)])
    return data
 
 
def first_loop(data):
    """
    Upgrade loop

    Drive the partial-resolution passes to a fixed point: repeat one
    full round (pruning + resolution) until a round changes nothing.
    """
    while True:
        # Pruning: drop candidate versions flagged by the del_* helpers.
        to_delete_hards = del_hards(data)
        data = move_incompatible(data, to_delete_hards)
        to_delete_no_news = del_no_news(data)
        data = move_incompatible(data, to_delete_no_news)
        to_delete_one_ver = del_one_ver(data)
        data = move_incompatible(data, to_delete_one_ver)

        # Resolution pass 1: simple requirement situations.
        phase_one_packages = phase_one(data)
        for pkg, pkg_data in phase_one_packages.iteritems():
            data = p_upgrade(data, pkg, compatible=pkg_data['compatible'],
                             incompatible=pkg_data['incompatible'])

        # Resolution pass 2: packages shared between several branches.
        cross_pkgs = cross_packages(data)
        for pkg, pkg_data in cross_pkgs.iteritems():
            data = p_upgrade(data, pkg, compatible=pkg_data['compatible'],
                             incompatible=pkg_data['incompatible'])

        to_delete_noti = del_notinstalled(data)
        data = move_incompatible(data, to_delete_noti)

        # Resolution pass 3: independent branches with fix=True (see
        # ibranch); only 'compatible' versions are applied here.
        i_branch = ibranch(data, fix=True)
        for pkg, pkg_data in i_branch.iteritems():
            data = p_upgrade(data, pkg, compatible=pkg_data['compatible'])
        # Fixed point: nothing was pruned or resolved this round.
        if all([not to_delete_hards, not to_delete_no_news, not to_delete_one_ver,
                not phase_one_packages, not cross_pkgs, not to_delete_noti, not i_branch]):
            break
    return data
def main():
def main():
"""
"""
@@ -415,38 +1064,32 @@ def main():
@@ -415,38 +1064,32 @@ def main():
"""
"""
os.environ["PYTHONWARNINGS"] = "ignore:DEPRECATION"
os.environ["PYTHONWARNINGS"] = "ignore:DEPRECATION"
arguments = arg_parse()
arguments = arg_parse()
pyver = get_pyver()
packages_data = get_pkg_data()
pkglist = get_pip_list()
packages_data = first_loop(packages_data)
jsonpipdeptree = get_jsonpipdeptree()
packages_data = collect_packages(pkglist, jsonpipdeptree, pyver=pyver)
i_branch = ibranch(packages_data)
for pkg, pkg_data in sorted(
for package, data in i_branch.iteritems():
select_pkgs(packages_data, 'newer_version').iteritems(), key=lambda x: x[0].lower()
if data['compatible']:
):
packages_data[package]['upgradeable_version'] = data['compatible']
pkg_keys = pkg_data.keys()
if 'newer_version' in pkg_keys and 'upgradeable_version' not in pkg_keys:
check_co_branches(packages_data)
for ver, deps in sorted(
check_extras(packages_data)
pkg_data['newer_version'].iteritems(),
key=lambda x: distutils.version.StrictVersion(x[0]),
reverse=True
):
ndeps = check_deps(deps, packages_data)
if ndeps:
packages_data[pkg]['upgradeable_version'] = ver
break
upgradeable_pkgs = select_pkgs(packages_data, 'upgradeable_version')
if arguments.list:
if arguments.list:
print_list(upgradeable_pkgs)
sys.exit(print_list(packages_data))
if arguments.show:
if arguments.show is not None:
for pkg in arguments.show:
if arguments.show:
 
pkgs = arguments.show
 
else:
 
pkgs = packages_data
 
for pkg in pkgs:
pprint.pprint({pkg: packages_data[pkg]})
pprint.pprint({pkg: packages_data[pkg]})
sys.exit(0)
sys.exit(0)
if arguments.upgrade:
if arguments.upgrade:
if 'pip' in upgradeable_pkgs.keys():
to_upgrade = []
upgrade_package('pip', upgradeable_pkgs['pip']['upgradeable_version'])
for pkg in sorted(select_upkgs(packages_data, 'upgradeable_version')):
del upgradeable_pkgs['pip']
to_upgrade.append((pkg, packages_data[pkg]['upgradeable_version']))
for pkg, pkg_data in sorted(upgradeable_pkgs.iteritems(), key=lambda x: x[0].lower()):
upgrade_package(to_upgrade)
upgrade_package(pkg, pkg_data['upgradeable_version'])
print "Done."
print "Done."
if __name__ == "__main__":
if __name__ == "__main__":
Loading