Commit 903bad8a authored by Campbell Barton's avatar Campbell Barton
Browse files

code_clean: use type hints

'mypy --strict' runs without errors, exposed 3x bugs.
parent c357d5b7
......@@ -27,11 +27,27 @@ Note: currently this is limited to paths in "source/" and "intern/",
we could change this if it's needed.
"""
import argparse
import re
import subprocess
import sys
import os
from typing import (
Any,
Callable,
Dict,
Generator,
List,
Optional,
Sequence,
Tuple,
Type,
)
# List of (source_file, all_arguments)
ProcessedCommands = List[Tuple[str, str]]
USE_MULTIPROCESS = True
VERBOSE = False
......@@ -56,12 +72,12 @@ SOURCE_DIR = os.path.normpath(os.path.join(BASE_DIR, "..", "..", ".."))
# General Utilities
# Note that we could use a hash, however there is no advantage, compare it's contents.
def file_as_bytes(filename):
def file_as_bytes(filename: str) -> bytes:
with open(filename, 'rb') as fh:
return fh.read()
def line_from_span(text, start, end):
def line_from_span(text: str, start: int, end: int) -> str:
while start > 0 and text[start - 1] != '\n':
start -= 1
while end < len(text) and text[end] != '\n':
......@@ -69,7 +85,7 @@ def line_from_span(text, start, end):
return text[start:end]
def files_recursive_with_ext(path, ext):
def files_recursive_with_ext(path: str, ext: Tuple[str, ...]) -> Generator[str, None, None]:
for dirpath, dirnames, filenames in os.walk(path):
# skip '.git' and other dot-files.
dirnames[:] = [d for d in dirnames if not d.startswith(".")]
......@@ -81,9 +97,9 @@ def files_recursive_with_ext(path, ext):
# -----------------------------------------------------------------------------
# Execution Wrappers
def run(args, *, quiet):
def run(args: str, *, quiet: bool) -> int:
if VERBOSE_COMPILER and not quiet:
out = sys.stdout
out = sys.stdout.fileno()
else:
out = subprocess.DEVNULL
......@@ -96,7 +112,7 @@ def run(args, *, quiet):
# -----------------------------------------------------------------------------
# Build System Access
def cmake_cache_var(cmake_dir, var):
def cmake_cache_var(cmake_dir: str, var: str) -> Optional[str]:
with open(os.path.join(cmake_dir, "CMakeCache.txt"), encoding='utf-8') as cache_file:
lines = [
l_strip for l in cache_file
......@@ -113,9 +129,16 @@ def cmake_cache_var(cmake_dir, var):
RE_CFILE_SEARCH = re.compile(r"\s\-c\s([\S]+)")
def process_commands(cmake_dir, data):
def process_commands(cmake_dir: str, data: Sequence[str]) -> Optional[ProcessedCommands]:
compiler_c = cmake_cache_var(cmake_dir, "CMAKE_C_COMPILER")
compiler_cxx = cmake_cache_var(cmake_dir, "CMAKE_CXX_COMPILER")
if compiler_c is None:
sys.stderr.write("Can't find C compiler in %r" % cmake_dir)
return None
if compiler_cxx is None:
sys.stderr.write("Can't find C++ compiler in %r" % cmake_dir)
return None
file_args = []
for l in data:
......@@ -138,7 +161,7 @@ def process_commands(cmake_dir, data):
return file_args
def find_build_args_ninja(build_dir):
def find_build_args_ninja(build_dir: str) -> Optional[ProcessedCommands]:
import time
cmake_dir = build_dir
make_exe = "ninja"
......@@ -149,6 +172,7 @@ def find_build_args_ninja(build_dir):
)
while process.poll():
time.sleep(1)
assert(process.stdout is not None)
out = process.stdout.read()
process.stdout.close()
......@@ -157,7 +181,7 @@ def find_build_args_ninja(build_dir):
return process_commands(cmake_dir, data)
def find_build_args_make(build_dir):
def find_build_args_make(build_dir: str) -> Optional[ProcessedCommands]:
import time
make_exe = "make"
process = subprocess.Popen(
......@@ -167,6 +191,7 @@ def find_build_args_make(build_dir):
)
while process.poll():
time.sleep(1)
assert(process.stdout is not None)
out = process.stdout.read()
process.stdout.close()
......@@ -212,15 +237,20 @@ del namedtuple
class EditGenerator:
__slots__ = ()
def __new__(cls, *args, **kwargs):
def __new__(cls, *args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
raise RuntimeError("%s should not be instantiated" % cls)
@staticmethod
def setup():
def edit_list_from_file(_source: str, _data: str, _shared_edit_data: Any) -> List[Edit]:
raise RuntimeError("This function must be overridden by it's subclass!")
return []
@staticmethod
def setup() -> Any:
return None
@staticmethod
def teardown(_shared_edit_data):
def teardown(_shared_edit_data: Any) -> None:
pass
......@@ -237,7 +267,7 @@ class edit_generators:
sizeof(float[4][4])
"""
@staticmethod
def edit_list_from_file(_source, data, _shared_edit_data):
def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
for match in re.finditer(r"sizeof\(([a-zA-Z_]+)\) \* (\d+) \* (\d+)", data):
......@@ -277,7 +307,7 @@ class edit_generators:
const float abc[3]
"""
@staticmethod
def edit_list_from_file(_source, data, _shared_edit_data):
def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
# `float abc[3] = {0, 1, 2};` -> `const float abc[3] = {0, 1, 2};`
......@@ -313,7 +343,7 @@ class edit_generators:
1.0f
"""
@staticmethod
def edit_list_from_file(_source, data, _shared_edit_data):
def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
# `1.f` -> `1.0f`
......@@ -346,7 +376,7 @@ class edit_generators:
(!ELEM(a, b, c))
"""
@staticmethod
def edit_list_from_file(_source, data, _shared_edit_data):
def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
test_equal = (
......@@ -416,30 +446,29 @@ class edit_generators:
(STR_ELEM(a, b, c))
"""
@staticmethod
def edit_list_from_file(_source, data, _shared_edit_data):
def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
test_equal = (
r'[\(]*'
r'STREQ'
'\('
'([^\|\(\),]+)' # group 1 (no (|,))
',\s+'
'([^\|\(\),]+)' # group 2 (no (|,))
'\)'
r'\('
r'([^\|\(\),]+)' # group 1 (no (|,))
r',\s+'
r'([^\|\(\),]+)' # group 2 (no (|,))
r'\)'
r'[\)]*'
)
test_not_equal = (
r'[\(]*'
'!' # Only difference.
'!' # Only difference.
r'STREQ'
'\('
'([^\|\(\),]+)' # group 1 (no (|,))
',\s+'
'([^\|\(\),]+)' # group 2 (no (|,))
'\)'
r'\('
r'([^\|\(\),]+)' # group 1 (no (|,))
r',\s+'
r'([^\|\(\),]+)' # group 2 (no (|,))
r'\)'
r'[\)]*'
)
......@@ -486,7 +515,6 @@ class edit_generators:
return edits
class use_const_vars(EditGenerator):
"""
Use `const` where possible:
......@@ -497,7 +525,7 @@ class edit_generators:
const float abc[3] = {0, 1, 2};
"""
@staticmethod
def edit_list_from_file(_source, data, _shared_edit_data):
def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
# for match in re.finditer(r"( [a-zA-Z0-9_]+ [a-zA-Z0-9_]+ = [A-Z][A-Z_0-9_]*;)", data):
......@@ -516,7 +544,6 @@ class edit_generators:
return edits
class remove_return_parens(EditGenerator):
"""
Remove redundant parenthesis around return arguments:
......@@ -527,7 +554,7 @@ class edit_generators:
return value;
"""
@staticmethod
def edit_list_from_file(_source, data, _shared_edit_data):
def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
# Remove `return (NULL);`
......@@ -554,7 +581,7 @@ class edit_generators:
!STREQ(a, b)
"""
@staticmethod
def edit_list_from_file(_source, data, _shared_edit_data):
def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
# `strcmp(a, b) == 0` -> `STREQ(a, b)`
......@@ -597,7 +624,7 @@ class edit_generators:
ARRAY_SIZE(foo)
"""
@staticmethod
def edit_list_from_file(_source, data, _shared_edit_data):
def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
# Note that this replacement is only valid in some cases,
# so only apply with validation that binary output matches.
......@@ -619,15 +646,14 @@ class edit_generators:
"""
@staticmethod
def _header_guard_from_filename(f):
def _header_guard_from_filename(f: str) -> str:
return '__%s__' % os.path.basename(f).replace('.', '_').upper()
@classmethod
def setup(cls):
def setup(cls) -> Any:
# For each file replace `pragma once` with old-style header guard.
# This is needed so we can remove the header with the knowledge the source file didn't use it indirectly.
files = []
files: List[Tuple[str, str, str, str]] = []
shared_edit_data = {
'files': files,
}
......@@ -654,7 +680,7 @@ class edit_generators:
return shared_edit_data
@staticmethod
def teardown(shared_edit_data):
def teardown(shared_edit_data: Any) -> None:
files = shared_edit_data['files']
for f, src, dst, dst_footer in files:
with open(f, 'r', encoding='utf-8') as fh:
......@@ -669,7 +695,7 @@ class edit_generators:
fh.write(data)
@classmethod
def edit_list_from_file(cls, source, data, _shared_edit_data):
def edit_list_from_file(cls, source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
edits = []
# Remove include.
......@@ -678,7 +704,7 @@ class edit_generators:
header_guard = cls._header_guard_from_filename(header_name)
edits.append(Edit(
span=match.span(),
content='', # Remove the header.
content='', # Remove the header.
content_fail='%s__ALWAYS_FAIL__%s' % (match.group(2), match.group(4)),
extra_build_args=('-D' + header_guard),
))
......@@ -686,7 +712,16 @@ class edit_generators:
return edits
def test_edit(source, output, output_bytes, build_args, data, data_test, keep_edits=True, expect_failure=False):
def test_edit(
source: str,
output: str,
output_bytes: Optional[bytes],
build_args: str,
data: str,
data_test: str,
keep_edits: bool = True,
expect_failure: bool = False,
) -> bool:
"""
Return true if `data_test` has the same object output as `data`.
"""
......@@ -720,7 +755,7 @@ def test_edit(source, output, output_bytes, build_args, data, data_test, keep_ed
# -----------------------------------------------------------------------------
# List Fix Functions
def edit_function_get_all():
def edit_function_get_all() -> List[str]:
fixes = []
for name in dir(edit_generators):
value = getattr(edit_generators, name)
......@@ -730,14 +765,17 @@ def edit_function_get_all():
return fixes
def edit_class_from_id(name):
return getattr(edit_generators, name)
def edit_class_from_id(name: str) -> Type[EditGenerator]:
result = getattr(edit_generators, name)
assert(issubclass(result, EditGenerator))
# MYPY 0.812 doesn't recognize the assert above.
return result # type: ignore
# -----------------------------------------------------------------------------
# Accept / Reject Edits
def apply_edit(data, text_to_replace, start, end, *, verbose):
def apply_edit(data: str, text_to_replace: str, start: int, end: int, *, verbose: bool) -> str:
if verbose:
line_before = line_from_span(data, start, end)
......@@ -755,7 +793,7 @@ def apply_edit(data, text_to_replace, start, end, *, verbose):
return data
def wash_source_with_edits(arg_group):
def wash_source_with_edits(arg_group: Tuple[str, str, str, str, bool, Any]) -> None:
(source, output, build_args, edit_to_apply, skip_test, shared_edit_data) = arg_group
# build_args = build_args + " -Werror=duplicate-decl-specifier"
with open(source, 'r', encoding='utf-8') as fh:
......@@ -814,7 +852,12 @@ def wash_source_with_edits(arg_group):
# -----------------------------------------------------------------------------
# Edit Source Code From Args
def run_edits_on_directory(build_dir, regex_list, edit_to_apply, skip_test=False):
def run_edits_on_directory(
build_dir: str,
regex_list: List[re.Pattern[str]],
edit_to_apply: str,
skip_test: bool = False,
) -> int:
# currently only supports ninja or makefiles
build_file_ninja = os.path.join(build_dir, "build.ninja")
build_file_make = os.path.join(build_dir, "Makefile")
......@@ -830,6 +873,11 @@ def run_edits_on_directory(build_dir, regex_list, edit_to_apply, skip_test=False
(build_file_ninja, build_file_make)
)
return 1
if args is None:
# Error will have been reported.
return 1
# needed for when arguments are referenced relatively
os.chdir(build_dir)
......@@ -841,13 +889,13 @@ def run_edits_on_directory(build_dir, regex_list, edit_to_apply, skip_test=False
os.path.join("source"),
)
def output_from_build_args(build_args):
def output_from_build_args(build_args: str) -> str:
import shlex
build_args = shlex.split(build_args)
i = build_args.index("-o")
return build_args[i + 1]
build_args_split = shlex.split(build_args)
i = build_args_split.index("-o")
return build_args_split[i + 1]
def test_path(c):
def test_path(c: str) -> bool:
# Skip any generated source files (files in the build directory).
if os.path.abspath(c).startswith(build_dir):
return False
......@@ -888,14 +936,15 @@ def run_edits_on_directory(build_dir, regex_list, edit_to_apply, skip_test=False
try:
if USE_MULTIPROCESS:
args = [
args_expanded = [
(c, output_from_build_args(build_args), build_args, edit_to_apply, skip_test, shared_edit_data)
for (c, build_args) in args
]
import multiprocessing
job_total = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=job_total * 2)
pool.map(wash_source_with_edits, args)
pool.map(wash_source_with_edits, args_expanded)
del args_expanded
else:
# now we have commands
for i, (c, build_args) in enumerate(args):
......@@ -911,9 +960,8 @@ def run_edits_on_directory(build_dir, regex_list, edit_to_apply, skip_test=False
return 0
def create_parser():
def create_parser() -> argparse.ArgumentParser:
from textwrap import indent, dedent
import argparse
edits_all = edit_function_get_all()
......@@ -926,7 +974,6 @@ def create_parser():
indent(dedent(getattr(edit_generators, edit).__doc__ or '').strip('\n') + '\n', ' '),
)
)
edits_all_docs = "\n".join(edits_all_docs)
parser = argparse.ArgumentParser(
description=__doc__,
......@@ -947,7 +994,7 @@ def create_parser():
"--edit",
dest="edit",
choices=edits_all,
help="Specify the edit preset to run.\n\n" + edits_all_docs + "\n",
help="Specify the edit preset to run.\n\n" + "\n".join(edits_all_docs) + "\n",
required=True,
)
parser.add_argument(
......@@ -965,7 +1012,7 @@ def create_parser():
return parser
def main():
def main() -> int:
parser = create_parser()
args = parser.parse_args()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment