Newer
Older
# pylint: disable=too-many-lines
import collections
import itertools

Marek Chrastina
committed
import tarfile
import tempfile

Marek Chrastina
committed
import zipfile
import wheel.metadata

Marek Chrastina
committed
import packaging.specifiers
import packaging.version
import pip._internal.utils.misc
# https://www.python.org/dev/peps/pep-0508/#environment-markers
PY_VER = ".".join(map(str, sys.version_info[:2]))
SYS_PLAT = sys.platform
PLAT_PY_IMPL = platform.python_implementation()
SBoarder = collections.namedtuple("SBoarder", ["boarders", "extrem", "extrem_op"])
def arg_parse():
"""
argument parser
"""
parser = argparse.ArgumentParser(
description="Pipdeps shows/upgrades outdated packages with respect to existing \
dependencies."
)
parser.add_argument('-e', '--exclude',
nargs='*',
help="Space-separated list of excluded package (and version). \
Format package==version or package for all versions")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-l', '--list',
action='store_true',
help="show list of upgradeable packages and versions")
group.add_argument('-t', '--table',
action='store_true',
help="show table of upgradeable packages and versions")
group.add_argument('-u', '--upgrade',
action='store_true',

Marek Chrastina
committed
help="upgrade upgradeable packages")
def get_excludes(data):
"""
Parse argument excludes into array of (pkg, ver)
"""
if data is None:
return []
return [pkg.split("==") for pkg in data]
def upgrade_package(data):
pip install --upgrade "<package>==<versions>"
to_upgrade = []
for package, version in data:
to_upgrade.append("%s==%s" % (package, version))
["pip", "install", "--upgrade"] + to_upgrade,
return json.load(urllib2.urlopen(urllib2.Request(url)))

Marek Chrastina
committed
def file_download(url):
"""
Download file from url as temporary file
It returns file object
"""
tmp_file = tempfile.NamedTemporaryFile(delete=False)
rfile = urllib2.urlopen(url)
with tmp_file as output:
output.write(rfile.read())
return tmp_file
def merge_two_dicts(in_x, in_y):
Return merge of two dictionaries
out = in_x.copy()
out.update(in_y)
return out
def is_version(version):

Marek Chrastina
committed
"""
Return true if version satisfy regex, otherwise return false

Marek Chrastina
committed
"""
if re.compile(r'^(\d+) \. (\d+) (\. (\d+))? (\. (\d+))?$', re.VERBOSE).search(version) or \
re.compile(r'^(\d+) \. (\d+) (\. (\d+))? (rc(\d+))?$', re.VERBOSE).search(version):
return True
return False

Marek Chrastina
committed
def is_in_specifiers(version, specifiers):
Return true if version satisfy specifiers, otherwise return false
if not specifiers:
return True
elif version is None:
return True
else:
# https://github.com/pypa/packaging/pull/92
ver = packaging.version.LegacyVersion(version)
specifiers = [
packaging.specifiers.LegacySpecifier(s.strip()) for s in specifiers if s.strip()]
return all(s.contains(ver) for s in specifiers)
def is_in_conditions(condition):

Marek Chrastina
committed
"""
Return true if condition satisfy sys_platform and python_version and
platform_python_implementation, otherwise return false

Marek Chrastina
committed
"""
if not condition:
return True
# pylint: disable=eval-used
return eval(
condition.replace("sys_platform", '"%s"' % SYS_PLAT) \
.replace("python_version", '"%s"' % PY_VER) \
.replace("platform_python_implementation", '"%s"' % PLAT_PY_IMPL))

