diff --git a/pipdeps/pipdeps.py b/pipdeps/pipdeps.py index 898a29bec2a138645192b35f5c6b9208272373a6..e21948140ddcdc0895a5bf79b66d9febf89f26be 100644 --- a/pipdeps/pipdeps.py +++ b/pipdeps/pipdeps.py @@ -9,6 +9,10 @@ import urllib2 import re import subprocess import sys +import tarfile +import tempfile +import zipfile + import tabulate import packaging.specifiers import packaging.version @@ -24,10 +28,10 @@ def arg_parse(): group = parser.add_mutually_exclusive_group(required=True) group.add_argument('-l', '--list', action='store_true', - help="show upgradable packages and versions") + help="show upgradeable packages and versions") group.add_argument('-u', '--upgrade', action='store_true', - help="upgrade upgradable packages") + help="upgrade upgradeable packages") return parser.parse_args() def get_pyver(): @@ -36,7 +40,7 @@ def get_pyver(): """ return ".".join(map(str, sys.version_info[:3])) -def check_strict_version(version): +def is_strict_version(version): """ Return true if version is strict, otherwise return false """ @@ -46,17 +50,17 @@ def check_strict_version(version): return False return True -def check_requires_python(pyver, requires_python): +def version_conform_specifiers(version, specifiers): """ - check if running python conforms version required by package + check if version conforms specifiers """ - if not requires_python: + if not specifiers: return True - elif pyver is None: + elif version is None: return True else: - ver = packaging.version.Version(pyver) - spec = packaging.specifiers.SpecifierSet(",".join(requires_python)) + ver = packaging.version.Version(version) + spec = packaging.specifiers.SpecifierSet(",".join(specifiers)) if spec.contains(ver): return True return False @@ -70,7 +74,7 @@ def upgrade_package(package, versions): stderr=subprocess.STDOUT ) -def get_package_list(): +def get_pip_list(): """ pip list """ @@ -80,6 +84,17 @@ def get_package_list(): ) return [line.split()[0] for line in outdated_packages.strip().split("\n")[2:]] +def file_download(url): + """ + Download file from url as temporary file + It returns file object + """ + tmp_file = tempfile.NamedTemporaryFile(delete=False) + rfile = urllib2.urlopen(url) + with tmp_file as output: + output.write(rfile.read()) + return tmp_file + def get_jsonpipdeptree(): """ pipdeptree --json-tree @@ -90,6 +105,12 @@ def get_jsonpipdeptree(): ) return json.loads(pipdeptree.strip()) +def get_json(url): + """ + Return url json + """ + return json.load(urllib2.urlopen(urllib2.Request(url))) + def json_search(jsonpipdeptree, package, key): """ find package dependencies in json tree @@ -106,77 +127,189 @@ def json_search(jsonpipdeptree, package, key): for item_val in json_search(item, package, key): yield item_val -def find_available_versions(package_name, pyver): +def get_highest_version(package, data): + """ + Return upgradeable version if possible, otherwise return installed version + """ + try: + version = data[package]['upgradeable_version'] + except KeyError: + version = data[package]['installed_version'] + return version + +def find_available_vers(package_name, pyver): """ Return descending list of available strict version """ versions = [] - url = "https://pypi.python.org/pypi/%s/json" % (package_name,) - data = json.load(urllib2.urlopen(urllib2.Request(url))) + data = get_json("https://pypi.python.org/pypi/%s/json" % (package_name,)) releases = data["releases"].keys() for release in releases: - requires_python = [item['requires_python'] for item in data["releases"][release] if item['requires_python'] is not None] # pylint: disable=line-too-long - if check_strict_version(release) and check_requires_python(pyver, requires_python): + requires_python = [] + for item in data["releases"][release]: + if item['requires_python'] is not None: + requires_python.append(item['requires_python']) + if is_strict_version(release) and version_conform_specifiers(pyver, requires_python): versions.append(release) return sorted(versions, key=distutils.version.StrictVersion, reverse=True) -def find_upgradable_version(available_version, installed_version, required_version): +def get_newer_vers(available_version, required_version, installed_version=None): """ - Return upgradable version, otherwise return none. + Return list of newer versions which conforms pipdeptree dependencies, otherwise return none. """ - if [rver for rver in required_version if re.search(r'(^==.*|^\d.*)', rver) is not None]: # pylint: disable=line-too-long + if [rver for rver in required_version if re.search(r'(^==.*|^\d.*)', rver) is not None]: return None - av_version = list(available_version) # copy list as new list + result = [] + av_version = list(available_version) while True: try: version = av_version.pop(0) except IndexError: break aver = packaging.version.Version(version) - iver = packaging.version.Version(installed_version) rver = packaging.specifiers.SpecifierSet(",".join(required_version)) if rver.contains(aver): - if aver == iver: - break - elif aver > iver: - return version + if installed_version is not None: + iver = packaging.version.Version(installed_version) + if aver == iver: + break + elif aver > iver: + result.append(version) + else: + result.append(version) + if result: + return sorted(result, key=distutils.version.StrictVersion, reverse=True) return None -def collect_packages(package_list, jsonpipdeptree, exclude=None, pyver=None): +def parse_requires_txt(package, version): """ - Collect data about packages and return as list of dictionaries + Return content of requires.txt until first [ appears """ - result = [] + content = None + release_data = get_json("https://pypi.python.org/pypi/%s/%s/json" % (package, version,)) + for item in release_data['releases'][version]: + if item['packagetype'] == 'sdist': + tmp_file = file_download(item['url']) + try: + tar_file = tarfile.open(tmp_file.name, 'r') + for member in tar_file.getmembers(): + if 'requires.txt' in member.name: + content = tar_file.extractfile(member) + except tarfile.ReadError: + zip_file = zipfile.ZipFile(tmp_file.name, 'r') + for member in zip_file.namelist(): + if 'requires.txt' in member: + content = zip_file.read(member) + if content is not None: + par = [] + for line in content: + if '[' in line: + break + else: + par.append(line.strip()) + content = "\n".join(par) + os.unlink(tmp_file.name) + return content + +def find_new_dependencies(package, version, package_list, pyver): + """ + Return package dependencies parsed from pypi json + """ + content = parse_requires_txt(package, version) + if content is not None: + for line in content.split("\n"): + try: + pkg = re.search(r'^([a-zA-Z0-9_.-]+)', line).group(0) + dep = line.replace(pkg, '').strip() + if not dep: + dep = None + if pkg in package_list: + yield (pkg, dep) + else: + for child in find_new_dependencies( + pkg, + get_newer_vers(find_available_vers(pkg, pyver), dep, None)[0], + package_list, + pyver + ): + yield child + except AttributeError: + pass + +def depless_vers(res): + """ + If there is no dependencies or versionless dependencies, return the upgradeable version, + otherwise return None + """ + depless = [] + for ver, deps in res.iteritems(): + if not deps: + depless.append(ver) + else: + if not [dep for dep in deps if dep[1] is not None]: + depless.append(ver) + if depless: + depless = sorted(depless, key=distutils.version.StrictVersion, reverse=True)[0] + else: + depless = None + return depless + +def collect_packages(package_list, jsonpipdeptree, pyver=None): + """ + Collect data about packages as dictionary + """ + result = {} for package in package_list: - if isinstance(exclude, list): - if package in exclude: - continue - dependencies = [_ for _ in json_search(jsonpipdeptree, package, 'required_version')] - dependencies = list(set(dependencies)) installed_version = "".join(list(set( - [_ for _ in json_search(jsonpipdeptree, package, 'installed_version')] - ))) - required_version = [dep for dep in dependencies if 'Any' not in dep] - available_version = find_available_versions(package, pyver) - upgradable_version = find_upgradable_version( - available_version, installed_version, required_version) - rev = {'package': package, - 'installed_version': installed_version, + [_ for _ in json_search(jsonpipdeptree, package, 'installed_version')]))) + required_version = [] + for dep in list(set( + [_ for _ in json_search(jsonpipdeptree, package, 'required_version')] + )): + if 'Any' not in dep: + required_version.append(dep) + available_version = find_available_vers(package, pyver) + newer_version = get_newer_vers(available_version, required_version, installed_version) + rev = {'installed_version': installed_version, 'required_version': required_version, 'available_version': available_version} - if upgradable_version is not None: - rev['upgradable_version'] = upgradable_version - result.append(rev) + if newer_version is not None: + res = {} + for version in newer_version: + res[version] = [ + _ for _ in find_new_dependencies(package, version, package_list, pyver)] + rev['newer_version'] = res + + depless = depless_vers(res) + if depless: + rev['upgradeable_version'] = depless + + result[package] = rev return result -def filter_upgradable(packages_data): +def check_deps(deps, packages_data): """ - Return only upgradable packages + Return true, if all package dependencies conforms """ - result = [] - for package in packages_data: - if 'upgradable_version' in package.keys(): - result.append(package) + ndeps = [] + for item in deps: + if item[1] is not None: + ndeps.append( + version_conform_specifiers( + get_highest_version(item[0], packages_data), + packages_data[item[0]]['required_version']+[item[1]] + ) + ) + return all(ndeps) + +def select_pkgs(packages_data, rkey): + """ + Return data packages having requested key + """ + result = {} + for pkg, pkg_data in packages_data.iteritems(): + if rkey in pkg_data.keys(): + result[pkg] = pkg_data return result def main(): @@ -186,32 +319,45 @@ def main(): os.environ["PYTHONWARNINGS"] = "ignore:DEPRECATION" arguments = arg_parse() pyver = get_pyver() - pkglist = get_package_list() - while True: - jsonpipdeptree = get_jsonpipdeptree() - packages_data = collect_packages(pkglist, jsonpipdeptree, pyver=pyver) - upgradable_packages = filter_upgradable(packages_data) - if arguments.list: - if upgradable_packages: - data = [[pkg['package'], pkg['installed_version'], pkg['upgradable_version']] for pkg in upgradable_packages] # pylint: disable=line-too-long - header = ['package', 'installed_version', 'upgradable_version'] - print tabulate.tabulate(data, header) - sys.exit(1) - else: - print "There is nothing to upgrade." - sys.exit(0) + pkglist = get_pip_list() + jsonpipdeptree = get_jsonpipdeptree() + packages_data = collect_packages(pkglist, jsonpipdeptree, pyver=pyver) + for pkg, pkg_data in sorted( + select_pkgs(packages_data, 'newer_version').iteritems(), key=lambda x: x[0].lower() + ): + pkg_keys = pkg_data.keys() + if 'newer_version' in pkg_keys and 'upgradeable_version' not in pkg_keys: + for ver, deps in sorted( + pkg_data['newer_version'].iteritems(), + key=lambda x: distutils.version.StrictVersion(x[0]), + reverse=True + ): + ndeps = check_deps(deps, packages_data) + if ndeps: + packages_data[pkg]['upgradeable_version'] = ver + break + upgradeable_pkgs = select_pkgs(packages_data, 'upgradeable_version') - for index, pkg in enumerate(upgradable_packages): - if pkg['package'] == 'pip': - package = upgradable_packages.pop(index) - upgrade_package(package['package'], package['upgradable_version']) - - try: - package = upgradable_packages.pop(-1) - except IndexError: - break - upgrade_package(package['package'], package['upgradable_version']) - print "Done." + if arguments.list: + if upgradeable_pkgs: + data = [] + for pkg, pkg_data in sorted(upgradeable_pkgs.iteritems(), key=lambda x: x[0].lower()): + data.append([pkg, pkg_data['installed_version'], pkg_data['upgradeable_version']]) + print tabulate.tabulate( + data, + ['package', 'installed_version', 'upgradeable_version'] + ) + sys.exit(1) + else: + print "There is nothing to upgrade." + sys.exit(0) + if arguments.upgrade: + if 'pip' in upgradeable_pkgs.keys(): + upgrade_package('pip', upgradeable_pkgs['pip']['upgradeable_version']) + del upgradeable_pkgs['pip'] + for pkg, pkg_data in sorted(upgradeable_pkgs.iteritems(), key=lambda x: x[0].lower()): + upgrade_package(pkg, pkg_data['upgradeable_version']) + print "Done." if __name__ == "__main__": main()