| # Copyright 2013 The Servo Project Developers. See the COPYRIGHT |
| # file at the top-level directory of this distribution. |
| # |
| # Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or |
| # http://www.apache.org/licenses/LICENSE-2.0> or the MIT license |
| # <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your |
| # option. This file may not be copied, modified, or distributed |
| # except according to those terms. |
| |
| import configparser |
| import fnmatch |
| import glob |
| import io |
| import itertools |
| import json |
| import os |
| import re |
| import subprocess |
| import sys |
| from dataclasses import dataclass |
| from typing import Any, TypedDict, LiteralString |
| from collections.abc import Iterator, Callable |
| import types |
| |
| import colorama |
| import toml |
| import wpt.manifestupdate |
| |
| from .licenseck import APACHE, COPYRIGHT, MPL, OLD_MPL, licenses_toml |
| from .linting_report import GitHubAnnotationManager |
| |
| TOPDIR = os.path.abspath(os.path.dirname(sys.argv[0])) |
| WPT_PATH = os.path.join(".", "tests", "wpt") |
| CONFIG_FILE_PATH = os.path.join(".", "servo-tidy.toml") |
| WPT_CONFIG_INI_PATH = os.path.join(WPT_PATH, "config.ini") |
| # regex source https://stackoverflow.com/questions/6883049/ |
| URL_REGEX = re.compile(rb"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+") |
| UTF8_URL_REGEX = re.compile(r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+") |
| CARGO_LOCK_FILE = os.path.join(TOPDIR, "Cargo.lock") |
| CARGO_DENY_CONFIG_FILE = os.path.join(TOPDIR, "deny.toml") |
| |
| ERROR_RAW_URL_IN_RUSTDOC = "Found raw link in rustdoc. Please escape it with angle brackets or use a markdown link." |
| |
| sys.path.append(os.path.join(WPT_PATH, "tests")) |
| sys.path.append(os.path.join(WPT_PATH, "tests", "tools", "wptrunner")) |
| |
| CheckingFunction = Callable[[str, bytes], Iterator[tuple[int, str]]] |
| LineCheckingFunction = Callable[[str, list[bytes]], Iterator[tuple[int, str]]] |
| |
| IgnoreConfig = TypedDict( |
| "IgnoreConfig", |
| { |
| "files": list[str], |
| "directories": list[str], |
| "packages": list[str], |
| }, |
| ) |
| |
| Config = TypedDict( |
| "Config", |
| { |
| "skip-check-licenses": bool, |
| "lint-scripts": list, |
| "blocked-packages": dict[str, Any], |
| "ignore": IgnoreConfig, |
| "check_ext": dict[str, Any], |
| }, |
| ) |
| |
| config: Config = { |
| "skip-check-licenses": False, |
| "lint-scripts": [], |
| "blocked-packages": {}, |
| "ignore": { |
| "files": [ |
| os.path.join(".", "."), # ignore hidden files |
| ], |
| "directories": [ |
| os.path.join(".", "."), # ignore hidden directories |
| ], |
| "packages": [], |
| }, |
| "check_ext": {}, |
| } |
| |
| COMMENTS = [b"// ", b"# ", b" *", b"/* "] |
| |
| # File patterns to include in the non-WPT tidy check. |
| FILE_PATTERNS_TO_CHECK = [ |
| "*.rs", |
| "*.rc", |
| "*.cpp", |
| "*.c", |
| "*.h", |
| "*.py", |
| "*.sh", |
| "*.toml", |
| "*.webidl", |
| "*.json", |
| "*.html", |
| ] |
| |
| # File patterns that are ignored for all tidy and lint checks. |
| FILE_PATTERNS_TO_IGNORE = ["*.#*", "*.pyc", "fake-ld.sh", "*.ogv", "*.webm"] |
| |
| SPEC_BASE_PATH = "components/script/dom/" |
| |
| WEBIDL_STANDARDS = [ |
| b"//www.khronos.org/registry/webgl/extensions", |
| b"//www.khronos.org/registry/webgl/specs", |
| b"//developer.mozilla.org/en-US/docs/Web/API", |
| b"//dev.w3.org/2006/webapi", |
| b"//dev.w3.org/csswg", |
| b"//dev.w3.org/fxtf", |
| b"//dvcs.w3.org/hg", |
| b"//www.w3.org/TR/trusted-types/", |
| b"//www.w3.org/TR/credential-management", |
| b"//dom.spec.whatwg.org", |
| b"//drafts.csswg.org", |
| b"//drafts.css-houdini.org", |
| b"//drafts.fxtf.org", |
| b"//console.spec.whatwg.org", |
| b"//encoding.spec.whatwg.org", |
| b"//fetch.spec.whatwg.org", |
| b"//html.spec.whatwg.org", |
| b"//streams.spec.whatwg.org", |
| b"//url.spec.whatwg.org", |
| b"//urlpattern.spec.whatwg.org", |
| b"//xhr.spec.whatwg.org", |
| b"//w3c.github.io", |
| b"//heycam.github.io/webidl", |
| b"//webbluetoothcg.github.io/web-bluetooth/", |
| b"//svgwg.org/svg2-draft", |
| b"//wicg.github.io", |
| b"//webaudio.github.io", |
| b"//immersive-web.github.io/", |
| b"//github.com/immersive-web/webxr-test-api/", |
| b"//github.com/immersive-web/webxr-hands-input/", |
| b"//gpuweb.github.io", |
| b"//notifications.spec.whatwg.org", |
| b"//testutils.spec.whatwg.org/", |
| b"//cookiestore.spec.whatwg.org/", |
| # Not a URL |
| b"// This interface is entirely internal to Servo, and should not be" + b" accessible to\n// web pages.", |
| ] |
| |
| |
| def is_iter_empty(iterator: Iterator[str]) -> tuple[bool, Iterator[str]]: |
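| """Return a tuple of (has_element, iterator): has_element is True when the iterator |
| yields at least one item, and the returned iterator still produces every item, |
| including the one consumed by this check.""" |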
| try: |
| obj = next(iterator) |
| return True, itertools.chain((obj,), iterator) |
| except StopIteration: |
| return False, iterator |
| |
| |
| def relative_path(path: str) -> str: |
| return os.path.relpath(os.path.abspath(path), TOPDIR) |
| |
| |
| def normalize_paths(paths: list[str] | str) -> list[str] | str: |
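| """Convert '/'-separated paths from the config file into platform-specific paths, |
| e.g. "a/b/c" becomes os.path.join("a", "b", "c").""" |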
| if isinstance(paths, str): |
| return os.path.join(*paths.split("/")) |
| else: |
| return [os.path.join(*path.split("/")) for path in paths] |
| |
| |
| # A simple wrapper for iterators to show progress |
| # (Note that it's inefficient for giant iterators, since it materializes the whole iterator into a list to get the total count) |
| def progress_wrapper(iterator: Iterator[str]) -> Iterator[str]: |
| list_of_stuff = list(iterator) |
| total_files, progress = len(list_of_stuff), 0 |
| for idx, thing in enumerate(list_of_stuff): |
| progress = int(float(idx + 1) / total_files * 100) |
| sys.stdout.write("\r Progress: %s%% (%d/%d)" % (progress, idx + 1, total_files)) |
| sys.stdout.flush() |
| yield thing |
| |
| |
| def git_changes_since_last_merge(path: str) -> list[str] | str: |
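| """Return the files under `path` changed since the last merge commit made by GitHub |
| (identified by the noreply@github.com committer), or an empty list if none exists.""" |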
| args = ["git", "log", "-n1", "--committer", "noreply@github.com", "--format=%H"] |
| last_merge = subprocess.check_output(args, universal_newlines=True).strip() |
| if not last_merge: |
| return [] |
| |
| args = ["git", "diff", "--name-only", last_merge, path] |
| file_list = normalize_paths(subprocess.check_output(args, universal_newlines=True).splitlines()) |
| |
| return file_list |
| |
| |
| class FileList(object): |
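| """ |
| Lazily yields the files to check under `directory`, optionally restricted to files |
| changed since the last merge and filtered against the excluded directories. |
| """ |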
| directory: str |
| excluded: list[str] |
| generator: Iterator[str] |
| |
| def __init__( |
| self, directory: str, only_changed_files: bool = False, exclude_dirs: list[str] = [], progress: bool = True |
| ) -> None: |
| self.directory = directory |
| self.excluded = exclude_dirs |
| self.generator = self._filter_excluded() if exclude_dirs else self._default_walk() |
| if only_changed_files: |
| self.generator = self._git_changed_files() |
| if progress: |
| self.generator = progress_wrapper(self.generator) |
| |
| def _default_walk(self) -> Iterator[str]: |
| for root, _, files in os.walk(self.directory): |
| for f in files: |
| yield os.path.join(root, f) |
| |
| def _git_changed_files(self) -> Iterator[str]: |
| file_list = git_changes_since_last_merge(self.directory) |
| if not file_list: |
| return |
| for f in file_list: |
| if not any(os.path.join(".", os.path.dirname(f)).startswith(path) for path in self.excluded): |
| yield os.path.join(".", f) |
| |
| def _filter_excluded(self) -> Iterator[str]: |
| for root, dirs, files in os.walk(self.directory, topdown=True): |
| # modify 'dirs' in-place so that we don't do unnecessary traversals in excluded directories |
| dirs[:] = [d for d in dirs if not any(os.path.join(root, d).startswith(name) for name in self.excluded)] |
| for rel_path in files: |
| yield os.path.join(root, rel_path) |
| |
| def __iter__(self) -> Iterator[str]: |
| return self.generator |
| |
| def next(self) -> str: |
| return next(self.generator) |
| |
| |
| def filter_file(file_name: str) -> bool: |
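| """Return True unless the file is in the configured ignore list or matches an ignored pattern.""" |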
| current_file = os.path.join(".", relative_path(file_name)) |
| if any(current_file.startswith(ignored_file) for ignored_file in config["ignore"]["files"]): |
| return False |
| base_name = os.path.basename(file_name) |
| if any(fnmatch.fnmatch(base_name, pattern) for pattern in FILE_PATTERNS_TO_IGNORE): |
| return False |
| return True |
| |
| |
| def filter_files(start_dir: str, only_changed_files: bool, progress: bool) -> Iterator[str]: |
| file_iter = FileList( |
| start_dir, |
| only_changed_files=only_changed_files, |
| exclude_dirs=config["ignore"]["directories"], |
| progress=progress, |
| ) |
| |
| for file_name in iter(file_iter): |
| base_name = os.path.basename(file_name) |
| if not any(fnmatch.fnmatch(base_name, pattern) for pattern in FILE_PATTERNS_TO_CHECK): |
| continue |
| if not filter_file(file_name): |
| continue |
| yield file_name |
| |
| |
| def uncomment(line: bytes) -> bytes: |
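| """Strip a leading comment marker (and any trailing `*/`) from a line, if present.""" |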
| for c in COMMENTS: |
| if line.startswith(c): |
| if line.endswith(b"*/"): |
| return line[len(c) : (len(line) - 3)].strip() |
| return line[len(c) :].strip() |
| return line |
| |
| |
| def is_apache_licensed(header: str) -> bool: |
| if "SPDX-License-Identifier: Apache-2.0 OR MIT" in header: |
| return True |
| |
| if APACHE in header: |
| return any(c in header for c in COPYRIGHT) |
| |
| return False |
| |
| |
| def check_license(file_name: str, lines: list[bytes]) -> Iterator[tuple[int, str]]: |
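| """Check that a source file has a blank line after any shebang and starts with a |
| recognized MPL or Apache/MIT license header (or an explicit xfail-license marker).""" |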
| if any(file_name.endswith(ext) for ext in (".toml", ".lock", ".json", ".html")) or config["skip-check-licenses"]: |
| return |
| |
| if lines[0].startswith(b"#!") and lines[1].strip(): |
| yield (1, "missing blank line after shebang") |
| |
| blank_lines = 0 |
| max_blank_lines = 2 if lines[0].startswith(b"#!") else 1 |
| license_block = [] |
| |
| for line in lines: |
| line = line.rstrip(b"\n") |
| if not line.strip(): |
| blank_lines += 1 |
| if blank_lines >= max_blank_lines: |
| break |
| continue |
| license_block.append(uncomment(line)) |
| |
| header = (b" ".join(license_block)).decode("utf-8") |
| valid_license = OLD_MPL in header or MPL in header or is_apache_licensed(header) |
| acknowledged_bad_license = "xfail-license" in header |
| if not (valid_license or acknowledged_bad_license): |
| yield (1, "incorrect license") |
| |
| |
| def check_modeline(file_name: str, lines: list[bytes]) -> Iterator[tuple[int, str]]: |
| for idx, line in enumerate(lines[:5]): |
| if re.search(b"^.*[ \t](vi:|vim:|ex:)[ \t]", line): |
| yield (idx + 1, "vi modeline present") |
| elif re.search(rb"-\*-.*-\*-", line, re.IGNORECASE): |
| yield (idx + 1, "emacs file variables present") |
| |
| |
| def contains_url(line: bytes) -> bool: |
| return bool(URL_REGEX.search(line)) |
| |
| |
| def is_unsplittable(file_name: str, line: bytes) -> bool: |
| return contains_url(line) or (file_name.endswith(".rs") and line.startswith(b"use ") and b"{" not in line) |
| |
| |
| def check_whatwg_specific_url(idx: int, line: bytes) -> Iterator[tuple[int, str]]: |
| match = re.search(rb"https://html\.spec\.whatwg\.org/multipage/[\w-]+\.html#([\w\'\:-]+)", line) |
| if match is not None: |
| preferred_link = "https://html.spec.whatwg.org/multipage/#{}".format(match.group(1).decode("utf-8")) |
| yield (idx + 1, "link to WHATWG may break in the future, use this format instead: {}".format(preferred_link)) |
| |
| |
| def check_whatwg_single_page_url(idx: int, line: bytes) -> Iterator[tuple[int, str]]: |
| match = re.search(rb"https://html\.spec\.whatwg\.org/#([\w\'\:-]+)", line) |
| if match is not None: |
| preferred_link = "https://html.spec.whatwg.org/multipage/#{}".format(match.group(1).decode("utf-8")) |
| yield (idx + 1, "links to WHATWG single-page url, change to multi page: {}".format(preferred_link)) |
| |
| |
| def check_whitespace(idx: int, line: bytes) -> Iterator[tuple[int, str]]: |
| if line.endswith(b"\n"): |
| line = line[:-1] |
| else: |
| yield (idx + 1, "no newline at EOF") |
| |
| if line.endswith(b" "): |
| yield (idx + 1, "trailing whitespace") |
| |
| if b"\t" in line: |
| yield (idx + 1, "tab on line") |
| |
| if b"\r" in line: |
| yield (idx + 1, "CR on line") |
| |
| |
| def check_for_raw_urls_in_rustdoc(file_name: str, idx: int, line: bytes) -> Iterator[tuple[int, str]]: |
| """Check that rustdoc comments in Rust source code do not have raw URLs. These appear |
| as warnings when rustdoc is run. rustdoc warnings could be made fatal, but adding this |
| check as part of tidy catches this common problem without having to run rustdoc for all |
| of Servo.""" |
| if not file_name.endswith(".rs"): |
| return |
| |
| if b"///" not in line and b"//!" not in line: |
| return |
| |
| # Types of URLs that are allowed: |
| # - A URL surrounded by angle or square brackets. |
| # - A markdown link. |
| # - A URL as part of a markdown definition identifier. |
| # [link text]: https://example.com |
| match = URL_REGEX.search(line) |
| if match and ( |
| not line[match.start() - 1 :].startswith(b"<") |
| and not line[match.start() - 1 :].startswith(b"[") |
| and not line[match.start() - 2 :].startswith(b"](") |
| and not line[match.start() - 3 :].startswith(b"]: ") |
| ): |
| yield (idx + 1, ERROR_RAW_URL_IN_RUSTDOC) |
| |
| |
| def check_by_line(file_name: str, lines: list[bytes]) -> Iterator[tuple[int, str]]: |
| for idx, line in enumerate(lines): |
| errors = itertools.chain( |
| check_whitespace(idx, line), |
| check_whatwg_specific_url(idx, line), |
| check_whatwg_single_page_url(idx, line), |
| check_for_raw_urls_in_rustdoc(file_name, idx, line), |
| ) |
| |
| for error in errors: |
| yield error |
| |
| |
| def check_ruff_lints() -> Iterator[tuple[str, int, str]]: |
| try: |
| args = ["ruff", "check", "--output-format", "json"] |
| subprocess.check_output(args, universal_newlines=True) |
| except subprocess.CalledProcessError as e: |
| for error in json.loads(e.output): |
| yield ( |
| os.path.join(".", os.path.relpath(error["filename"])), |
| error["location"]["row"], |
| f"[{error['code']}] {error['message']} ({error['url']})", |
| ) |
| |
| |
| @dataclass |
| class PyreflyDiagnostic: |
| """ |
| Represents a single diagnostic error reported by Pyrefly. |
| """ |
| |
| line: int |
| column: int |
| stop_line: int |
| stop_column: int |
| path: str |
| code: int |
| name: str |
| description: str |
| concise_description: str |
| |
| |
| def run_python_type_checker() -> Iterator[tuple[str, int, str]]: |
| print("\r ➤ Checking type annotations in python files ...") |
| try: |
| result = subprocess.run(["pyrefly", "check", "--output-format", "json"], capture_output=True) |
| parsed_json = json.loads(result.stdout) |
| errors = parsed_json.get("errors", []) |
| except subprocess.CalledProcessError as error: |
| print(f"{colorama.Fore.YELLOW}{error}{colorama.Style.RESET_ALL}") |
| pass |
| else: |
| for error in errors: |
| diagnostic = PyreflyDiagnostic(**error) |
| yield relative_path(diagnostic.path), diagnostic.line, diagnostic.concise_description |
| |
| |
| def run_cargo_deny_lints() -> Iterator[tuple[str, int, str]]: |
| print("\r ➤ Running `cargo-deny` checks...") |
| result = subprocess.run( |
| ["cargo-deny", "--format=json", "--all-features", "check"], encoding="utf-8", capture_output=True |
| ) |
| assert result.stderr is not None, "cargo deny should return error information via stderr when failing" |
| |
| errors = [] |
| for line in result.stderr.splitlines(): |
| error_fields = json.loads(str(line))["fields"] |
| error_code = error_fields.get("code", "unknown") |
| error_severity = error_fields.get("severity", "unknown") |
| message = error_fields.get("message", "") |
| labels = error_fields.get("labels", []) |
| |
| span = "" |
| line_number = 1 |
| if len(labels) > 0: |
| span = labels[0]["span"] |
| line_number = labels[0]["line"] |
| |
| # This attempts to detect license failures so that we can print a more |
| # helpful message, which normally does not include the license name. |
| if error_code == "rejected": |
| crate = CargoDenyKrate(error_fields["graphs"][0]) |
| license_name = error_fields["notes"][0] |
| errors.append((CARGO_LOCK_FILE, 1, f'Rust dependency {crate}: Rejected license "{license_name}"')) |
| # This detects if a crate has been marked as banned in the configuration file. |
| elif error_code == "banned": |
| crate = CargoDenyKrate(error_fields["graphs"][0]) |
| parents = ", ".join([str(parent) for parent in crate.parents]) |
| errors.append((CARGO_LOCK_FILE, 1, f"{message}: used by ({parents})")) |
| # This detects when two versions of a crate have been used, but are not skipped |
| # by the configuration file. |
| elif error_code == "duplicate": |
| errors.append((CARGO_LOCK_FILE, 1, f"{message}:\n {span}")) |
| # This detects any other problem, typically caused by an unnecessary exception |
| # in the deny.toml file. |
| elif error_severity in ["warning", "error"]: |
| errors.append((CARGO_DENY_CONFIG_FILE, line_number, f"{message}: {span}")) |
| |
| for error in errors: |
| yield error |
| |
| |
| def check_toml(file_name: str, lines: list[bytes]) -> Iterator[tuple[int, str]]: |
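| """Check that a Cargo.toml does not use wildcard version requirements and declares a valid license.""" |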
| if not file_name.endswith("Cargo.toml"): |
| return |
| ok_licensed = False |
| for idx, line in enumerate(map(lambda line: line.decode("utf-8"), lines)): |
| if idx == 0 and "[workspace]" in line: |
| return |
| line_without_comment, _, _ = line.partition("#") |
| if line_without_comment.find("*") != -1: |
| yield (idx + 1, "found asterisk instead of minimum version number") |
| for license_line in licenses_toml: |
| ok_licensed |= license_line in line |
| if "license.workspace" in line: |
| ok_licensed = True |
| if not ok_licensed: |
| yield (0, ".toml file should contain a valid license.") |
| |
| |
| def check_shell(file_name: str, lines: list[bytes]) -> Iterator[tuple[int, str]]: |
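| """Check that a shell script uses the expected bash shebang, sets the required |
| `set -o` options, and avoids backticks, `[` tests, and bare variable substitutions.""" |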
| if not file_name.endswith(".sh"): |
| return |
| |
| shebang = "#!/usr/bin/env bash" |
| required_options = ["set -o errexit", "set -o nounset", "set -o pipefail"] |
| |
| did_shebang_check = False |
| |
| if not lines: |
| yield (0, "script is an empty file") |
| return |
| |
| if lines[0].rstrip() != shebang.encode("utf-8"): |
| yield (1, 'script does not have shebang "{}"'.format(shebang)) |
| |
| for idx, line in enumerate(map(lambda line: line.decode("utf-8"), lines[1:])): |
| stripped = line.rstrip() |
| # Comments or blank lines are ignored. (Trailing whitespace is caught with a separate linter.) |
| if line.startswith("#") or stripped == "": |
| continue |
| |
| if not did_shebang_check: |
| if stripped in required_options: |
| required_options.remove(stripped) |
| else: |
| # The first non-comment, non-whitespace, non-option line is the first "real" line of the script. |
| # The shebang, options, etc. must come before this. |
| if required_options: |
| formatted = ['"{}"'.format(opt) for opt in required_options] |
| yield (idx + 1, "script is missing options {}".format(", ".join(formatted))) |
| did_shebang_check = True |
| |
| if "`" in stripped: |
| yield (idx + 1, "script should not use backticks for command substitution") |
| |
| if " [ " in stripped or stripped.startswith("[ "): |
| yield (idx + 1, "script should use `[[` instead of `[` for conditional testing") |
| |
| for dollar in re.finditer(r"\$", stripped): |
| next_idx = dollar.end() |
| if next_idx < len(stripped): |
| next_char = stripped[next_idx] |
| if not (next_char == "{" or next_char == "("): |
| yield (idx + 1, 'variable substitutions should use the full "${VAR}" form') |
| |
| |
| def check_rust(file_name: str, lines: list[bytes]) -> Iterator[tuple[int, str]]: |
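| """Check Rust sources for `//`-comment spacing and banned type patterns.""" |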
| if ( |
| not file_name.endswith(".rs") |
| or file_name.endswith(".mako.rs") |
| or file_name.endswith(os.path.join("style", "build.rs")) |
| or file_name.endswith(os.path.join("unit", "style", "stylesheets.rs")) |
| ): |
| return |
| |
| for idx, line in enumerate(map(lambda line: line.decode("utf-8"), lines)): |
| for match in re.finditer(r"(;|\s|^)//\w", line): |
| yield (idx + 1, "Comments starting with `//` should also include a space") |
| |
| line = re.sub(r"//.*?$|/\*.*?$|^\*.*?$", "//", line) |
| rules = [ |
| # There should not be any use of banned types: |
| # Cell<JSVal>, Cell<Dom<T>>, DomRefCell<Dom<T>>, DomRefCell<Heap<T>> |
| (r"(\s|:)+Cell<JSVal>", "Banned type Cell<JSVal> detected. Use MutDom<JSVal> instead"), |
| (r"(\s|:)+Cell<Dom<.+>>", "Banned type Cell<Dom<T>> detected. Use MutDom<T> instead"), |
| (r"DomRefCell<Dom<.+>>", "Banned type DomRefCell<Dom<T>> detected. Use MutDom<T> instead"), |
| (r"DomRefCell<Heap<.+>>", "Banned type DomRefCell<Heap<T>> detected. Use MutDom<T> instead"), |
| # No benefit to using &Root<T> |
| (r": &Root<", "use &T instead of &Root<T>"), |
| (r": &DomRoot<", "use &T instead of &DomRoot<T>"), |
| ] |
| |
| for pattern, message in rules: |
| for match in re.finditer(pattern, line): |
| yield (idx + 1, message.format(*match.groups(), **match.groupdict())) |
| |
| |
| def check_webidl_spec(file_name: str, contents: bytes) -> Iterator[tuple[int, str]]: |
| # Sorted by this function (in pseudo-Rust). The idea is to group URLs from |
| # the same organization together. |
| # fn sort_standards(a: &Url, b: &Url) -> Ordering { |
| # let a_domain = a.domain().split("."); |
| # a_domain.pop(); |
| # a_domain.reverse(); |
| # let b_domain = b.domain().split("."); |
| # b_domain.pop(); |
| # b_domain.reverse(); |
| # for i in a_domain.into_iter().zip(b_domain.into_iter()) { |
| # match i.0.cmp(i.1) { |
| # Less => return Less, |
| # Greater => return Greater, |
| # _ => (), |
| # } |
| # } |
| # a.path().cmp(b.path()) |
| # } |
| |
| if not file_name.endswith(".webidl"): |
| return |
| |
| for i in WEBIDL_STANDARDS: |
| if contents.find(i) != -1: |
| return |
| yield (0, "No specification link found.") |
| |
| |
| def check_that_manifests_exist() -> Iterator[tuple[str, int, str]]: |
| # Determine the metadata and test directories from the configuration file. |
| metadata_dirs = [] |
| config = configparser.ConfigParser() |
| config.read(WPT_CONFIG_INI_PATH) |
| for key in config: |
| if key.startswith("manifest:"): |
| metadata_dirs.append(os.path.join("./tests/wpt/", config[key]["metadata"])) |
| |
| for directory in metadata_dirs: |
| manifest_path = os.path.join(TOPDIR, directory, "MANIFEST.json") |
| if not os.path.isfile(manifest_path): |
| yield (WPT_CONFIG_INI_PATH, 0, f"Path in config was not found: {manifest_path}") |
| |
| |
| def check_that_manifests_are_clean() -> Iterator[tuple[str, int, str]]: |
| from wptrunner import wptlogging |
| |
| print("\r ➤ Checking WPT manifests for cleanliness...") |
| output_stream = io.StringIO("") |
| logger = wptlogging.setup({}, {"mach": output_stream}) |
| if wpt.manifestupdate.update(check_clean=True, logger=logger): |
| for line in output_stream.getvalue().splitlines(): |
| if "ERROR" in line: |
| yield (WPT_CONFIG_INI_PATH, 0, line) |
| yield (WPT_CONFIG_INI_PATH, 0, "WPT manifest is dirty. Run `./mach update-manifest`.") |
| |
| |
| def lint_wpt_test_files() -> Iterator[tuple[str, int, str]]: |
| from tools.lint import lint |
| |
| # Override the logging function so that we can collect errors from |
| # the lint script, which doesn't allow configuration of the output. |
| messages: list[str] = [] |
| assert lint.logger is not None |
| |
| def collect_messages(_: None, message: str) -> None: |
| messages.append(message) |
| |
| lint.logger.error = types.MethodType(collect_messages, lint.logger) |
| |
| # We do not lint all WPT-like tests because they do not all currently have |
| # lint.ignore files. |
| LINTED_SUITES = ["tests", os.path.join("mozilla", "tests")] |
| |
| for suite in LINTED_SUITES: |
| print(f"\r ➤ Linting WPT suite ({suite})...") |
| |
| messages = [] # Clear any old messages. |
| |
| suite_directory = os.path.abspath(os.path.join(WPT_PATH, suite)) |
| tests_changed = filter_files(suite_directory, only_changed_files=False, progress=False) |
| tests_changed = [os.path.relpath(file, suite_directory) for file in tests_changed] |
| |
| if lint.lint(suite_directory, tests_changed, output_format="normal"): |
| for message in messages: |
| (filename, message) = message.split(":", maxsplit=1) |
| yield (filename, 0, message) |
| |
| |
| def run_wpt_lints(only_changed_files: bool) -> Iterator[tuple[str, int, str]]: |
| if not os.path.exists(WPT_CONFIG_INI_PATH): |
| yield (WPT_CONFIG_INI_PATH, 0, f"{WPT_CONFIG_INI_PATH} is required but was not found") |
| return |
| |
| if only_changed_files: |
| try: |
| FileList("./tests/wpt", only_changed_files=only_changed_files, progress=False).next() |
| except StopIteration: |
| print("\r ➤ Skipping WPT lint checks, because no relevant files changed.") |
| return |
| |
| manifests_exist_errors = list(check_that_manifests_exist()) |
| if manifests_exist_errors: |
| yield from manifests_exist_errors |
| return |
| |
| yield from check_that_manifests_are_clean() |
| yield from lint_wpt_test_files() |
| |
| |
| def check_spec(file_name: str, lines: list[bytes]) -> Iterator[tuple[int, str]]: |
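| """Check that methods in DOM interface implementations are preceded by a comment |
| linking to their specification.""" |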
| if SPEC_BASE_PATH not in file_name: |
| return |
| file_name = os.path.relpath(os.path.splitext(file_name)[0], SPEC_BASE_PATH) |
| patt = re.compile(r"^\s*\/\/.+") |
| |
| # Pattern representing a line with a macro |
| macro_patt = re.compile(r"^\s*\S+!(.*)$") |
| |
| # Pattern representing a line with comment containing a spec link |
| link_patt = re.compile(r"^\s*///? (<https://.+>.*|https://.+)$") |
| |
| # Pattern representing a line with comment or attribute |
| comment_patt = re.compile(r"^\s*(///?.+|#\[.+\])$") |
| |
| brace_count = 0 |
| in_impl = False |
| pattern = "impl {}Methods<crate::DomTypeHolder> for {} {{".format(file_name, file_name) |
| |
| for idx, line in enumerate(map(lambda line: line.decode("utf-8"), lines)): |
| if "// check-tidy: no specs after this line" in line: |
| break |
| if not patt.match(line): |
| if pattern.lower() in line.lower(): |
| in_impl = True |
| if ("fn " in line or macro_patt.match(line)) and brace_count == 1: |
| for up_idx in range(1, idx + 1): |
| up_line = lines[idx - up_idx].decode("utf-8") |
| if link_patt.match(up_line): |
| # Comment with spec link exists |
| break |
| if not comment_patt.match(up_line): |
| # No more comments exist above, yield warning |
| yield (idx + 1, "method declared in webidl is missing a comment with a specification link") |
| break |
| if in_impl: |
| brace_count += line.count("{") |
| brace_count -= line.count("}") |
| if brace_count < 1: |
| break |
| |
| |
| def check_config_file(config_file: LiteralString, print_text: bool = True) -> Iterator[tuple[str, int, str]]: |
| # Check if config file exists |
| if not os.path.exists(config_file): |
| print("%s config file is required but was not found" % config_file) |
| sys.exit(1) |
| |
| # Load configs from servo-tidy.toml |
| with open(config_file) as content: |
| conf_file = content.read() |
| lines = conf_file.splitlines(True) |
| |
| if print_text: |
| print(f"\r ➤ Checking config file ({config_file})...") |
| |
| config_content = toml.loads(conf_file) |
| exclude = config_content.get("ignore", {}) |
| |
| # Check for invalid listed ignored directories |
| exclude_dirs = [d for p in exclude.get("directories", []) for d in (glob.glob(p) or [p])] |
| skip_dirs = ["./target", "./tests", "./support/crown/target"] |
| invalid_dirs = [d for d in exclude_dirs if not os.path.isdir(d) and not any(s in d for s in skip_dirs)] |
| |
| # Check for invalid listed ignored files |
| invalid_files = [f for f in exclude.get("files", []) if not os.path.exists(f)] |
| |
| current_table = "" |
| for idx, line in enumerate(lines): |
| # Ignore comment lines |
| if line.strip().startswith("#"): |
| continue |
| |
| # Check for invalid tables |
| if re.match(r"\[(.*?)\]", line.strip()): |
| table_name = re.findall(r"\[(.*?)\]", line)[0].strip() |
| if table_name not in ("configs", "blocked-packages", "ignore", "check_ext"): |
| yield config_file, idx + 1, "invalid config table [%s]" % table_name |
| current_table = table_name |
| continue |
| |
| # Print invalid listed ignored directories |
| if current_table == "ignore" and invalid_dirs: |
| for d in invalid_dirs: |
| if line.strip().strip("'\",") == d: |
| yield config_file, idx + 1, "ignored directory '%s' doesn't exist" % d |
| invalid_dirs.remove(d) |
| break |
| |
| # Print invalid listed ignored files |
| if current_table == "ignore" and invalid_files: |
| for f in invalid_files: |
| if line.strip().strip("'\",") == f: |
| yield config_file, idx + 1, "ignored file '%s' doesn't exist" % f |
| invalid_files.remove(f) |
| break |
| |
| # Skip if there is no equal sign in line, assuming it's not a key |
| if "=" not in line: |
| continue |
| |
| key = line.split("=")[0].strip() |
| |
| # Check for invalid keys inside [configs] and [ignore] table |
| if ( |
| current_table == "configs" |
| and key not in config |
| or current_table == "ignore" |
| and key not in config["ignore"] |
| # Any key outside of tables |
| or current_table == "" |
| ): |
| yield config_file, idx + 1, "invalid config key '%s'" % key |
| |
| # Parse config file |
| parse_config(config_content) |
| |
| |
| def parse_config(config_file: dict[str, Any]) -> None: |
| exclude = config_file.get("ignore", {}) |
| # Add list of ignored directories to config |
| ignored_directories = [d for p in exclude.get("directories", []) for d in (glob.glob(p) or [p])] |
| config["ignore"]["directories"] += normalize_paths(ignored_directories) |
| # Add list of ignored files to config |
| config["ignore"]["files"] += normalize_paths(exclude.get("files", [])) |
| # Add list of ignored packages to config |
| config["ignore"]["packages"] = exclude.get("packages", []) |
| |
| # Add dict of dir, list of expected ext to config |
| dirs_to_check = config_file.get("check_ext", {}) |
| # Fix the paths (OS-dependent) |
| for path, exts in dirs_to_check.items(): |
| # FIXME: Temporarily ignoring this since the type signature for |
| # `normalize_paths` must use a constrained type variable for this to |
| # typecheck but Pyrefly doesn't handle that correctly (but mypy does). |
| # pyrefly: ignore[bad-argument-type] |
| config["check_ext"][normalize_paths(path)] = exts |
| |
| # Add list of blocked packages |
| config["blocked-packages"] = config_file.get("blocked-packages", {}) |
| |
| # Override default configs |
| user_configs = config_file.get("configs", []) |
| |
| for pref in user_configs: |
| if pref in config: |
| # FIXME: Temporarily ignoring this since only Pyrefly raises an issue about the dynamic key |
| # pyrefly: ignore[missing-attribute] |
| config[pref] = user_configs[pref] |
| |
| |
| def check_directory_files(directories: dict[str, Any], print_text: bool = True) -> Iterator[tuple[str, int, str]]: |
| if print_text: |
| print("\r ➤ Checking directories for correct file extensions...") |
| for directory, file_extensions in directories.items(): |
| files = sorted(os.listdir(directory)) |
| for filename in files: |
| if not any(filename.endswith(ext) for ext in file_extensions): |
| details = {"name": os.path.basename(filename), "ext": ", ".join(file_extensions), "dir_name": directory} |
| message = """Unexpected extension found for {name}. \ |
| We only expect files with {ext} extensions in {dir_name}""".format(**details) |
| yield (filename, 1, message) |
| |
| |
| def collect_errors_for_files( |
| files_to_check: Iterator[str], |
| checking_functions: tuple[CheckingFunction, ...], |
| line_checking_functions: tuple[LineCheckingFunction, ...], |
| print_text: bool = True, |
| ) -> Iterator[tuple[str, int, str]]: |
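| """Run the whole-file and per-line checking functions over each file, yielding |
| (filename, line, message) tuples for every error found.""" |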
| (has_element, files_to_check) = is_iter_empty(files_to_check) |
| if not has_element: |
| return |
| if print_text: |
| print("\r ➤ Checking files for tidiness...") |
| |
| for filename in files_to_check: |
| if not os.path.exists(filename): |
| continue |
| with open(filename, "rb") as f: |
| contents: bytes = f.read() |
| if not contents.strip(): |
| yield filename, 0, "file is empty" |
| continue |
| for check in checking_functions: |
| for error in check(filename, contents): |
| # the result will be: `(filename, line, message)` |
| yield (filename,) + error |
| lines: list[bytes] = contents.splitlines(True) |
| for check in line_checking_functions: |
| for error in check(filename, lines): |
| yield (filename,) + error |
| |
| |
| def scan(only_changed_files: bool = False, progress: bool = False, github_annotations: bool = False) -> int: |
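| """Run all tidy checks, print any errors, and return 1 if an error was reported, else 0.""" |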
| github_annotation_manager = GitHubAnnotationManager("test-tidy") |
| # check config file for errors |
| config_errors = check_config_file(CONFIG_FILE_PATH) |
| # check directories contain expected files |
| directory_errors = check_directory_files(config["check_ext"]) |
| # standard checks |
| files_to_check = filter_files(".", only_changed_files, progress) |
| checking_functions: tuple[CheckingFunction, ...] = (check_webidl_spec,) |
| line_checking_functions: tuple[LineCheckingFunction, ...] = ( |
| check_license, |
| check_by_line, |
| check_toml, |
| check_shell, |
| check_rust, |
| check_spec, |
| check_modeline, |
| ) |
| file_errors = collect_errors_for_files(files_to_check, checking_functions, line_checking_functions) |
| |
| python_errors = check_ruff_lints() |
| python_type_check = run_python_type_checker() |
| cargo_lock_errors = run_cargo_deny_lints() |
| wpt_errors = run_wpt_lints(only_changed_files) |
| |
| # chain all the iterators |
| errors = itertools.chain( |
| config_errors, directory_errors, file_errors, python_errors, python_type_check, wpt_errors, cargo_lock_errors |
| ) |
| |
| colorama.init() |
| error = None |
| for error in errors: |
| print( |
| "\r | " |
| + f"{colorama.Fore.BLUE}{error[0]}{colorama.Style.RESET_ALL}:" |
| + f"{colorama.Fore.YELLOW}{error[1]}{colorama.Style.RESET_ALL}: " |
| + f"{colorama.Fore.RED}{error[2]}{colorama.Style.RESET_ALL}" |
| ) |
| |
| if github_annotations: |
| github_annotation_manager.emit_annotation(error[2], error[2], error[0], error[1]) |
| |
| return int(error is not None) |
| |
| |
| class CargoDenyKrate: |
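| """ |
| A crate parsed from `cargo-deny` JSON output, along with the crates that depend on it. |
| """ |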
| def __init__(self, data: dict[Any, Any]) -> None: |
| crate = data["Krate"] |
| self.name = crate["name"] |
| self.version = crate["version"] |
| self.parents = [CargoDenyKrate(parent) for parent in data.get("parents", [])] |
| |
| def __str__(self) -> str: |
| return f"{self.name}@{self.version}" |