Marek Chrastina
committed
def is_in_extra(extra, req_extra):
Return true if extra satisfy, otherwise return false
if extra is None or extra in req_extra:
return True
return False
# pylint: disable=too-many-branches
def specifier_boarders(specifiers):
Return specifier boarders, equals and notequals
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
left = SBoarder([s for s in specifiers if s.operator in ['>', '>=']], None, None)
right = SBoarder([s for s in specifiers if s.operator in ['<', '<=']], None, None)
if left.boarders:
left = left._replace(extrem=sorted([s.version for s in left.boarders],
key=packaging.specifiers.LegacyVersion,
reverse=True)[0])
left = left._replace(extrem_op=[s.operator for s in left.boarders \
if s.version == left.extrem])
if '>' in left.extrem_op:
left = left._replace(extrem_op='>')
else:
left = left._replace(extrem_op='>=')
if right.boarders:
right = right._replace(extrem=sorted([s.version for s in right.boarders],
key=packaging.specifiers.LegacyVersion)[0])
right = right._replace(extrem_op=[s.operator for s in right.boarders \
if s.version == right.extrem])
if '<' in right.extrem_op:
right = right._replace(extrem_op='<')
else:
right = right._replace(extrem_op='<=')
if left.boarders and right.boarders:
if packaging.version.LegacyVersion(left.extrem) > \
packaging.version.LegacyVersion(right.extrem):
left, right = None, None
elif packaging.version.LegacyVersion(left.extrem) == \
packaging.version.LegacyVersion(right.extrem):
if left.extrem_op in ['>='] and right.extrem_op in ['<=']:
left = left._replace(extrem_op='==')
right = right._replace(boarders=None)
else:
left, right = None, None
equals = [s for s in specifiers if s.operator in ['==']]
if equals:
cmp_v = list(set([s.version for s in equals]))[0]
if all([packaging.version.LegacyVersion(cmp_v) == packaging.version.LegacyVersion(item) \
for item in list(set([s.version for s in equals]))]):
equals = cmp_v
else:
equals = None
notequals = [s for s in specifiers if s.operator in ['!=']]
notequals = list(set([s.version for s in notequals]))
return left, right, equals, notequals
def specifiers_intersection(specifiers):
"""
Return intersection of specifiers, otherwise return None
"""
if not specifiers:
return []
specifiers = [packaging.specifiers.LegacySpecifier(s.strip()) for s in specifiers if s.strip()]
left, right, equals, notequals = specifier_boarders(specifiers)
if (left is None and right is None) or equals is None:
boarders = []
for item in [left, right]:
if item.boarders:
boarders.append("%s%s" % (item.extrem_op, item.extrem))
if boarders and notequals:
for item in notequals:
if is_in_specifiers(item, boarders):
boarders.append("!=%s" % item)
elif not boarders and notequals:
for item in notequals:
boarders.append("!=%s" % item)
if boarders and equals:
if not is_in_specifiers(equals, boarders):
return None
boarders = ["==%s" % equals]
elif not boarders and equals:
boarders = ["==%s" % equals]
return boarders
def select_upkgs(data, rkey):
"""
Return data packages having requested key
"""

