"""built_test_history.py

Builds a CSV history of test and subtest status changes from wpt.fyi runs.
"""
from io import TextIOWrapper
import requests
import time
from typing import TypedDict

# This is an example JSON of one of the aligned run objects.
# From: https://wpt.fyi/api/runs?label=master&label=experimental&max-count=100&aligned
#
# id:
# 	5078469266898944
# browser_name:
# 	chrome
# browser_version:
# 	114.0.5696.0 dev
# os_name:
# 	linux
# os_version:
# 	20.04
# revision:
# 	0375b3ebd8
# full_revision_hash:
# 	0375b3ebd812ae5d8713e3bb06c5805273ebbe3b
# results_url:
# 	https://storage.googleapis.com/wptd/0375b3ebd812ae5d8713e3bb06c5805273ebbe3b/chrome-114.0.5696.0_dev-linux-20.04-c84aaf98a5-summary_v2.json.gz
# created_at:
# 	2023-04-13T04:20:50.316333Z
# time_start:
# 	2023-04-13T02:57:17.041Z
# time_end:
# 	2023-04-13T04:04:37.36Z
# raw_results_url:
# 	https://storage.googleapis.com/wptd-results/0375b3ebd812ae5d8713e3bb06c5805273ebbe3b/chrome-114.0.5696.0_dev-linux-20.04-c84aaf98a5/report.json
# labels:
# 	['chrome', 'dev', 'experimental', 'master', 'taskcluster', 'user:chromium-wpt-export-bot']


# Type hint class for the run metadata return value from api/runs endpoint.
class MetadataDict(TypedDict):
  """Shape of a single run object returned by the wpt.fyi api/runs endpoint."""
  # Run identifier. The API serializes this as a JSON number, so it is an
  # int after decoding (e.g. 5078469266898944), not a string.
  id: int
  # Browser product name, e.g. 'chrome'.
  browser_name: str
  # Browser version string, e.g. '114.0.5696.0 dev'.
  browser_version: str
  os_name: str
  os_version: str
  # Abbreviated and full hashes of the revision the run was made against.
  revision: str
  full_revision_hash: str
  # URL of the gzipped summary results for the run.
  results_url: str
  # Timestamps, kept as the strings the API returns
  # (e.g. '2023-04-13T02:57:17.041Z').
  created_at: str
  time_start: str
  time_end: str
  # URL of the full report.json for the run.
  raw_results_url: str
  # Labels attached to the run, e.g. ['chrome', 'dev', 'experimental'].
  labels: list[str]

# {
#   ('/wasm/jsapi/table/constructor-types.tentative.any.js', ''): 'SKIP',
#   ('/wasm/jsapi/table/constructor-types.tentative.any.js', 'Zero minimum'): 'FAIL',
# }

# Type hint class for the dictionary maintaining previous test/subtest statuses.
class PrevTestStatusDict(TypedDict):
  """Last known test/subtest statuses, kept separately for each browser.

  Each browser maps a (test_name, subtest_name) tuple to a status string.
  An empty subtest_name denotes the status of the test itself.
  """
  chrome: dict[tuple[str, str], str]
  edge: dict[tuple[str, str], str]
  firefox: dict[tuple[str, str], str]
  safari: dict[tuple[str, str], str]


# Get the list of metadata for the most recent aligned runs.
def get_aligned_run_info(max_count: int = 2) -> list[MetadataDict]:
  """Fetch metadata for the most recent aligned runs from wpt.fyi.

  Args:
    max_count: Value sent as the API's "max-count" query parameter. The
      default matches the value this script previously hard-coded; pass a
      different value to work with a larger or smaller set of runs.

  Returns:
    The run metadata objects, ordered by start time and then browser name.

  Raises:
    requests.HTTPError: If the API answers with an error status code.
  """
  resp = requests.get(
    'https://wpt.fyi/api/runs'
    f'?label=master&label=experimental&max-count={max_count}&aligned'
  )
  # Surface HTTP failures here rather than as a confusing JSON or key error
  # further down.
  resp.raise_for_status()
  runs_list: list[MetadataDict] = resp.json()

  # Order the runs by start time, using the browser name only to break ties.
  # This is the same ordering the previous pair of stable sorts produced, and
  # it keeps each browser's runs in the order their statuses are compared.
  runs_list.sort(key=lambda run: (run['time_start'], run['browser_name']))

  # Print the dates just to get info on the list of runs we're working with.
  print('Runs to process:')
  for run in runs_list:
    print(f'{run["browser_name"]} {run["time_start"]}')
  print()

  return runs_list


# A little loading bar for fun.
def print_loading_bar(i: int, run_count: int) -> None:
  """Print a text progress bar for the run at zero-based index i.

  The bar shows one '#' per run finished so far and one '-' per run still
  remaining, followed by a "(finished/total)" counter.
  """
  finished = i + 1
  remaining = run_count - finished
  bar = '#' * finished + '-' * remaining
  print(f'|{bar}| ({finished}/{run_count})')


