Newer
Older
# pylint: disable=too-many-lines
import collections
import itertools

Marek Chrastina
committed
import tarfile
import tempfile

Marek Chrastina
committed
import zipfile
import wheel.metadata

Marek Chrastina
committed
import packaging.specifiers
import packaging.version
import pip._internal.utils.misc
# https://www.python.org/dev/peps/pep-0508/#environment-markers
PY_VER = ".".join(map(str, sys.version_info[:2]))
SYS_PLAT = sys.platform
PLAT_PY_IMPL = platform.python_implementation()
SBoarder = collections.namedtuple("SBoarder", ["boarders", "extrem", "extrem_op"])
def arg_parse():
"""
argument parser
"""
parser = argparse.ArgumentParser(
description="Pipdeps shows/upgrades outdated packages with respect to existing \
dependencies."
)
parser.add_argument('-e', '--exclude',
nargs='*',
help="Space-separated list of excluded package (and version). \
Format package==version or package for all versions")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-l', '--list',
action='store_true',
help="show list of upgradeable packages and versions")
group.add_argument('-t', '--table',
action='store_true',
help="show table of upgradeable packages and versions")
group.add_argument('-u', '--upgrade',
action='store_true',

Marek Chrastina
committed
help="upgrade upgradeable packages")
def get_excludes(data):
"""
Parse argument excludes into array of (pkg, ver)
"""
if data is None:
return []
return [pkg.split("==") for pkg in data]
def upgrade_package(data):
pip install --upgrade "<package>==<versions>"
to_upgrade = []
for package, version in data:
to_upgrade.append("%s==%s" % (package, version))
["pip", "install", "--upgrade"] + to_upgrade,
return json.load(urllib2.urlopen(urllib2.Request(url)))

Marek Chrastina
committed
def file_download(url):
"""
Download file from url as temporary file
It returns file object
"""
tmp_file = tempfile.NamedTemporaryFile(delete=False)
rfile = urllib2.urlopen(url)
with tmp_file as output:
output.write(rfile.read())
return tmp_file
def merge_two_dicts(in_x, in_y):
Return merge of two dictionaries
out = in_x.copy()
out.update(in_y)
return out
def is_version(version):

Marek Chrastina
committed
"""
Return true if version satisfy regex, otherwise return false

Marek Chrastina
committed
"""
if re.compile(r'^(\d+) \. (\d+) (\. (\d+))? (\. (\d+))?$', re.VERBOSE).search(version) or \
re.compile(r'^(\d+) \. (\d+) (\. (\d+))? (rc(\d+))?$', re.VERBOSE).search(version):
return True
return False

Marek Chrastina
committed
def is_in_specifiers(version, specifiers):
Return true if version satisfy specifiers, otherwise return false
if not specifiers:
return True
elif version is None:
return True
else:
# https://github.com/pypa/packaging/pull/92
ver = packaging.version.LegacyVersion(version)
specifiers = [
packaging.specifiers.LegacySpecifier(s.strip()) for s in specifiers if s.strip()]
return all(s.contains(ver) for s in specifiers)
def is_in_conditions(condition):

Marek Chrastina
committed
"""
Return true if condition satisfy sys_platform and python_version and
platform_python_implementation, otherwise return false

Marek Chrastina
committed
"""
if not condition:
return True
# pylint: disable=eval-used
return eval(
condition.replace("sys_platform", '"%s"' % SYS_PLAT) \
.replace("python_version", '"%s"' % PY_VER) \
.replace("platform_python_implementation", '"%s"' % PLAT_PY_IMPL))

Marek Chrastina
committed
def is_in_extra(extra, req_extra):
Return true if extra satisfy, otherwise return false
if extra is None or extra in req_extra:
return True
return False
# pylint: disable=too-many-branches
def specifier_boarders(specifiers):
Return specifier boarders, equals and notequals
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
left = SBoarder([s for s in specifiers if s.operator in ['>', '>=']], None, None)
right = SBoarder([s for s in specifiers if s.operator in ['<', '<=']], None, None)
if left.boarders:
left = left._replace(extrem=sorted([s.version for s in left.boarders],
key=packaging.specifiers.LegacyVersion,
reverse=True)[0])
left = left._replace(extrem_op=[s.operator for s in left.boarders \
if s.version == left.extrem])
if '>' in left.extrem_op:
left = left._replace(extrem_op='>')
else:
left = left._replace(extrem_op='>=')
if right.boarders:
right = right._replace(extrem=sorted([s.version for s in right.boarders],
key=packaging.specifiers.LegacyVersion)[0])
right = right._replace(extrem_op=[s.operator for s in right.boarders \
if s.version == right.extrem])
if '<' in right.extrem_op:
right = right._replace(extrem_op='<')
else:
right = right._replace(extrem_op='<=')
if left.boarders and right.boarders:
if packaging.version.LegacyVersion(left.extrem) > \
packaging.version.LegacyVersion(right.extrem):
left, right = None, None
elif packaging.version.LegacyVersion(left.extrem) == \
packaging.version.LegacyVersion(right.extrem):
if left.extrem_op in ['>='] and right.extrem_op in ['<=']:
left = left._replace(extrem_op='==')
right = right._replace(boarders=None)
else:
left, right = None, None
equals = [s for s in specifiers if s.operator in ['==']]
if equals:
cmp_v = list(set([s.version for s in equals]))[0]
if all([packaging.version.LegacyVersion(cmp_v) == packaging.version.LegacyVersion(item) \
for item in list(set([s.version for s in equals]))]):
equals = cmp_v
else:
equals = None
notequals = [s for s in specifiers if s.operator in ['!=']]
notequals = list(set([s.version for s in notequals]))
return left, right, equals, notequals
def specifiers_intersection(specifiers):
"""
Return intersection of specifiers, otherwise return None
"""
if not specifiers:
return []
specifiers = [packaging.specifiers.LegacySpecifier(s.strip()) for s in specifiers if s.strip()]
left, right, equals, notequals = specifier_boarders(specifiers)
if (left is None and right is None) or equals is None:
boarders = []
for item in [left, right]:
if item.boarders:
boarders.append("%s%s" % (item.extrem_op, item.extrem))
if boarders and notequals:
for item in notequals:
if is_in_specifiers(item, boarders):
boarders.append("!=%s" % item)
elif not boarders and notequals:
for item in notequals:
boarders.append("!=%s" % item)
if boarders and equals:
if not is_in_specifiers(equals, boarders):
return None
boarders = ["==%s" % equals]
elif not boarders and equals:
boarders = ["==%s" % equals]
return boarders
def select_upkgs(data, rkey):
"""
Return data packages having requested key
"""

Marek Chrastina
committed
result = []
for pkg, pkg_data in data.iteritems():
if rkey in pkg_data.keys():
result.append(pkg)
return result
def print_table(data):
Print table upgradeable versions
"""
upkgs = select_upkgs(data, 'upgradeable_version')
if not upkgs:
print "There is nothing to upgrade."
return 0
tab_data = []
for pkg in sorted(upkgs):
tab_data.append([pkg, data[pkg]['installed_version'], data[pkg]['upgradeable_version']])
print tabulate.tabulate(tab_data,
['package', 'installed_version', 'upgradeable_version'])
return 1
def print_list(data):
"""
Print list upgradeable versions
"""
upkgs = select_upkgs(data, 'upgradeable_version')
if not upkgs:
print "There is nothing to upgrade."
return 0
list_data = []
for pkg in sorted(upkgs):
list_data.append("%s==%s" % (pkg, data[pkg]['upgradeable_version']))
print " ".join(list_data,)
def write_metadata(tmp_file):
"""
Write package metadata
"""
try:
tar_file = tarfile.open(tmp_file.name, 'r')
for member in tar_file.getmembers():
if 'requires.txt' in member.name:
with open('/tmp/requires.txt', 'w') as tmpf:
tmpf.write(tar_file.extractfile(member).read())
if 'PKG-INFO' in member.name:
with open('/tmp/PKG-INFO', 'w') as tmpf:
tmpf.write(tar_file.extractfile(member).read())
except tarfile.ReadError:
zip_file = zipfile.ZipFile(tmp_file.name, 'r')
for member in zip_file.namelist():
if 'requires.txt' in member:
with open('/tmp/requires.txt', 'w') as tmpf:
tmpf.write(zip_file.read(member))
if 'PKG-INFO' in member:
with open('/tmp/PKG-INFO', 'w') as tmpf:
tmpf.write(zip_file.read(member))
def get_metadata(package, version):
Return package metadata
metadata = None
for item in get_json("https://pypi.python.org/pypi/%s/%s/json" % (package, version,)) \
['releases'][version]:

Marek Chrastina
committed
if item['packagetype'] == 'sdist':
tmp_file = file_download(item['url'])
write_metadata(tmp_file)
if os.path.isfile('/tmp/PKG-INFO'):
metadata = [
line.decode('utf-8') \
for line in wheel.metadata.pkginfo_to_metadata('/tmp', '/tmp/PKG-INFO') \
.as_string().splitlines()]
try:
os.unlink('/tmp/requires.txt')
except OSError:
pass

Marek Chrastina
committed
try:
os.unlink('/tmp/PKG-INFO')
except OSError:
pass
os.unlink(tmp_file.name)
if metadata:

Marek Chrastina
committed
break
elif item['packagetype'] == 'bdist_wheel':
tmp_file = file_download(item['url'])
zip_file = zipfile.ZipFile(tmp_file.name, 'r')
for member in zip_file.namelist():
if 'METADATA' in member:
metadata = [line.decode('utf-8') for line in zip_file.read(member).splitlines()]
os.unlink(tmp_file.name)
break
return metadata
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def metadata_version(data):
"""
Return metadata version or None
"""
version = None
for line in data:
if 'Metadata-Version' in line.decode('utf-8'):
version = line.replace('Metadata-Version:', '').strip()
break
return version
def validate_pyver(metadata):
"""
Return True if python version satisfies
"""
if metadata is None:
return None
mversion = metadata_version(metadata)
if mversion and is_in_specifiers(mversion, ['>=2.0']):
py_ver = ", ".join([line.replace('Requires-Python:', '').strip() \
for line in metadata if re.search(r'^Requires-Python:', line)])
py_ver = py_ver.split(',')
if py_ver:
return is_in_specifiers(PY_VER, py_ver)
return True
def parse_metadata(metadata, extra):
"""
Return dependencies parsed from metadata
"""
if metadata is None:
return None
mversion = metadata_version(metadata)
if mversion and is_in_specifiers(mversion, ['>=2.0']):
arr = []
lines = [line.replace('Requires-Dist:', '').strip() \
for line in metadata if re.search(r'^Requires-Dist:', line)]
for line in lines:
data = pkginfo(str(line), req_extra=extra, repair=True)
if data:
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
return arr
def pkginfo(data, req_extra=None, repair=False):
"""
Return parsed pkginfo
"""
extra_match = re.compile(
r"""^(?P<package>.*?)(;\s*(?P<condition>.*?)(extra == '(?P<extra>.*?)')?)$""").search(data)
if extra_match:
groupdict = extra_match.groupdict()
condition = groupdict['condition']
extra = groupdict['extra']
package = groupdict['package']
if condition.endswith(' and '):
condition = condition[:-5]
mysearch = re.compile(r'(extra == .*)').search(condition)
if mysearch:
extra = mysearch.group(1)
condition = condition.replace(extra, '')
if not condition:
condition = None
extra = re.compile(r'extra == (.*)').search(extra).group(1).replace('"', "")
else:
condition, extra = None, None
package = data
if not is_in_conditions(condition):
return None
pkg_name, pkg_extra, pkg_ver = re.compile(
r'([\w\.\-]*)(\[\w*\])?(.*)').search(package).groups()
if pkg_extra:
pkg_extra = pkg_extra.replace("[", "").replace("]", "").lower()
pkg_ver = pkg_ver.replace("(", "").replace(")", "").strip()
if not pkg_ver:
pkg_ver = []
else:
if repair:
try:
pkg_ver = re.compile(r'^(\d.*)$').search(pkg_ver).group(1)
except AttributeError:
pass
pkg_ver = pkg_ver.split(",")
if not is_in_extra(extra, req_extra):
return None
return (pkg_name.lower(), pkg_ver, pkg_extra)
def get_pkg_data():
"""
Return package data
"""
packages_data = {}
# pylint: disable=protected-access
for pkg in pip._internal.utils.misc.get_installed_distributions():
pkg_name, pkg_ver, _pkg_extra = pkginfo(str(pkg))
rev = {'installed_version': pkg_ver,
'requires': [pkginfo(str(dep), repair=True) for dep in pkg.requires()]}
packages_data[pkg_name] = rev
packages_data = insert_extras(packages_data)
packages_data = insert_availables(packages_data)
packages_data = insert_news(packages_data)
while True:
new_packages_data = new_packages(packages_data)
if not new_packages_data:
break
new_packages_data = insert_availables(new_packages_data)
new_packages_data = insert_news(new_packages_data)
packages_data = merge_two_dicts(packages_data, new_packages_data)
check_new_extras(packages_data)
return packages_data
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def insert_extras(data):
"""
Insert extras
"""
for key in data.keys():
extra = []
for pkg, pkg_data in data.iteritems():
for dep in pkg_data['requires']:
if dep[0] == key:
if dep[2]:
extra.append(dep[2])
data[key]['extras'] = extra
if extra:
# pylint: disable=protected-access
for pkg in pip._internal.utils.misc.get_installed_distributions():
pkg_name, _pkg_ver, _pkg_extra = pkginfo(str(pkg))
if pkg_name == key:
data[key]['requires'] += [pkginfo(str(dep), repair=True, req_extra=extra) \
for dep in pkg.requires(extras=extra)]
return data
def insert_availables(data):
"""
Insert available versions
"""
for pkg, pkg_data in data.iteritems():
if 'available_version' in pkg_data.keys():
continue
try:
data[pkg]['available_version'] = get_available_vers(pkg)
except urllib2.HTTPError:
data[pkg]['available_version'] = []
return data
def get_available_vers(package):
"""
Return descending list of public available strict version
"""
versions = []
try:
data = get_json("https://pypi.python.org/pypi/%s/json" % (package))
except urllib2.HTTPError, err:
raise urllib2.HTTPError(err.url, err.code, None, err.hdrs, err.fp)
releases = data["releases"].keys()
for release in releases:

Marek Chrastina
committed
requires_python, python_version, packagetype = [], [], []
for item in data["releases"][release]:

Marek Chrastina
committed
python_version.append(item['python_version'])
packagetype.append(item['packagetype'])
if item['requires_python'] is not None:
for reqpyt in item['requires_python'].split(","):
requires_python.append(reqpyt.strip())
if requires_python:
requires_python = list(set(requires_python))

Marek Chrastina
committed
if len(packagetype) == 1 and packagetype[0] == 'bdist_wheel' and len(python_version) == 1:
py_ver = re.search(r"^py([0-9])", python_version[0])
if py_ver is not None and not is_in_specifiers(PY_VER, [">= %s" % py_ver.group(1)]):

Marek Chrastina
committed
continue
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
if is_version(release) and is_in_specifiers(PY_VER, requires_python):
versions.append(release)
return sorted(versions, key=packaging.specifiers.LegacyVersion, reverse=True)
def select_news(available_version, installed_version=None):
"""
Select versions newer than installed version, if it is known
"""
if installed_version is None:
return sorted(available_version, key=packaging.specifiers.LegacyVersion, reverse=True)
iver = packaging.version.Version(installed_version)
return sorted([aver for aver in available_version if packaging.version.Version(aver) > iver],
key=packaging.specifiers.LegacyVersion, reverse=True)
def insert_news(data):
"""
Insert new versions
"""
for pkg, pkg_data in data.iteritems():
if 'new_version' in pkg_data.keys():
continue
try:
new_version = select_news(pkg_data['available_version'], pkg_data['installed_version'])
except KeyError:
new_version = select_news(pkg_data['available_version'])
if new_version:
res = {}
for version in new_version:
metadata = get_metadata(pkg, version)
pyver_validation = validate_pyver(metadata)
if pyver_validation is not None and pyver_validation is False:
pkg_data['available_version'].remove(version)
continue
content = parse_metadata(metadata, pkg_data['extras'])
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
if content is not None:
res[version] = content
if res:
pkg_data['new_version'] = res
return data
def new_packages(data):
"""
Return new packages as dictionary
"""
out = {}
arr = []
pkg_list = data.keys()
for pkg, pkg_data in data.iteritems():
try:
for dep in pkg_data['requires']:
if dep[0] not in pkg_list:
arr.append(dep)
for _ver, ver_data in pkg_data['new_version'].iteritems():
for dep in ver_data:
if dep[0] not in pkg_list:
arr.append(dep)
except KeyError:
pass
for item in list(set([i[0] for i in arr])):
extras = []
for pkg, _req, extra in arr:
if pkg == item and extra is not None:
extras.append(extra)
out[item] = {'extras': extras}
return out

Marek Chrastina
committed
def check_new_extras(data):

Marek Chrastina
committed
"""
Check if there are new extras