Marek Chrastina
committed
result = []
for pkg, pkg_data in data.iteritems():
if rkey in pkg_data.keys():
result.append(pkg)
return result
def print_table(data):
Print table upgradeable versions
"""
upkgs = select_upkgs(data, 'upgradeable_version')
if not upkgs:
print "There is nothing to upgrade."
return 0
tab_data = []
for pkg in sorted(upkgs):
tab_data.append([pkg, data[pkg]['installed_version'], data[pkg]['upgradeable_version']])
print tabulate.tabulate(tab_data,
['package', 'installed_version', 'upgradeable_version'])
return 1
def print_list(data):
"""
Print list upgradeable versions
"""
upkgs = select_upkgs(data, 'upgradeable_version')
if not upkgs:
print "There is nothing to upgrade."
return 0
list_data = []
for pkg in sorted(upkgs):
list_data.append("%s==%s" % (pkg, data[pkg]['upgradeable_version']))
print " ".join(list_data,)
def write_metadata(tmp_file):
"""
Write package metadata
"""
try:
tar_file = tarfile.open(tmp_file.name, 'r')
for member in tar_file.getmembers():
if 'requires.txt' in member.name:
with open('/tmp/requires.txt', 'w') as tmpf:
tmpf.write(tar_file.extractfile(member).read())
if 'PKG-INFO' in member.name:
with open('/tmp/PKG-INFO', 'w') as tmpf:
tmpf.write(tar_file.extractfile(member).read())
except tarfile.ReadError:
zip_file = zipfile.ZipFile(tmp_file.name, 'r')
for member in zip_file.namelist():
if 'requires.txt' in member:
with open('/tmp/requires.txt', 'w') as tmpf:
tmpf.write(zip_file.read(member))
if 'PKG-INFO' in member:
with open('/tmp/PKG-INFO', 'w') as tmpf:
tmpf.write(zip_file.read(member))
def get_metadata(package, version):
Return package metadata
metadata = None
for item in get_json("https://pypi.python.org/pypi/%s/%s/json" % (package, version,)) \
['releases'][version]:

Marek Chrastina
committed
if item['packagetype'] == 'sdist':
tmp_file = file_download(item['url'])
write_metadata(tmp_file)
if os.path.isfile('/tmp/PKG-INFO'):
metadata = [
line.decode('utf-8') \
for line in wheel.metadata.pkginfo_to_metadata('/tmp', '/tmp/PKG-INFO') \
.as_string().splitlines()]
try:
os.unlink('/tmp/requires.txt')
except OSError:
pass

Marek Chrastina
committed
try:
os.unlink('/tmp/PKG-INFO')
except OSError:
pass
os.unlink(tmp_file.name)
if metadata:

Marek Chrastina
committed
break
elif item['packagetype'] == 'bdist_wheel':
tmp_file = file_download(item['url'])
zip_file = zipfile.ZipFile(tmp_file.name, 'r')
for member in zip_file.namelist():
if 'METADATA' in member:
metadata = [line.decode('utf-8') for line in zip_file.read(member).splitlines()]
os.unlink(tmp_file.name)
break
return metadata
def parse_metadata(metadata, extra):
"""
Return dependencies parsed from metadata
"""
if metadata is None:
return None
for line in metadata:
if 'Metadata-Version' in line.decode('utf-8'):
metadata_version = line.replace('Metadata-Version:', '').strip()
break
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
arr = []
if metadata_version and \
packaging.version.Version(metadata_version) >= packaging.version.Version('2.0'):
arr = []
lines = [line.replace('Requires-Dist:', '').strip() \
for line in metadata if re.search(r'^Requires-Dist:', line)]
for line in lines:
data = pkginfo(str(line), req_extra=extra, repair=True)
if data:
arr.append(pkginfo(str(line), req_extra=extra, repair=True))
return arr
def get_pkg_data():
"""
Return package data
"""
packages_data = {}
# pylint: disable=protected-access
for pkg in pip._internal.utils.misc.get_installed_distributions():
pkg_name, pkg_ver, _pkg_extra = pkginfo(str(pkg))
rev = {'installed_version': pkg_ver,
'requires': [pkginfo(str(dep), repair=True) for dep in pkg.requires()]}
packages_data[pkg_name] = rev
packages_data = insert_extras(packages_data)
packages_data = insert_availables(packages_data)
packages_data = insert_news(packages_data)
while True:
new_packages_data = new_packages(packages_data)
if not new_packages_data:
break
new_packages_data = insert_availables(new_packages_data)
new_packages_data = insert_news(new_packages_data)
packages_data = merge_two_dicts(packages_data, new_packages_data)
check_new_extras(packages_data)
return packages_data
def pkginfo(data, req_extra=None, repair=False):
"""
Return parsed pkginfo
"""
extra_match = re.compile(
r"""^(?P<package>.*?)(;\s*(?P<condition>.*?)(extra == '(?P<extra>.*?)')?)$""").search(data)
if extra_match:
groupdict = extra_match.groupdict()
condition = groupdict['condition']
extra = groupdict['extra']
package = groupdict['package']
if condition.endswith(' and '):
condition = condition[:-5]
mysearch = re.compile(r'(extra == .*)').search(condition)
if mysearch:
extra = mysearch.group(1)
condition = condition.replace(extra, '')
if not condition:
condition = None
extra = re.compile(r'extra == (.*)').search(extra).group(1).replace('"', "")
else:
condition, extra = None, None
package = data
if not is_in_conditions(condition):
return None
pkg_name, pkg_extra, pkg_ver = re.compile(
r'([\w\.\-]*)(\[\w*\])?(.*)').search(package).groups()
if pkg_extra:
pkg_extra = pkg_extra.replace("[", "").replace("]", "").lower()
pkg_ver = pkg_ver.replace("(", "").replace(")", "").strip()
if not pkg_ver:
pkg_ver = []
else:
if repair:
try:
pkg_ver = re.compile(r'^(\d.*)$').search(pkg_ver).group(1)
except AttributeError:
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
pass
pkg_ver = pkg_ver.split(",")
if not is_in_extra(extra, req_extra):
return None
return (pkg_name.lower(), pkg_ver, pkg_extra)
def insert_extras(data):
"""
Insert extras
"""
for key in data.keys():
extra = []
for pkg, pkg_data in data.iteritems():
for dep in pkg_data['requires']:
if dep[0] == key:
if dep[2]:
extra.append(dep[2])
data[key]['extras'] = extra
if extra:
# pylint: disable=protected-access
for pkg in pip._internal.utils.misc.get_installed_distributions():
pkg_name, _pkg_ver, _pkg_extra = pkginfo(str(pkg))
if pkg_name == key:
data[key]['requires'] += [pkginfo(str(dep), repair=True, req_extra=extra) \
for dep in pkg.requires(extras=extra)]
return data
def insert_availables(data):
"""
Insert available versions
"""
for pkg, pkg_data in data.iteritems():
if 'available_version' in pkg_data.keys():
continue
try:
data[pkg]['available_version'] = get_available_vers(pkg)
except urllib2.HTTPError:
data[pkg]['available_version'] = []
return data
def get_available_vers(package):
"""
Return descending list of public available strict version
"""
versions = []
try:
data = get_json("https://pypi.python.org/pypi/%s/json" % (package))
except urllib2.HTTPError, err:
raise urllib2.HTTPError(err.url, err.code, None, err.hdrs, err.fp)
releases = data["releases"].keys()
for release in releases:

Marek Chrastina
committed
requires_python, python_version, packagetype = [], [], []
for item in data["releases"][release]:

Marek Chrastina
committed
python_version.append(item['python_version'])
packagetype.append(item['packagetype'])
if item['requires_python'] is not None:
for reqpyt in item['requires_python'].split(","):
requires_python.append(reqpyt.strip())
if requires_python:
requires_python = list(set(requires_python))

Marek Chrastina
committed
if len(packagetype) == 1 and packagetype[0] == 'bdist_wheel' and len(python_version) == 1:
pyt_ver = re.search(r"^py([0-9])", python_version[0])
if pyt_ver is not None and not is_in_specifiers(PY_VER, [">= %s" % pyt_ver.group(1)]):
continue
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
if is_version(release) and is_in_specifiers(PY_VER, requires_python):
versions.append(release)
return sorted(versions, key=packaging.specifiers.LegacyVersion, reverse=True)
def select_news(available_version, installed_version=None):
"""
Select versions newer than installed version, if it is known
"""
if installed_version is None:
return sorted(available_version, key=packaging.specifiers.LegacyVersion, reverse=True)
iver = packaging.version.Version(installed_version)
return sorted([aver for aver in available_version if packaging.version.Version(aver) > iver],
key=packaging.specifiers.LegacyVersion, reverse=True)
def insert_news(data):
"""
Insert new versions
"""
for pkg, pkg_data in data.iteritems():
if 'new_version' in pkg_data.keys():
continue
try:
new_version = select_news(pkg_data['available_version'], pkg_data['installed_version'])
except KeyError:
new_version = select_news(pkg_data['available_version'])
if new_version:
res = {}
for version in new_version:
content = parse_metadata(get_metadata(pkg, version), pkg_data['extras'])
if content is not None:
res[version] = content
if res:
pkg_data['new_version'] = res
return data
def new_packages(data):
"""
Return new packages as dictionary
"""
out = {}
arr = []
pkg_list = data.keys()
for pkg, pkg_data in data.iteritems():
try:
for dep in pkg_data['requires']:
if dep[0] not in pkg_list:
arr.append(dep)
for _ver, ver_data in pkg_data['new_version'].iteritems():
for dep in ver_data:
if dep[0] not in pkg_list:
arr.append(dep)
except KeyError:
pass
for item in list(set([i[0] for i in arr])):
extras = []
for pkg, _req, extra in arr:
if pkg == item and extra is not None:
extras.append(extra)
out[item] = {'extras': extras}
return out

Marek Chrastina
committed
def check_new_extras(data):

Marek Chrastina
committed
"""
Check if there are new extras

Marek Chrastina
committed
"""
extra_pkgs = []
pkg_list = data.keys()
for pkg, pkg_data in data.iteritems():
try:
for _ver, ver_data in pkg_data['new_version'].iteritems():
for dep in ver_data:
if dep[0] in pkg_list and dep[2] is not None:
extra_pkgs.append(dep)
except KeyError:
pass
for pkg, _req, extra in extra_pkgs:
if extra not in data[pkg]['extras']:
raise Exception('There are new extras!')
def check_extras(data):
"""
Check if there are extras in upgradeable packages
"""
for package in select_upkgs(data, 'upgradeable_version'):
if data[package]['extras']:
raise Exception('There are extras in upgradeable packages!')
def check_co_branches(data):
"""
Check if there branches with intersection of packages
"""
branches = get_branches(data)
co_branches = get_co_branches(branches)
if co_branches:
raise Exception('There are branches with intersection of packages!')

Marek Chrastina
committed
def pvector(package, data):

Marek Chrastina
committed
"""
Return vector of package versions

Marek Chrastina
committed
"""
out = []
if 'new_version' not in data[package].keys():
out.append((package, data[package]['installed_version']))
else:
if 'upgradeable_version' in data[package].keys():
out.append((package, data[package]['upgradeable_version']))

Marek Chrastina
committed
else:
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
if 'installed_version' in data[package].keys():
out.append((package, data[package]['installed_version']))
for ver in sorted(data[package]['new_version'].keys(),
key=packaging.specifiers.LegacyVersion):
if 'upgradeable_version' in data[package].keys():
if packaging.specifiers.LegacyVersion(ver) > \
packaging.specifiers.LegacyVersion(data[package]['upgradeable_version']):
out.append((package, ver))
else:
out.append((package, ver))
return out
def single_multi(data):
"""
Return list of packages with new versions and list of packages without new versions
"""
pkg_list, single, multi = [], [], []
for pkg, pkg_data in data.iteritems():
if 'requires' in pkg_data.keys():
pkg_list.append(pkg)
for pkg in pkg_list:
vec = pvector(pkg, data)
if len(vec) == 1:
single.append(*vec)
elif len(vec) > 1:
multi.append(vec)
single = list(set([item[0] for item in single]))
multi = list(set([item[0] for pkg_data in multi for item in pkg_data]))
return single, multi
def move_incompatible(data, to_delete):
"""
Move new version to incompatible
"""
if not to_delete:
return data
for package, version in to_delete:
if 'incompatible_version' not in data[package].keys():
data[package]['incompatible_version'] = {}
data[package]['incompatible_version'][version] = data[package]['new_version'][version]
del data[package]['new_version'][version]
if not data[package]['new_version']:
del data[package]['new_version']
return data
def get_compatible(versions, reqs, inverse=False):
"""
Return compatible versions
"""
specifiers = specifiers_intersection([i for i in itertools.chain(*[req[1] for req in reqs])])
if inverse:
v_versions = [version for version in versions if not is_in_specifiers(version, specifiers)]

Marek Chrastina
committed
else:
v_versions = [version for version in versions if is_in_specifiers(version, specifiers)]
return sorted(v_versions, key=packaging.specifiers.LegacyVersion, reverse=True)

Marek Chrastina
committed
def get_hards(data, package_no_news):
"""
Return requirements
"""
out = {}
deps = get_simple_reqs(data, None, package_no_news)
for item in list(set([pkg[0] for pkg in deps])):
reqs, extras = [], []
for pkg, req, extra in deps:
if pkg == item:
reqs += req
if extra:
extras += extra
if 'installed_version' in data[item].keys():
out[item] = {'installed_version': data[item]['installed_version'],
'requirements': list(set(reqs)),
'extras': list(set(extras))}
return out

Marek Chrastina
committed
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
def del_excls(data, excludes):
"""
Return list of packages and their versions that are excluded by argument
"""
to_delete = []
_package_no_news, package_with_news = single_multi(data)
package_not_installed = not_installed(data)
for exc in excludes:
try:
exc_pkg, exc_ver = exc[0], exc[1]
except IndexError:
exc_pkg, exc_ver = exc[0], None
if exc_pkg not in package_with_news+package_not_installed:
print "Warning! Excluded package {} has no upgrades. Ignoring".format(exc_pkg)
continue
vers = [ver for ver, _ver_data in data[exc_pkg]['new_version'].iteritems()]
if exc_ver is not None and exc_ver not in vers:
print "Warning! Excluded package {}=={} is not upgradable. Ignoring".format(exc_pkg,
exc_ver)
continue
if exc_ver is None:
for ver in vers:
to_delete.append((exc_pkg, ver))
else:
to_delete.append((exc_pkg, exc_ver))
return to_delete
def del_hards(data):
"""
Return list of packages and their versions that does not satisfy
requirements of packages without new version
"""
to_delete = []
package_no_news, package_with_news = single_multi(data)
hard_requirements = get_hards(data, package_no_news)
for pkg in package_with_news+not_installed(data):
for ver, ver_data in data[pkg]['new_version'].iteritems():
for dep, req, _extra in ver_data:
if dep in hard_requirements.keys():
if specifiers_intersection(req+hard_requirements[dep]['requirements']) is None:
to_delete.append((pkg, ver))
return to_delete
def del_no_news(data):
Return list of packages and their versions that does not satisfy packages without new version
to_delete = []
package_no_news, package_with_news = single_multi(data)
hard_requirements = get_hards(data, package_no_news)
for package in package_with_news+not_installed(data):
if package in hard_requirements.keys():
versions = [pkg[1] for pkg in pvector(package, data)]
specifiers = specifiers_intersection(hard_requirements[package]['requirements'])
for version in versions:
if not is_in_specifiers(version, specifiers):
to_delete.append((package, version))
return to_delete

Marek Chrastina
committed
def del_one_ver(data):

Marek Chrastina
committed
"""
If all packages requirements lead to one specific version, return list of that packages

Marek Chrastina
committed
"""
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
to_delete = []
_package_no_news, package_with_news = single_multi(data)
for package in package_with_news+not_installed(data):
reqs = get_reqs(data, package)
specifiers = specifiers_intersection(
[i for i in itertools.chain(*[req[1] for req in reqs])])
if specifiers:
if len(specifiers) == 1 and '==' in specifiers[0]:
versions = [pkg[1] for pkg in pvector(package, data)]
versions.remove(specifiers[0].replace('==', ''))
for version in versions:
to_delete.append((package, version))
return to_delete
def del_notinstalled(data):
"""
If no package requires notinstalled packages, return list of that packages
"""
to_delete = []
for package in not_installed(data):
reqs = get_reqs(data, package)
if not reqs:
for pkg, version in pvector(package, data):
to_delete.append((pkg, version))
return to_delete
def get_deps(data, package):
"""
Return package deep requirements
"""
try:
content = data[package]
except KeyError:
content = []
for pkg in content:
yield pkg
for child in get_deps(data, pkg):
yield child
def not_installed(data):
"""
Return not installed packages
"""
not_i = []
for pkg, pkg_data in data.iteritems():
if 'requires' not in pkg_data.keys() and 'new_version' in pkg_data.keys():
not_i.append(pkg)
return not_i
def get_no_news_req(data):
Return requirements of packages without new versions
reqs = {}
package_no_news, _package_with_news = single_multi(data)
for package in package_no_news:
version = pvector(package, data)[0][1]
reqs = save_version(reqs, data, package, version)
return reqs
def save_version(r_data, p_data, pkg, ver):
"""
Save the highest package version
"""
if 'installed_version' in p_data[pkg].keys() and p_data[pkg]['installed_version'] == ver:
r_data[pkg] = p_data[pkg]['requires']
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
r_data[pkg] = p_data[pkg]['new_version'][ver]
return r_data
def add_reqs(reqs, data, pkg=None, addpkg=None):
"""
Append requirements
"""
for dep, req, extra in data:
if pkg and dep != pkg:
continue
if addpkg:
reqs.append(addpkg)
else:
reqs.append((dep, req, extra))
def save_ic(out, package, incompatible=None, compatible=None):
"""
Save compatible/incompatible version
"""
if package not in out.keys():
out[package] = {'incompatible': [], 'compatible': None}
if incompatible:
out[package]['incompatible'].append(incompatible)
if compatible:
out[package]['compatible'] = compatible
return out
def get_reqs(data, package, data2=None, addpkg=False):
"""
Get requirements
"""
reqs = []
if data2:
for pkg in data2:
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
add_reqs(reqs, data[pkg]['new_version'][uver], pkg=package)
else:
add_reqs(reqs, data[pkg]['requires'], pkg=package)
return reqs
for pkg, pkg_data in data.iteritems():
uver = None
if pkg == package:
continue
if 'upgradeable_version' in pkg_data.keys():
uver = pkg_data['upgradeable_version']
if addpkg:
add_reqs(reqs, pkg_data['new_version'][uver], pkg=package, addpkg=pkg)
else:
add_reqs(reqs, pkg_data['new_version'][uver], pkg=package)
elif 'requires' in pkg_data.keys():
if addpkg:
add_reqs(reqs, pkg_data['requires'], pkg=package, addpkg=pkg)
else:
add_reqs(reqs, pkg_data['requires'], pkg=package)
if 'new_version' in pkg_data.keys():
for ver, ver_data in pkg_data['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= \
packaging.specifiers.LegacyVersion(uver):
continue
if addpkg:
add_reqs(reqs, ver_data, pkg=package, addpkg=pkg)
else:
add_reqs(reqs, ver_data, pkg=package)
return reqs
def get_reqs_dict(data):
"""
Get requirements
"""
out = {}
for pkg, pkg_data in data.iteritems():
reqs = []
uver = None
if 'upgradeable_version' in pkg_data.keys():
uver = pkg_data['upgradeable_version']
add_reqs(reqs, pkg_data['new_version'][uver])
elif 'requires' in pkg_data.keys():
add_reqs(reqs, pkg_data['requires'])
if 'new_version' in pkg_data.keys():
for ver, ver_data in pkg_data['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= \
packaging.specifiers.LegacyVersion(uver):
continue
add_reqs(reqs, ver_data)
out[pkg] = list(set([req[0] for req in reqs]))
return out
def get_simple_reqs(data, package_with_news, package_no_news):
"""
Return no_requires, only_no_news_requires or requirements
"""
if package_with_news is None:
reqs = []
for pkg in package_no_news:
if 'requires' in data[pkg].keys():
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
reqs += [req for req in data[pkg]['new_version'][uver] if req[1] or req[2]]
else:
reqs += [req for req in data[pkg]['requires'] if req[1] or req[2]]
return reqs
no_requires, only_no_news_requires, = {}, {}
for pkg in package_with_news:
dep_in_no_news_vers = []
dep_vers = []
uver = None
if 'upgradeable_version' in data[pkg].keys():
uver = data[pkg]['upgradeable_version']
if 'new_version' in data[pkg].keys():
for ver, ver_data in data[pkg]['new_version'].iteritems():
if uver:
if packaging.specifiers.LegacyVersion(ver) <= \
packaging.specifiers.LegacyVersion(uver):
continue
if ver_data:
reqs = [req[0] for req in ver_data]
reqs_not_in_no_news = [req for req in reqs if req not in package_no_news]
if not reqs_not_in_no_news:
dep_in_no_news_vers.append(ver)
else:
dep_vers.append(ver)
if dep_vers:
no_requires[pkg] = dep_vers
if dep_in_no_news_vers:
only_no_news_requires[pkg] = dep_in_no_news_vers
return no_requires, only_no_news_requires
def phase_one(data):
"""
Partial resolve upgrades
"""
out, no_requires_deps = {}, {}
package_no_news, package_with_news = single_multi(data)
no_requires, only_no_news_requires = get_simple_reqs(data, package_with_news, package_no_news)
for package, version in no_requires.iteritems():
reqs = get_reqs(data, package, addpkg=True)
if reqs:
no_requires_deps[package] = list(set(reqs))
else:
out = save_ic(out, package,
compatible=sorted(no_requires[package],
key=packaging.specifiers.LegacyVersion,
reverse=True)[0])
for package, dep in no_requires_deps.iteritems():
if all([pkg in package_no_news for pkg in dep]):
reqs = get_reqs(data, package, data2=dep)
compatible = get_compatible(no_requires[package], reqs)
for version in no_requires[package]:
if version not in compatible:
out = save_ic(out, package, incompatible=version)
if compatible:
out = save_ic(out, package, compatible=compatible[0])
for package, versions in only_no_news_requires.iteritems():
reqs = get_reqs(data, package, addpkg=True)
if all([item in package_no_news for item in list(set(reqs))]):
out = save_ic(out, package,
compatible=sorted(versions,
key=packaging.specifiers.LegacyVersion,
reverse=True)[0])
return out
def get_branches(data):
"""
Return branches
"""
branches = []
package_reqs = {}
_package_no_news, package_with_news = single_multi(data)
package_with_news = package_with_news+not_installed(data)
package_info = get_reqs_dict(data)
for package in package_with_news:
package_reqs[package] = list(set([i for i in get_deps(package_info, package)]))
for package in package_with_news:
res = []
for pkg, deps in package_reqs.iteritems():
if pkg == package:
continue
if package in deps:
res.append(pkg)
if not res:
branches.append(package)
package_info = {}
for branch in branches:
package_info[branch] = [i for i in package_reqs[branch] if i in package_with_news]
return package_info
def get_co_branches(branches):
"""
Return corelated branches
"""
co_branches = []
for branch in branches:
for pkg, reqs in branches.iteritems():
if pkg == branch:
continue
if len(branches[branch]+reqs) != len(list(set(branches[branch]+reqs))):
co_branches.append(branch)
return list(set(co_branches))
def cross_packages(data):
"""
Return cross packages
"""
cross_branches = []
out, pkg_reqs = {}, {}
package_branches = get_branches(data)