code_clean.py — 51 KiB excerpt captured from a GitLab file view.
(Navigation chrome and blame annotations in this capture are scrape artifacts, not part of the source.)
  •         def setup(cls) -> Any:
    
    Campbell Barton's avatar
    Campbell Barton committed
                # For each file replace `pragma once` with old-style header guard.
    
                # This is needed so we can remove the header with the knowledge the source file didn't use it indirectly.
    
                files: List[Tuple[str, str, str, str]] = []
    
                shared_edit_data = {
                    'files': files,
                }
                for f in files_recursive_with_ext(
                        os.path.join(SOURCE_DIR, 'source'),
                        ('.h', '.hh', '.inl', '.hpp', '.hxx'),
                ):
                    with open(f, 'r', encoding='utf-8') as fh:
                        data = fh.read()
    
                    for match in re.finditer(r'^[ \t]*#\s*(pragma\s+once)\b', data, flags=re.MULTILINE):
                        header_guard = cls._header_guard_from_filename(f)
                        start, end = match.span()
                        src = data[start:end]
                        dst = (
                            '#ifndef %s\n#define %s' % (header_guard, header_guard)
                        )
                        dst_footer = '\n#endif /* %s */\n' % header_guard
                        files.append((f, src, dst, dst_footer))
                        data = data[:start] + dst + data[end:] + dst_footer
                        with open(f, 'w', encoding='utf-8') as fh:
                            fh.write(data)
                        break
                return shared_edit_data
    
            @staticmethod
    
            def teardown(shared_edit_data: Any) -> None:
    
                files = shared_edit_data['files']
                for f, src, dst, dst_footer in files:
                    with open(f, 'r', encoding='utf-8') as fh:
                        data = fh.read()
    
                    data = data.replace(
                        dst, src,
                    ).replace(
                        dst_footer, '',
                    )
                    with open(f, 'w', encoding='utf-8') as fh:
                        fh.write(data)
    
            @classmethod
    
            def edit_list_from_file(cls, _source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
    
                edits = []
    
                # Remove include.
                for match in re.finditer(r"^(([ \t]*#\s*include\s+\")([^\"]+)(\"[^\n]*\n))", data, flags=re.MULTILINE):
                    header_name = match.group(3)
                    header_guard = cls._header_guard_from_filename(header_name)
                    edits.append(Edit(
                        span=match.span(),
    
                        content='',  # Remove the header.
    
                        content_fail='%s__ALWAYS_FAIL__%s' % (match.group(2), match.group(4)),
    
                        extra_build_args=('-D' + header_guard, ),
    
        class use_function_style_cast(EditGenerator):
            """
            Use function call style casts (C++ only).
    
            Replace:
              (float)(a + b)
            With:
              float(a + b)
    
            Also support more complex cases involving right hand bracket insertion.
    
            Replace:
              (float)foo(a + b)
            With:
              float(foo(a + b))
            """
            @staticmethod
            def edit_list_from_file(_source: str, data: str, _shared_edit_data: Any) -> List[Edit]:
    
                any_number_re = "(" + "|".join(BUILT_IN_NUMERIC_TYPES) + ")"
    
                edits = []
    
                # Handle both:
                # - Simple case:  `(float)(a + b)` -> `float(a + b)`.
                # - Complex Case: `(float)foo(a + b) + c` -> `float(foo(a + b)) + c`
                for match in re.finditer(
                        "(\\()" +  # 1st group.
                        any_number_re +  # 2nd group.
                        "(\\))",  # 3rd group.
    
                    # This could be ignored, but `sizeof` accounts for such a large number
                    # of cases that should be left as-is, that it's best to explicitly ignore them.
                    if (
                        (beg > 6) and
                        (data[beg - 6: beg] == 'sizeof') and
                        (not data[beg - 7].isalpha())
                    ):
                        continue
    
    
                    char_after = data[end]
                    if char_after == "(":
                        # Simple case.
                        edits.append(Edit(
                            span=(beg, end),
                            content=match.group(2),
                            content_fail='__ALWAYS_FAIL__',
                        ))
                    else:
                        # The complex case is involved as brackets need to be added.
                        # Currently this is not handled in a clever way, just try add in brackets
                        # and rely on matching build output to know if they were added in the right place.
                        text = match.group(2)
                        span = (beg, end)
                        for offset_end in range(end + 1, len(data)):
                            # Not technically correct, but it's rare that this will span lines.
                            if "\n" == data[offset_end]:
                                break
    
                            if (
                                    (data[offset_end - 1] in IDENTIFIER_CHARS) and
                                    (data[offset_end] in IDENTIFIER_CHARS)
                            ):
                                continue
    
                            # Include `text_tail` in fail content in case it contains comments.
                            text_tail = "(" + data[end:offset_end] + ")"
                            edits.append(Edit(
                                span=(beg, offset_end),
                                content=text + text_tail,
                                content_fail='(__ALWAYS_FAIL__)' + text_tail,
                            ))
    
                # Simple case: `static_cast<float>(a + b)` => `float(a + b)`.
                for match in re.finditer(
                        r"\b(static_cast<)" +  # 1st group.
                        any_number_re +  # 2nd group.
                        "(>)",  # 3rd group.
    
                    edits.append(Edit(
                        span=match.span(),
                        content='%s' % match.group(2),
                        content_fail='__ALWAYS_FAIL__',
                    ))
    
                return edits
    
    
    def test_edit(
            source: str,
            output: str,
            output_bytes: Optional[bytes],
    
            build_args: Sequence[str],
            build_cwd: Optional[str],
    
            data: str,
            data_test: str,
            keep_edits: bool = True,
            expect_failure: bool = False,
    ) -> bool:
    
        """
        Return true if `data_test` has the same object output as `data`.
        """
        if os.path.exists(output):
            os.remove(output)
    
        with open(source, 'w', encoding='utf-8') as fh:
            fh.write(data_test)
    
    
        ret = run(build_args, cwd=build_cwd, quiet=expect_failure)
    
        if ret == 0:
            output_bytes_test = file_as_bytes(output)
            if (output_bytes is None) or (file_as_bytes(output) == output_bytes):
                if not keep_edits:
                    with open(source, 'w', encoding='utf-8') as fh:
                        fh.write(data)
                return True
            else:
    
                if VERBOSE_EDIT_ACTION:
                    print("Changed code, skip...", hex(hash(output_bytes)), hex(hash(output_bytes_test)))
    
        else:
            if not expect_failure:
    
                if VERBOSE_EDIT_ACTION:
                    print("Failed to compile, skip...")
    
    
        with open(source, 'w', encoding='utf-8') as fh:
            fh.write(data)
        return False
    
    
    # -----------------------------------------------------------------------------
    # List Fix Functions
    
    
    def edit_function_get_all() -> List[str]:
    
        fixes = []
    
        for name in dir(edit_generators):
            value = getattr(edit_generators, name)
            if type(value) is type and issubclass(value, EditGenerator):
                fixes.append(name)
    
        fixes.sort()
        return fixes
    
    
    
    def edit_class_from_id(name: str) -> Type[EditGenerator]:
        result = getattr(edit_generators, name)
    
        assert issubclass(result, EditGenerator)
    
        # MYPY 0.812 doesn't recognize the assert above.
        return result  # type: ignore
    
    
    
    # -----------------------------------------------------------------------------
    # Accept / Reject Edits
    
    
    def apply_edit(data: str, text_to_replace: str, start: int, end: int, *, verbose: bool) -> str:
    
        if verbose:
            line_before = line_from_span(data, start, end)
    
        data = data[:start] + text_to_replace + data[end:]
    
        if verbose:
            end += len(text_to_replace) - (end - start)
            line_after = line_from_span(data, start, end)
    
            print("")
            print("Testing edit:")
            print(line_before)
            print(line_after)
    
        return data
    
    
    
    def wash_source_with_edits(
            source: str,
            output: str,
            build_args: Sequence[str],
            build_cwd: Optional[str],
            edit_to_apply: str,
            skip_test: bool,
            shared_edit_data: Any,
    ) -> None:
    
        # build_args = build_args + " -Werror=duplicate-decl-specifier"
        with open(source, 'r', encoding='utf-8') as fh:
            data = fh.read()
    
        edit_generator_class = edit_class_from_id(edit_to_apply)
    
        # After performing all edits, store the result in this set.
        #
        # This is a heavy solution that guarantees edits never oscillate between
        # multiple states, so re-visiting a previously visited state will always exit.
    
    
        # When overlapping edits are found, keep attempting edits.
        edit_again = True
        while edit_again:
            edit_again = False
    
            edits = edit_generator_class.edit_list_from_file(source, data, shared_edit_data)
            # Sort by span, in a way that tries shorter spans first
            # This is more efficient when testing multiple overlapping edits,
            # since when a smaller edit succeeds, it's less likely to have to try as many edits that span wider ranges.
            # (This applies to `use_function_style_cast`).
            edits.sort(reverse=True, key=lambda edit: (edit.span[0], -edit.span[1]))
            if not edits:
                return
    
            if skip_test:
                # Just apply all edits.
                for (start, end), text, _text_always_fail, _extra_build_args in edits:
                    data = apply_edit(data, text, start, end, verbose=VERBOSE)
    
                with open(source, 'w', encoding='utf-8') as fh:
                    fh.write(data)
    
                source, output, None, build_args, build_cwd, data, data,
    
                keep_edits=False,
            )
            if not os.path.exists(output):
    
                # raise Exception("Failed to produce output file: " + output)
    
                # NOTE(@campbellbarton): This fails very occasionally and needs to be investigated why.
                # For now skip, as it's disruptive to force-quit in the middle of all other changes.
    
                print("Failed to produce output file, skipping:", repr(output))
    
    
            output_bytes = file_as_bytes(output)
            # Dummy value that won't cause problems.
            edit_prev_start = len(data) + 1
    
            for (start, end), text, text_always_fail, extra_build_args in edits:
                if end >= edit_prev_start:
                    # Run the edits again, in case this would have succeeded,
                    # but was skipped due to edit-overlap.
                    edit_again = True
                    continue
                build_args_for_edit = build_args
                if extra_build_args:
                    # Add directly after the compile command.
    
                    build_args_for_edit = build_args[:1] + extra_build_args + build_args[1:]
    
    
                data_test = apply_edit(data, text, start, end, verbose=VERBOSE)
                if test_edit(
    
                        source, output, output_bytes, build_args_for_edit, build_cwd, data, data_test,
    
                        keep_edits=False,
                ):
                    # This worked, check if the change would fail if replaced with 'text_always_fail'.
                    data_test_always_fail = apply_edit(data, text_always_fail, start, end, verbose=False)
                    if test_edit(
    
                            source, output, output_bytes, build_args_for_edit, build_cwd, data, data_test_always_fail,
    
                            expect_failure=True, keep_edits=False,
                    ):
                        if VERBOSE_EDIT_ACTION:
                            print("Edit at", (start, end), "doesn't fail, assumed to be ifdef'd out, continuing")
                        continue
    
                    # Apply the edit.
                    data = data_test
                    with open(source, 'w', encoding='utf-8') as fh:
                        fh.write(data)
    
                    # Update the last successful edit, the end of the next edit must not overlap this one.
                    edit_prev_start = start
    
            # Finished applying `edits`, check if further edits should be applied.
            if edit_again:
                data_states_len = len(data_states)
                data_states.add(data)
                if data_states_len == len(data_states):
                    # Avoid the *extremely* unlikely case that edits re-visit previously visited states.
                    edit_again = False
                else:
                    # It is interesting to know how many passes run when debugging.
                    # print("Passes for: ", source, len(data_states))
                    pass
    
    
    # -----------------------------------------------------------------------------
    # Edit Source Code From Args
    
    
    def run_edits_on_directory(
            build_dir: str,
            regex_list: List[re.Pattern[str]],
            edit_to_apply: str,
            skip_test: bool = False,
    ) -> int:
    
        # currently only supports ninja or makefiles
        build_file_ninja = os.path.join(build_dir, "build.ninja")
        build_file_make = os.path.join(build_dir, "Makefile")
        if os.path.exists(build_file_ninja):
            print("Using Ninja")
            args = find_build_args_ninja(build_dir)
        elif os.path.exists(build_file_make):
            print("Using Make")
            args = find_build_args_make(build_dir)
        else:
            sys.stderr.write(
                "Can't find Ninja or Makefile (%r or %r), aborting" %
                (build_file_ninja, build_file_make)
            )
    
    
        if args is None:
            # Error will have been reported.
            return 1
    
    
        # needed for when arguments are referenced relatively
        os.chdir(build_dir)
    
        # Weak, but we probably don't want to handle extern.
        # this limit could be removed.
        source_paths = (
            os.path.join("intern", "ghost"),
            os.path.join("intern", "guardedalloc"),
            os.path.join("source"),
        )
    
    
        def split_build_args_with_cwd(build_args_str: str) -> Tuple[Sequence[str], Optional[str]]:
    
            import shlex
    
            build_args = shlex.split(build_args_str)
    
            cwd = None
            if len(build_args) > 3:
                if build_args[0] == "cd" and build_args[2] == "&&":
                    cwd = build_args[1]
                    del build_args[0:3]
            return build_args, cwd
    
        def output_from_build_args(build_args: Sequence[str], cwd: Optional[str]) -> str:
            i = build_args.index("-o")
            # Assume the output is a relative path is a CWD was set.
            if cwd:
                return os.path.join(cwd, build_args[i + 1])
            return build_args[i + 1]
    
        def test_path(c: str) -> bool:
    
            # Skip any generated source files (files in the build directory).
            if os.path.abspath(c).startswith(build_dir):
                return False
            # Raise an exception since this should never happen,
            # we want to know about it early if it does, as it will cause failure
    
    Campbell Barton's avatar
    Campbell Barton committed
            # when attempting to compile the missing file.
    
            if not os.path.exists(c):
                raise Exception("Missing source file: " + c)
    
    
            for source_path in source_paths:
                index = c.rfind(source_path)
    
                if index != -1:
                    # Remove first part of the path, we don't want to match
                    # against paths in Blender's repo.
    
                    c_strip = c[index:]
                    for regex in regex_list:
                        if regex.match(c_strip) is not None:
                            return True
            return False
    
        # Filter out build args.
        args_orig_len = len(args)
    
        args_with_cwd = [
            (c, *split_build_args_with_cwd(build_args_str))
            for (c, build_args_str) in args
    
            if test_path(c)
        ]
    
        del args
        print("Operating on %d of %d files..." % (len(args_with_cwd), args_orig_len))
        for (c, build_args, build_cwd) in args_with_cwd:
    
            print(" ", c)
        del args_orig_len
    
    
        edit_generator_class = edit_class_from_id(edit_to_apply)
    
        shared_edit_data = edit_generator_class.setup()
    
        try:
            if USE_MULTIPROCESS:
    
                args_expanded = [(
                    c,
                    output_from_build_args(build_args, build_cwd),
                    build_args,
                    build_cwd,
                    edit_to_apply,
                    skip_test,
                    shared_edit_data,
                ) for (c, build_args, build_cwd) in args_with_cwd]
    
                import multiprocessing
                job_total = multiprocessing.cpu_count()
                pool = multiprocessing.Pool(processes=job_total * 2)
    
                pool.starmap(wash_source_with_edits, args_expanded)
    
                del args_expanded
    
                for c, build_args, build_cwd in args_with_cwd:
    
                    wash_source_with_edits(
    
                        c,
                        output_from_build_args(build_args, build_cwd),
                        build_args,
                        build_cwd,
                        edit_to_apply,
                        skip_test,
                        shared_edit_data,
    
        except Exception as ex:
            raise ex
        finally:
            edit_generator_class.teardown(shared_edit_data)
    
    def create_parser() -> argparse.ArgumentParser:
    
        edits_all = edit_function_get_all()
    
        # Create docstring for edits.
        edits_all_docs = []
        for edit in edits_all:
            edits_all_docs.append(
                "  %s\n%s" % (
                    edit,
                    indent(dedent(getattr(edit_generators, edit).__doc__ or '').strip('\n') + '\n', '    '),
                )
            )
    
    
        parser = argparse.ArgumentParser(
            description=__doc__,
            formatter_class=argparse.RawTextHelpFormatter,
        )
        parser.add_argument(
            "build_dir",
            help="list of files or directories to check",
        )
        parser.add_argument(
            "--match",
            nargs='+',
            required=True,
            metavar="REGEX",
            help="Match file paths against this expression",
        )
        parser.add_argument(
    
            "--edit",
            dest="edit",
            choices=edits_all,
    
            help="Specify the edit preset to run.\n\n" + "\n".join(edits_all_docs) + "\n",
    
            required=True,
        )
        parser.add_argument(
            "--skip-test",
            dest="skip_test",
            default=False,
            action='store_true',
            help=(
                "Perform all edits without testing if they perform functional changes. "
                "Use to quickly preview edits, or to perform edits which are manually checked (default=False)"
            ),
            required=False,
        )
    
        return parser
    
    
    
    def main() -> int:
    
        parser = create_parser()
        args = parser.parse_args()
    
        build_dir = args.build_dir
        regex_list = []
    
    
        for expr in args.match:
    
            try:
                regex_list.append(re.compile(expr))
            except Exception as ex:
                print(f"Error in expression: {expr}\n  {ex}")
                return 1
    
    
        return run_edits_on_directory(build_dir, regex_list, args.edit, args.skip_test)
    
    
    
# Script entry point: propagate `main()`'s exit code to the shell.
if __name__ == "__main__":
    sys.exit(main())