Marek Chrastina
committed
"""
extra_pkgs = []
pkg_list = data.keys()
for pkg, pkg_data in data.iteritems():
try:
for _ver, ver_data in pkg_data['new_version'].iteritems():
for dep in ver_data:
if dep[0] in pkg_list and dep[2] is not None:
extra_pkgs.append(dep)
except KeyError:
pass
for pkg, _req, extra in extra_pkgs:
if extra not in data[pkg]['extras']:
raise Exception('There are new extras!')
def check_extras(data):
"""
Check if there are extras in upgradeable packages
"""
for package in select_upkgs(data, 'upgradeable_version'):
if data[package]['extras']:
raise Exception('There are extras in upgradeable packages!')
def check_co_branches(data):
"""
Check if there branches with intersection of packages
"""
branches = get_branches(data)
co_branches = get_co_branches(branches)
if co_branches:
raise Exception('There are branches with intersection of packages!')

Marek Chrastina
committed
def pvector(package, data):

Marek Chrastina
committed
"""
Return vector of package versions

Marek Chrastina
committed
"""
out = []
if 'new_version' not in data[package].keys():
out.append((package, data[package]['installed_version']))
else:
if 'upgradeable_version' in data[package].keys():
out.append((package, data[package]['upgradeable_version']))

Marek Chrastina
committed
else:
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
if 'installed_version' in data[package].keys():
out.append((package, data[package]['installed_version']))
for ver in sorted(data[package]['new_version'].keys(),
key=packaging.specifiers.LegacyVersion):
if 'upgradeable_version' in data[package].keys():
if packaging.specifiers.LegacyVersion(ver) > \
packaging.specifiers.LegacyVersion(data[package]['upgradeable_version']):
out.append((package, ver))
else:
out.append((package, ver))
return out
def single_multi(data):
"""
Return list of packages with new versions and list of packages without new versions
"""
pkg_list, single, multi = [], [], []
for pkg, pkg_data in data.iteritems():
if 'requires' in pkg_data.keys():
pkg_list.append(pkg)
for pkg in pkg_list:
vec = pvector(pkg, data)
if len(vec) == 1:
single.append(*vec)
elif len(vec) > 1:
multi.append(vec)
single = list(set([item[0] for item in single]))
multi = list(set([item[0] for pkg_data in multi for item in pkg_data]))
return single, multi
def move_incompatible(data, to_delete):
"""
Move new version to incompatible
"""
if not to_delete:
return data
for package, version in to_delete:
if 'incompatible_version' not in data[package].keys():
data[package]['incompatible_version'] = {}
data[package]['incompatible_version'][version] = data[package]['new_version'][version]
del data[package]['new_version'][version]
if not data[package]['new_version']:
del data[package]['new_version']
return data
def get_compatible(versions, reqs, inverse=False):
"""
Return compatible versions
"""
specifiers = specifiers_intersection([i for i in itertools.chain(*[req[1] for req in reqs])])
if inverse:
v_versions = [version for version in versions if not is_in_specifiers(version, specifiers)]

Marek Chrastina
committed
else:
v_versions = [version for version in versions if is_in_specifiers(version, specifiers)]
return sorted(v_versions, key=packaging.specifiers.LegacyVersion, reverse=True)

Marek Chrastina
committed
def get_hards(data, package_no_news):
"""
Return requirements
"""
out = {}
deps = get_simple_reqs(data, None, package_no_news)
for item in list(set([pkg[0] for pkg in deps])):
reqs, extras = [], []
for pkg, req, extra in deps:
if pkg == item:
reqs += req
if extra:
extras += extra
if 'installed_version' in data[item].keys():
out[item] = {'installed_version': data[item]['installed_version'],
'requirements': list(set(reqs)),
'extras': list(set(extras))}
return out

Marek Chrastina
committed
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
def del_excls(data, excludes):
"""
Return list of packages and their versions that are excluded by argument
"""
to_delete = []
_package_no_news, package_with_news = single_multi(data)
package_not_installed = not_installed(data)
for exc in excludes:
try:
exc_pkg, exc_ver = exc[0], exc[1]
except IndexError:
exc_pkg, exc_ver = exc[0], None
if exc_pkg not in package_with_news+package_not_installed:
print "Warning! Excluded package {} has no upgrades. Ignoring".format(exc_pkg)
continue
vers = [ver for ver, _ver_data in data[exc_pkg]['new_version'].iteritems()]
if exc_ver is not None and exc_ver not in vers:
print "Warning! Excluded package {}=={} is not upgradable. Ignoring".format(exc_pkg,
exc_ver)
continue
if exc_ver is None:
for ver in vers:
to_delete.append((exc_pkg, ver))
else:
to_delete.append((exc_pkg, exc_ver))
return to_delete
def del_hards(data):
"""
Return list of packages and their versions that does not satisfy
requirements of packages without new version
"""
to_delete = []
package_no_news, package_with_news = single_multi(data)
hard_requirements = get_hards(data, package_no_news)
for pkg in package_with_news+not_installed(data):
for ver, ver_data in data[pkg]['new_version'].iteritems():
for dep, req, _extra in ver_data:
if dep in hard_requirements.keys():
if specifiers_intersection(req+hard_requirements[dep]['requirements']) is None:
to_delete.append((pkg, ver))
return to_delete
def del_no_news(data):
Return list of packages and their versions that does not satisfy packages without new version
to_delete = []
package_no_news, package_with_news = single_multi(data)
hard_requirements = get_hards(data, package_no_news)
for package in package_with_news+not_installed(data):
if package in hard_requirements.keys():
versions = [pkg[1] for pkg in pvector(package, data)]
specifiers = specifiers_intersection(hard_requirements[package]['requirements'])
for version in versions:
if not is_in_specifiers(version, specifiers):
to_delete.append((package, version))
return to_delete

Marek Chrastina
committed
def del_one_ver(data):

Marek Chrastina
committed
"""
If all packages requirements lead to one specific version, return list of that packages

Marek Chrastina
committed
"""
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
to_delete = []
_package_no_news, package_with_news = single_multi(data)
for package in package_with_news+not_installed(data):
reqs = get_reqs(data, package)
specifiers = specifiers_intersection(
[i for i in itertools.chain(*[req[1] for req in reqs])])
if specifiers:
if len(specifiers) == 1 and '==' in specifiers[0]:
versions = [pkg[1] for pkg in pvector(package, data)]
versions.remove(specifiers[0].replace('==', ''))
for version in versions:
to_delete.append((package, version))
return to_delete
def del_notinstalled(data):
"""
If no package requires notinstalled packages, return list of that packages
"""
to_delete = []
for package in not_installed(data):
reqs = get_reqs(data, package)
if not reqs:
for pkg, version in pvector(package, data):
to_delete.append((pkg, version))
return to_delete
def get_deps(data, package):
"""
Return package deep requirements
"""
try:
content = data[package]
except KeyError:
content = []
for pkg in content:
yield pkg
for child in get_deps(data, pkg):
yield child
def not_installed(data):
"""
Return not installed packages
"""
not_i = []
for pkg, pkg_data in data.iteritems():
if 'requires' not in pkg_data.keys() and 'new_version' in pkg_data.keys():
not_i.append(pkg)
return not_i
def get_no_news_req(data):
Return requirements of packages without new versions
reqs = {}
package_no_news, _package_with_news = single_multi(data)
for package in package_no_news:
version = pvector(package, data)[0][1]
reqs = save_version(reqs, data, package, version)
return reqs
def save_version(r_data, p_data, pkg, ver):
"""
Save the highest package version
"""
if 'installed_version' in p_data[pkg].keys() and p_data[pkg]['installed_version'] == ver:
r_data[pkg] = p_data[pkg]['requires']
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
r_data[pkg] = p_data[pkg]['new_version'][ver]
return r_data
def add_reqs(reqs, data, pkg=None, addpkg=None):
"""
Append requirements
"""
for dep, req, extra in data:
if pkg and dep != pkg:
continue
if addpkg:
reqs.append(addpkg)
else:
reqs.append((dep, req, extra))
def save_ic(out, package, incompatible=None, compatible=None):
"""
Save compatible/incompatible version
"""
if package not in out.keys():
out[package] = {'incompatible': [], 'compatible': None}
if incompatible:
out[package]['incompatible'].append(incompatible)
if compatible:
out[package]['compatible'] = compatible
return out
def get_reqs(data, package, data2=None, addpkg=False):
"""
Get requirements
"""
reqs = []
if data2:
for pkg in data2:
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
add_reqs(reqs, data[pkg]['new_version'][uver], pkg=package)
else:
add_reqs(reqs, data[pkg]['requires'], pkg=package)
return reqs
for pkg, pkg_data in data.iteritems():
uver = None
if pkg == package:
continue
if 'upgradeable_version' in pkg_data.keys():
uver = pkg_data['upgradeable_version']
if addpkg:
add_reqs(reqs, pkg_data['new_version'][uver], pkg=package, addpkg=pkg)
else:
add_reqs(reqs, pkg_data['new_version'][uver], pkg=package)
elif 'requires' in pkg_data.keys():
if addpkg:
add_reqs(reqs, pkg_data['requires'], pkg=package, addpkg=pkg)
else:
add_reqs(reqs, pkg_data['requires'], pkg=package)
if 'new_version' in pkg_data.keys():
for ver, ver_data in pkg_data['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= \
packaging.specifiers.LegacyVersion(uver):
continue
if addpkg:
add_reqs(reqs, ver_data, pkg=package, addpkg=pkg)
else:
add_reqs(reqs, ver_data, pkg=package)
return reqs
def get_reqs_dict(data):
"""
Get requirements
"""
out = {}
for pkg, pkg_data in data.iteritems():
reqs = []
uver = None
if 'upgradeable_version' in pkg_data.keys():
uver = pkg_data['upgradeable_version']
add_reqs(reqs, pkg_data['new_version'][uver])
elif 'requires' in pkg_data.keys():
add_reqs(reqs, pkg_data['requires'])
if 'new_version' in pkg_data.keys():
for ver, ver_data in pkg_data['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= \
packaging.specifiers.LegacyVersion(uver):
continue
add_reqs(reqs, ver_data)
out[pkg] = list(set([req[0] for req in reqs]))
return out
def get_simple_reqs(data, package_with_news, package_no_news):
"""
Return no_requires, only_no_news_requires or requirements
"""
if package_with_news is None:
reqs = []
for pkg in package_no_news:
if 'requires' in data[pkg].keys():
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
reqs += [req for req in data[pkg]['new_version'][uver] if req[1] or req[2]]
else:
reqs += [req for req in data[pkg]['requires'] if req[1] or req[2]]
return reqs
no_requires, only_no_news_requires, = {}, {}
for pkg in package_with_news:
dep_in_no_news_vers = []
dep_vers = []
uver = None
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
if 'new_version' in data[pkg].keys():
for ver, ver_data in data[pkg]['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= \
packaging.specifiers.LegacyVersion(uver):
continue
if ver_data:
reqs = [req[0] for req in ver_data]
reqs_not_in_no_news = [req for req in reqs if req not in package_no_news]
if not reqs_not_in_no_news:
dep_in_no_news_vers.append(ver)
else:
dep_vers.append(ver)
if dep_vers:
no_requires[pkg] = dep_vers
if dep_in_no_news_vers:
only_no_news_requires[pkg] = dep_in_no_news_vers
return no_requires, only_no_news_requires
def phase_one(data):
"""
Partial resolve upgrades
"""
out, no_requires_deps = {}, {}
package_no_news, package_with_news = single_multi(data)
no_requires, only_no_news_requires = get_simple_reqs(data, package_with_news, package_no_news)
for package, version in no_requires.iteritems():
reqs = get_reqs(data, package, addpkg=True)
if reqs:
no_requires_deps[package] = list(set(reqs))
else:
out = save_ic(out, package,
compatible=sorted(no_requires[package],
key=packaging.specifiers.LegacyVersion,
reverse=True)[0])
for package, dep in no_requires_deps.iteritems():
if all([pkg in package_no_news for pkg in dep]):
reqs = get_reqs(data, package, data2=dep)
compatible = get_compatible(no_requires[package], reqs)
for version in no_requires[package]:
if version not in compatible:
out = save_ic(out, package, incompatible=version)
if compatible:
out = save_ic(out, package, compatible=compatible[0])
for package, versions in only_no_news_requires.iteritems():
reqs = get_reqs(data, package, addpkg=True)
if all([item in package_no_news for item in list(set(reqs))]):
out = save_ic(out, package,
compatible=sorted(versions,
key=packaging.specifiers.LegacyVersion,
reverse=True)[0])
return out
def get_branches(data):
"""
Return branches
"""
branches = []
package_reqs = {}
_package_no_news, package_with_news = single_multi(data)
package_with_news = package_with_news+not_installed(data)
package_info = get_reqs_dict(data)
for package in package_with_news:
package_reqs[package] = list(set([i for i in get_deps(package_info, package)]))
for package in package_with_news:
res = []
for pkg, deps in package_reqs.iteritems():
if pkg == package:
continue
if package in deps: