| #!/usr/bin/env python3 |
| |
| # This Source Code Form is subject to the terms of the Mozilla Public |
| # License, v. 2.0. If a copy of the MPL was not distributed with this |
| # file, You can obtain one at https://mozilla.org/MPL/2.0/. |
| |
| # This is a script designed to easily debug devtools messages |
| # It takes the content of a pcap wireshark capture (or creates a new |
| # one when using --scan) and pretty prints the JSON payloads. |
| # |
| # Wireshark (more specifically its cli tool tshark) needs to be installed |
| # for this script to work. Go to https://tshark.dev/setup/install for a |
| # comprehensive guide on how to install it. In short: |
| # |
| # Linux (Debian based): apt install tshark |
| # Linux (Arch based): pacman -Sy wireshark-cli |
| # MacOS (With homebrew): brew install --cask wireshark |
| # Windows (With chocolatey): choco install wireshark |
| # |
| # To use it, launch either Servo or a Firefox debugging instance in |
| # devtools mode: |
| # |
| # Servo: ./mach run --devtools=1234 |
| # Firefox: firefox --new-instance --start-debugger-server 1234 --profile PROFILE |
| # |
| # Then run this tool in capture mode and specify the same port as before: |
| # |
| # ./devtools_parser.py --scan cap.pcap --port 1234 |
| # |
| # Finally, open another instance of Firefox and go to about:debugging |
| # and connect to localhost:1234. Messages should start popping up. The |
| # scan can be finished by pressing Ctrl+C. Then, all of the messages will |
| # show up. |
| # |
| # You can also review the results of a saved scan, and filter by words |
| # or by message range: |
| # |
| # ./devtools_parser.py --use cap.pcap --port 1234 --filter watcher --range 10:30 |
| |
| import json |
| import signal |
| import sys |
| from argparse import ArgumentParser |
| from subprocess import Popen, PIPE |
| |
| try: |
| from termcolor import colored |
| except ImportError: |
| |
| def colored(text, *args, **kwargs): |
| return text |
| |
| |
| fields = ["frame.time", "tcp.srcport", "tcp.payload"] |
| |
| |
| # Use tshark to capture network traffic and save the result in a |
| # format that this tool can process later |
| def record_data(file, port): |
| # Create tshark command |
| cmd = [ |
| "tshark", |
| "-T", |
| "fields", |
| "-i", |
| "lo", |
| "-d", |
| f"tcp.port=={port},http", |
| "-w", |
| file, |
| ] + [e for f in fields for e in ("-e", f)] |
| process = Popen(cmd, stdout=PIPE) |
| |
| # Stop the analysis when using Ctrl+C |
| def signal_handler(sig, frame): |
| process.kill() |
| |
| signal.signal(signal.SIGINT, signal_handler) |
| signal.pause() |
| |
| # Get the output |
| out, err = process.communicate() |
| out = out.decode("utf-8") |
| |
| return out |
| |
| |
| # Read a pcap data file from tshark (or wireshark) and extract |
| # the necessary output fields |
| def read_data(file): |
| # Create tshark command |
| cmd = [ |
| "tshark", |
| "-T", |
| "fields", |
| "-r", |
| file, |
| ] + [e for f in fields for e in ("-e", f)] |
| process = Popen(cmd, stdout=PIPE) |
| |
| # Get the output |
| out, err = process.communicate() |
| out = out.decode("utf-8") |
| |
| return out |
| |
| |
| # Transform the raw output of wireshark into a more manageable one |
| def process_data(input, port): |
| # Split the input into lines. |
| # `input` = newline-terminated lines of tab-delimited tshark(1) output |
| lines = [line.split("\t") for line in input.split("\n")] |
| |
| # Remove empty lines and empty sends, and decode hex to bytes. |
| # `lines` = [[date, port, hex-encoded data]], e.g. |
| # `["Mar 18, 2025 21:09:51.879661797 AWST", "6080", "3133"]` |
| # `["Mar 18, 2025 21:09:51.879661797 AWST", "6080", "393a"]` |
| # `["Mar 18, 2025 21:09:51.879661797 AWST", "6080", "7b..."]` |
| sends = [] |
| for line in lines: |
| if len(line) != 3: |
| continue |
| curr_time, curr_port, curr_data = line |
| if len(curr_data) == 0: |
| continue |
| elif len(curr_data) % 2 == 1: |
| print(f"[WARNING] Extra byte in hex-encoded data: {curr_data[-1]}", file=sys.stderr) |
| curr_data = curr_data[:-1] |
| if len(sends) > 0 and sends[-1][1] == curr_port: |
| sends[-1][2] += bytearray.fromhex(curr_data) |
| else: |
| sends.append([curr_time, curr_port, bytearray.fromhex(curr_data)]) |
| |
| # Split and merge consecutive sends with the same port, to yield exactly one record per message. |
| # Message records are of the form `length:{...}`, where `length` is an integer in ASCII decimal. |
| # Incomplete messages are deferred until they are complete. |
| # `sends` = [[date, port, record data]], e.g. |
| # `["Mar 18, 2025 21:09:51.879661797 AWST", "6080", b"13"]` |
| # `["Mar 18, 2025 21:09:51.879661797 AWST", "6080", b"9:"]` |
| # `["Mar 18, 2025 21:09:51.879661797 AWST", "6080", b"{..."]` |
| # `["Mar 18, 2025 21:09:51.879661797 AWST", "6080", b"...}"]` |
| records = [] |
| scunge = {} # Map from port to incomplete message data |
| for curr_time, curr_port, rest in sends: |
| rest = scunge.pop(curr_port, b"") + rest |
| while rest != b"": |
| try: |
| length, new_rest = rest.split(b":", 1) # Can raise ValueError |
| length = int(length) |
| if len(new_rest) < length: |
| raise ValueError("Incomplete message (for now)") |
| # If we found a `length:` prefix and we have enough data to satisfy it, |
| # cut off the prefix so `rest` is just `{...}length:{...}length:{...}`. |
| rest = new_rest |
| except ValueError: |
| print(f"[WARNING] Incomplete message detected (will try to reassemble): {repr(rest)}", file=sys.stderr) |
| scunge[curr_port] = rest |
| # Wait for more data from later sends, potentially after sends with the other port. |
| break |
| # Cut off the message so `rest` is just `length:{...}length:{...}`. |
| message = rest[:length] |
| rest = rest[length:] |
| try: |
| records.append([curr_time, curr_port, message.decode()]) |
| except UnicodeError as e: |
| print(f"[WARNING] Failed to decode message as UTF-8: {e}") |
| continue |
| |
| # Process message records. |
| # `records` = [[date, port, message text]], e.g. |
| # `["Mar 18, 2025 21:09:51.879661797 AWST", "6080", "{...}"]` |
| result = [] |
| for line in records: |
| if len(line) != 3: |
| continue |
| curr_time, curr_port, text = line |
| # Time |
| curr_time = curr_time.split(" ")[-2].split(".")[0] |
| # Port |
| curr_port = "Servo" if curr_port == port else "Firefox" |
| # Data |
| result.append([curr_time, curr_port, len(result), text]) |
| |
| # `result` = [[date, endpoint, index, message text]], e.g. |
| # `["Mar 18, 2025 21:09:51.879661797 AWST", "Servo", 0, "{...}"]` |
| return result |
| |
| |
| # Pretty prints the json message |
| def parse_message(msg, *, json_output=False): |
| time, sender, i, data = msg |
| from_servo = sender == "Servo" |
| |
| colored_sender = colored(sender, "black", "on_yellow" if from_servo else "on_magenta", attrs=["bold"]) |
| if not json_output: |
| print(f"\n{colored_sender} - {colored(i, 'blue')} - {colored(time, 'dark_grey')}") |
| |
| try: |
| content = json.loads(data) |
| if json_output: |
| if "to" in content: |
| # This is a request |
| print(json.dumps({"_to": content["to"], "message": content}, sort_keys=True)) |
| elif "from" in content: |
| # This is a response |
| print(json.dumps({"_from": content["from"], "message": content}, sort_keys=True)) |
| else: |
| assert False, "Message is neither a request nor a response" |
| else: |
| if from_servo and "from" in content: |
| print(colored(f"Actor: {content['from']}", "yellow")) |
| print(json.dumps(content, sort_keys=True, indent=4)) |
| except json.JSONDecodeError: |
| print(f"Warning: Couldn't decode json\n{data}") |
| |
| if not json_output: |
| print() |
| |
| |
| if __name__ == "__main__": |
| # Program arguments |
| parser = ArgumentParser() |
| parser.add_argument("-p", "--port", default="1234", help="the port where the devtools client is running") |
| parser.add_argument("-f", "--filter", help="search for the string on the messages") |
| parser.add_argument("-r", "--range", help="only parse messages from n to m, with the form of n:m") |
| parser.add_argument("--json", action="store_true", help="output in newline-delimited JSON (NDJSON)") |
| |
| actions = parser.add_mutually_exclusive_group(required=True) |
| actions.add_argument("-s", "--scan", help="scan and save the output to a file") |
| actions.add_argument("-u", "--use", help="use the scan from a file") |
| |
| args = parser.parse_args() |
| |
| # Get the scan data |
| if args.scan: |
| data = record_data(args.scan, args.port) |
| else: |
| with open(args.use, "r") as f: |
| data = read_data(args.use) |
| |
| data = process_data(data, args.port) |
| |
| # Set the range of messages to show |
| min, max = 0, -2 |
| if args.range and len(args.range.split(":")) == 2: |
| min, max = args.range.split(":") |
| |
| for msg in data[int(min) : int(max) + 1]: |
| # Filter the messages if specified |
| if not args.filter or args.filter.lower() in msg[3].lower(): |
| parse_message(msg, json_output=args.json) |