def write_row_if_needed(
    test_name: str,
    subtest_name: str,
    key: tuple[str, str],
    current_status: str,
    prev_test_statuses: dict[str, str],
    csv_row_metadata: str,
    file: TextIOWrapper,
  ) -> None:
  """Write a CSV row when a status changed, then remember the new status.

  A row is written only if current_status differs from the status stored
  under key (a key with no stored status is treated as ''). The stored
  status is replaced with current_status whether or not a row was written.

  The names are written inside double quotes exactly as given, so the
  caller is responsible for escaping them beforehand.
  """
  last_status = prev_test_statuses.get(key, '')
  status_changed = last_status != current_status

  if status_changed:
    row = '{},"{}","{}",{}\n'.format(
      csv_row_metadata,
      test_name,
      subtest_name,
      current_status,
    )
    file.write(row)

  # Record the latest status so the next run is compared against it.
  prev_test_statuses[key] = current_status


def _escape_csv_field(value: str) -> str:
  """Make a test or subtest name safe to place inside a quoted CSV field."""
  # Embedded double quotes are doubled and newlines are flattened to spaces
  # so that every row stays on a single line.
  return value.replace('"', '""').replace('\n', ' ')


def process_single_run(
    run_metadata: MetadataDict,
    prev_test_statuses: dict[tuple[str, str], str],
  ) -> None:
  """Append one run's status changes to the CSV and update the status map.

  Downloads the run's raw report, then writes a row to
  'browser-history-test-data.csv' for every test and subtest whose status
  differs from the one recorded in prev_test_statuses. Entries that are in
  prev_test_statuses but absent from this run are written with the status
  'MISSING'.

  Args:
    run_metadata: Metadata of the run to process.
    prev_test_statuses: Maps (test_name, subtest_name) to the last status
      seen for this run's browser. It is updated in place. The overall test
      status uses an empty string as the subtest name.

  Raises:
    requests.HTTPError: If the raw report download returns an error status.
  """
  # Download before opening the CSV so that no file handle is held while
  # waiting on the network.
  run_resp = requests.get(run_metadata['raw_results_url'])
  run_resp.raise_for_status()
  run_data = run_resp.json()

  # Create the first columns of each row with overall run info.
  # NOTE(review): the date column is taken from the raw report's
  # 'time_start', not from run_metadata['time_start'] -- confirm this is the
  # intended source.
  csv_row_metadata = '{},{},{},{}'.format(
    run_metadata['id'],
    run_metadata['browser_name'],
    run_metadata['browser_version'],
    run_data['time_start'],
  )

  # Keep track of every test result that is in the dataset of runs we have
  # previously seen. Whatever is left once this run has been walked was not
  # part of it and gets marked as missing.
  tests_not_seen = set(prev_test_statuses)

  with open('browser-history-test-data.csv', 'a') as f:
    for test_data in run_data['results']:
      test_name = _escape_csv_field(test_data['test'])

      # The overall test status has an empty string as the subtest name.
      test_key = (test_name, '')
      write_row_if_needed(
        test_name,
        subtest_name='',
        key=test_key,
        current_status=test_data['status'],
        prev_test_statuses=prev_test_statuses,
        csv_row_metadata=csv_row_metadata,
        file=f,
      )

      # Now that we've seen this test status, we can remove it from the
      # set of tests we haven't seen yet.
      tests_not_seen.discard(test_key)

      # Do the same basic process for each subtest.
      for subtest_data in test_data['subtests']:
        subtest_name = _escape_csv_field(subtest_data['name'])
        subtest_key = (test_name, subtest_name)

        # Each subtest must be tracked under its own key. Reusing the
        # test-level key would overwrite the test's status and leave the
        # subtest's history unrecorded.
        write_row_if_needed(
          test_name,
          subtest_name=subtest_name,
          key=subtest_key,
          current_status=subtest_data['status'],
          prev_test_statuses=prev_test_statuses,
          csv_row_metadata=csv_row_metadata,
          file=f,
        )

        tests_not_seen.discard(subtest_key)

    # Write MISSING status for tests/subtests not seen. Sorting makes the
    # row order reproducible instead of depending on set iteration order.
    # A row is only written if the entry is not already marked as missing.
    for test_name, subtest_name in sorted(tests_not_seen):
      write_row_if_needed(
        test_name,
        subtest_name=subtest_name,
        key=(test_name, subtest_name),
        current_status='MISSING',
        prev_test_statuses=prev_test_statuses,
        csv_row_metadata=csv_row_metadata,
        file=f,
      )


def process_runs(runs_list: list[MetadataDict]) -> None:
  """Write the CSV header, process every run in order, and report timing.

  The output file 'browser-history-test-data.csv' is truncated and given a
  fresh header row. Each run is then processed against the status history
  of its own browser, with a progress bar printed after every run.
  """
  started_at = time.time()

  # Start the output file over with only the column names in it.
  csv_header = 'run_id,browser_name,browser_version,date,test_name,subtest_name,status\n'
  with open('browser-history-test-data.csv', 'w') as csv_file:
    csv_file.write(csv_header)

  # Previous test statuses from the runs processed so far, per browser.
  statuses_by_browser: PrevTestStatusDict = {
    'chrome': {},
    'edge': {},
    'firefox': {},
    'safari': {},
  }

  total_runs = len(runs_list)
  for index, run in enumerate(runs_list):
    process_single_run(run, statuses_by_browser[run['browser_name']])
    print_loading_bar(index, total_runs)

  elapsed = time.time() - started_at
  print(f'Time taken = {round(elapsed, 0)} seconds.')


def main():
  """Fetch the list of aligned runs and process them into the CSV."""
  process_runs(get_aligned_run_info())


# Run the script only when this file is executed directly, not on import.
if __name__ == '__main__':
  main()