Files
mesa/bin/ci/test/test_custom_logger.py
2025-04-15 13:04:11 +00:00

747 lines
22 KiB
Python

import logging
import subprocess
from datetime import datetime
import pytest
from custom_logger import CustomLogger
@pytest.fixture
def tmp_log_file(tmp_path):
return tmp_path / "test_log.json"
@pytest.fixture
def custom_logger(tmp_log_file):
return CustomLogger(tmp_log_file)
def run_script_with_args(args):
import custom_logger
script_path = custom_logger.__file__
return subprocess.run(
["python3", str(script_path), *args], capture_output=True, text=True
)
# Test case for missing log file
@pytest.mark.parametrize(
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
)
def test_missing_log_file_argument(key, value):
result = run_script_with_args(["--update", "key", "value"])
assert result.returncode != 0
# Parametrize test case for valid update arguments
@pytest.mark.parametrize(
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
)
def test_update_argument_valid(custom_logger, tmp_log_file, key, value):
result = run_script_with_args([str(tmp_log_file), "--update", key, value])
assert result.returncode == 0
# Test case for passing only the key without a value
def test_update_argument_key_only(custom_logger, tmp_log_file):
key = "dut_attempt_counter"
result = run_script_with_args([str(tmp_log_file), "--update", key])
assert result.returncode != 0
# Test case for not passing any key-value pair
def test_update_argument_no_values(custom_logger, tmp_log_file):
result = run_script_with_args([str(tmp_log_file), "--update"])
assert result.returncode == 0
# Parametrize test case for valid arguments
@pytest.mark.parametrize(
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
)
def test_create_argument_valid(custom_logger, tmp_log_file, key, value):
result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
assert result.returncode == 0
# Test case for passing only the key without a value
def test_create_argument_key_only(custom_logger, tmp_log_file):
key = "dut_attempt_counter"
result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key])
assert result.returncode != 0
# Test case for not passing any key-value pair
def test_create_argument_no_values(custom_logger, tmp_log_file):
result = run_script_with_args([str(tmp_log_file), "--create-dut-job"])
assert result.returncode == 0
# Test case for updating a DUT job
@pytest.mark.parametrize(
"key, value", [("status", "hung"), ("dut_state", "Canceling"), ("dut_name", "asus")]
)
def test_update_dut_job(custom_logger, tmp_log_file, key, value):
result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
assert result.returncode != 0
result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
assert result.returncode == 0
result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
assert result.returncode == 0
# Test case for updating last DUT job
def test_update_dut_multiple_job(custom_logger, tmp_log_file):
# Create the first DUT job with the first key
result = run_script_with_args(
[str(tmp_log_file), "--create-dut-job", "status", "hung"]
)
assert result.returncode == 0
# Create the second DUT job with the second key
result = run_script_with_args(
[str(tmp_log_file), "--create-dut-job", "dut_state", "Canceling"]
)
assert result.returncode == 0
result = run_script_with_args(
[str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
)
assert result.returncode == 0
# Parametrize test case for valid phase arguments
@pytest.mark.parametrize(
"phase_name",
[("Phase1"), ("Phase2"), ("Phase3")],
)
def test_create_job_phase_valid(custom_logger, tmp_log_file, phase_name):
custom_logger.create_dut_job(status="pass")
result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
assert result.returncode == 0
# Test case for not passing any arguments for create-job-phase
def test_create_job_phase_no_arguments(custom_logger, tmp_log_file):
custom_logger.create_dut_job(status="pass")
result = run_script_with_args([str(tmp_log_file), "--create-job-phase"])
assert result.returncode != 0
# Test case for trying to create a phase job without an existing DUT job
def test_create_job_phase_no_dut_job(custom_logger, tmp_log_file):
phase_name = "Phase1"
result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
assert result.returncode != 0
# Combined test cases for valid scenarios
def test_valid_scenarios(custom_logger, tmp_log_file):
valid_update_args = [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
for key, value in valid_update_args:
result = run_script_with_args([str(tmp_log_file), "--update", key, value])
assert result.returncode == 0
valid_create_args = [
("status", "hung"),
("dut_state", "Canceling"),
("dut_name", "asus"),
("phase_name", "Bootloader"),
]
for key, value in valid_create_args:
result = run_script_with_args(
[str(tmp_log_file), "--create-dut-job", key, value]
)
assert result.returncode == 0
result = run_script_with_args(
[str(tmp_log_file), "--create-dut-job", "status", "hung"]
)
assert result.returncode == 0
result = run_script_with_args(
[str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
)
assert result.returncode == 0
result = run_script_with_args(
[
str(tmp_log_file),
"--create-job-phase",
"phase_name",
]
)
assert result.returncode == 0
# Parametrize test case for valid update arguments
@pytest.mark.parametrize(
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
)
def test_update(custom_logger, key, value):
custom_logger.update(**{key: value})
logger_data = custom_logger.logger.data
assert key in logger_data
assert logger_data[key] == value
# Test case for updating with a key that already exists
def test_update_existing_key(custom_logger):
key = "status"
value = "new_value"
custom_logger.logger.data[key] = "old_value"
custom_logger.update(**{key: value})
logger_data = custom_logger.logger.data
assert key in logger_data
assert logger_data[key] == value
# Test case for updating "dut_jobs"
def test_update_dut_jobs(custom_logger):
key1 = "status"
value1 = "fail"
key2 = "state"
value2 = "hung"
custom_logger.create_dut_job(**{key1: value1})
logger_data = custom_logger.logger.data
job1 = logger_data["dut_jobs"][0]
assert key1 in job1
assert job1[key1] == value1
custom_logger.update_dut_job(key2, value2)
logger_data = custom_logger.logger.data
job2 = logger_data["dut_jobs"][0]
assert key2 in job2
assert job2[key2] == value2
# Test case for creating and updating DUT job
def test_create_dut_job(custom_logger):
key = "status"
value1 = "pass"
value2 = "fail"
value3 = "hung"
reason = "job_combined_status"
result = "Finished"
custom_logger.update(**{reason: result})
logger_data = custom_logger.logger.data
assert reason in logger_data
assert logger_data[reason] == result
# Create the first DUT job
custom_logger.create_dut_job(**{key: value1})
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 1
assert isinstance(logger_data["dut_jobs"][0], dict)
# Check the values of the keys in the created first DUT job
job1 = logger_data["dut_jobs"][0]
assert key in job1
assert job1[key] == value1
# Create the second DUT job
custom_logger.create_dut_job(**{key: value2})
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 2
assert isinstance(logger_data["dut_jobs"][1], dict)
# Check the values of the keys in the created second DUT job
job2 = logger_data["dut_jobs"][1]
assert key in job2
assert job2[key] == value2
# Update the second DUT job with value3
custom_logger.update_dut_job(key, value3)
logger_data = custom_logger.logger.data
# Check the updated value in the second DUT job
job2 = logger_data["dut_jobs"][1]
assert key in job2
assert job2[key] == value3
# Find the index of the last DUT job
last_job_index = len(logger_data["dut_jobs"]) - 1
# Update the last DUT job
custom_logger.update_dut_job("dut_name", "asus")
logger_data = custom_logger.logger.data
# Check the updated value in the last DUT job
job2 = logger_data["dut_jobs"][last_job_index]
assert "dut_name" in job2
assert job2["dut_name"] == "asus"
# Check that "dut_name" is not present in other DUT jobs
for idx, job in enumerate(logger_data["dut_jobs"]):
if idx != last_job_index:
assert job.get("dut_name") == ""
# Test case for updating with missing "dut_jobs" key
def test_update_dut_job_missing_dut_jobs(custom_logger):
key = "status"
value = "fail"
# Attempt to update a DUT job when "dut_jobs" is missing
with pytest.raises(ValueError, match="No DUT jobs found."):
custom_logger.update_dut_job(key, value)
# Test case for creating a job phase
def test_create_job_phase(custom_logger):
custom_logger.create_dut_job(status="pass")
phase_name = "Phase1"
custom_logger.create_job_phase(phase_name)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
assert "dut_job_phases" in job
assert isinstance(job["dut_job_phases"], list)
assert len(job["dut_job_phases"]) == 1
phase = job["dut_job_phases"][0]
assert phase["name"] == phase_name
try:
datetime.fromisoformat(phase["start_time"])
assert True
except ValueError:
assert False
assert phase["end_time"] == ""
# Test case for creating multiple phase jobs
def test_create_multiple_phase_jobs(custom_logger):
custom_logger.create_dut_job(status="pass")
phase_data = [
{
"phase_name": "Phase1",
},
{
"phase_name": "Phase2",
},
{
"phase_name": "Phase3",
},
]
for data in phase_data:
phase_name = data["phase_name"]
custom_logger.create_job_phase(phase_name)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
assert "dut_job_phases" in job
assert isinstance(job["dut_job_phases"], list)
assert len(job["dut_job_phases"]) == len(phase_data)
for data in phase_data:
phase_name = data["phase_name"]
phase = job["dut_job_phases"][phase_data.index(data)]
assert phase["name"] == phase_name
try:
datetime.fromisoformat(phase["start_time"])
assert True
except ValueError:
assert False
if phase_data.index(data) != len(phase_data) - 1:
try:
datetime.fromisoformat(phase["end_time"])
assert True
except ValueError:
assert False
# Check if the end_time of the last phase is an empty string
last_phase = job["dut_job_phases"][-1]
assert last_phase["end_time"] == ""
# Test case for creating multiple dut jobs and updating phase job for last dut job
def test_create_two_dut_jobs_and_add_phase(custom_logger):
# Create the first DUT job
custom_logger.create_dut_job(status="pass")
# Create the second DUT job
custom_logger.create_dut_job(status="fail")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 2
first_dut_job = logger_data["dut_jobs"][0]
second_dut_job = logger_data["dut_jobs"][1]
# Add a phase to the second DUT job
custom_logger.create_job_phase("Phase1")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 2
first_dut_job = logger_data["dut_jobs"][0]
second_dut_job = logger_data["dut_jobs"][1]
# Check first DUT job does not have a phase
assert not first_dut_job.get("dut_job_phases")
# Check second DUT job has a phase
assert second_dut_job.get("dut_job_phases")
assert isinstance(second_dut_job["dut_job_phases"], list)
assert len(second_dut_job["dut_job_phases"]) == 1
# Test case for updating DUT start time
def test_update_dut_start_time(custom_logger):
custom_logger.create_dut_job(status="pass")
custom_logger.update_dut_time("start", None)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "dut_start_time" in dut_job
assert dut_job["dut_start_time"] != ""
try:
datetime.fromisoformat(dut_job["dut_start_time"])
assert True
except ValueError:
assert False
# Test case for updating DUT submit time
def test_update_dut_submit_time(custom_logger):
custom_time = "2023-11-09T02:37:06Z"
custom_logger.create_dut_job(status="pass")
custom_logger.update_dut_time("submit", custom_time)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "dut_submit_time" in dut_job
try:
datetime.fromisoformat(dut_job["dut_submit_time"])
assert True
except ValueError:
assert False
# Test case for updating DUT end time
def test_update_dut_end_time(custom_logger):
custom_logger.create_dut_job(status="pass")
custom_logger.update_dut_time("end", None)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "dut_end_time" in dut_job
try:
datetime.fromisoformat(dut_job["dut_end_time"])
assert True
except ValueError:
assert False
# Test case for updating DUT time with invalid value
def test_update_dut_time_invalid_value(custom_logger):
custom_logger.create_dut_job(status="pass")
with pytest.raises(
ValueError,
match="Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'.",
):
custom_logger.update_dut_time("invalid_value", None)
# Test case for close_dut_job
def test_close_dut_job(custom_logger):
custom_logger.create_dut_job(status="pass")
custom_logger.create_job_phase("Phase1")
custom_logger.create_job_phase("Phase2")
custom_logger.close_dut_job()
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "dut_job_phases" in dut_job
dut_job_phases = dut_job["dut_job_phases"]
phase1 = dut_job_phases[0]
assert phase1["name"] == "Phase1"
try:
datetime.fromisoformat(phase1["start_time"])
assert True
except ValueError:
assert False
try:
datetime.fromisoformat(phase1["end_time"])
assert True
except ValueError:
assert False
phase2 = dut_job_phases[1]
assert phase2["name"] == "Phase2"
try:
datetime.fromisoformat(phase2["start_time"])
assert True
except ValueError:
assert False
try:
datetime.fromisoformat(phase2["end_time"])
assert True
except ValueError:
assert False
# Test case for close
def test_close(custom_logger):
custom_logger.create_dut_job(status="pass")
custom_logger.close()
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
assert "dut_attempt_counter" in logger_data
assert logger_data["dut_attempt_counter"] == len(logger_data["dut_jobs"])
assert "job_combined_status" in logger_data
assert logger_data["job_combined_status"] != ""
dut_job = logger_data["dut_jobs"][0]
assert "submitter_end_time" in dut_job
try:
datetime.fromisoformat(dut_job["submitter_end_time"])
assert True
except ValueError:
assert False
# Test case for updating status to fail with a reason
def test_update_status_fail_with_reason(custom_logger):
custom_logger.create_dut_job()
reason = "kernel panic"
custom_logger.update_status_fail(reason)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "status" in dut_job
assert dut_job["status"] == "fail"
assert "dut_job_fail_reason" in dut_job
assert dut_job["dut_job_fail_reason"] == reason
# Test case for updating status to fail without providing a reason
def test_update_status_fail_without_reason(custom_logger):
custom_logger.create_dut_job()
custom_logger.update_status_fail()
# Check if the status is updated and fail reason is empty
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "status" in dut_job
assert dut_job["status"] == "fail"
assert "dut_job_fail_reason" in dut_job
assert dut_job["dut_job_fail_reason"] == ""
# Test case for check_dut_timings with submission time earlier than start time
def test_check_dut_timings_submission_earlier_than_start(custom_logger, caplog):
custom_logger.create_dut_job()
# Set submission time to be earlier than start time
custom_logger.update_dut_time("start", "2023-01-01T11:00:00")
custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
# Call check_dut_timings
custom_logger.check_dut_timings(job)
# Check if an error message is logged
assert "Job submission is happening before job start." in caplog.text
# Test case for check_dut_timings with end time earlier than start time
def test_check_dut_timings_end_earlier_than_start(custom_logger, caplog):
custom_logger.create_dut_job()
# Set end time to be earlier than start time
custom_logger.update_dut_time("end", "2023-01-01T11:00:00")
custom_logger.update_dut_time("start", "2023-01-01T12:00:00")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
# Call check_dut_timings
custom_logger.check_dut_timings(job)
# Check if an error message is logged
assert "Job ended before it started." in caplog.text
# Test case for check_dut_timings with valid timing sequence
def test_check_dut_timings_valid_timing_sequence(custom_logger, caplog):
custom_logger.create_dut_job()
# Set valid timing sequence
custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
custom_logger.update_dut_time("start", "2023-01-01T12:30:00")
custom_logger.update_dut_time("end", "2023-01-01T13:00:00")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
# Call check_dut_timings
custom_logger.check_dut_timings(job)
# Check that no error messages are logged
assert "Job submission is happening before job start." not in caplog.text
assert "Job ended before it started." not in caplog.text
@pytest.mark.parametrize("start_time,end_time,expected", [
("2023-01-01T12:00:00", "2023-01-01T12:30:00", "0:30:00"),
("invalid", "2023-01-01T12:30:00", ""),
("", "", ""),
])
def test_get_duration_time(custom_logger, start_time, end_time, expected):
duration = custom_logger.get_duration_time(start_time, end_time)
assert duration == expected
# Test case for DUT job duration tracking
def test_dut_job_duration(custom_logger):
custom_logger.create_dut_job(status="pass")
# Set start time
custom_logger.update_dut_time("start", "2023-01-01T12:00:00")
logger_data = custom_logger.logger.data
dut_job = logger_data["dut_jobs"][0]
assert dut_job["dut_duration_time"] == ""
# Set end time
custom_logger.update_dut_time("end", "2023-01-01T12:30:00")
logger_data = custom_logger.logger.data
dut_job = logger_data["dut_jobs"][0]
assert dut_job["dut_duration_time"] == "0:30:00"
# Test case for job phase duration tracking
def test_job_phase_duration(custom_logger):
custom_logger.create_dut_job(status="pass")
# Create first phase
custom_logger.create_job_phase("Phase1")
logger_data = custom_logger.logger.data
job = logger_data["dut_jobs"][0]
phase = job["dut_job_phases"][0]
assert phase["duration_time"] == ""
# Create second phase (which should close the first phase)
custom_logger.create_job_phase("Phase2")
logger_data = custom_logger.logger.data
job = logger_data["dut_jobs"][0]
# First phase should have duration
phase1 = job["dut_job_phases"][0]
assert phase1["duration_time"] != ""
# Verify duration format
start = datetime.fromisoformat(phase1["start_time"])
end = datetime.fromisoformat(phase1["end_time"])
duration = end - start
assert phase1["duration_time"] == str(duration)
# Second phase should have empty duration
phase2 = job["dut_job_phases"][1]
assert phase2["duration_time"] == ""
# Test case for closing job with phases
def test_close_job_with_phases_duration(custom_logger):
custom_logger.create_dut_job(status="pass")
custom_logger.create_job_phase("Phase1")
custom_logger.close_dut_job()
logger_data = custom_logger.logger.data
job = logger_data["dut_jobs"][0]
phase = job["dut_job_phases"][0]
assert phase["duration_time"] != ""
# Verify duration format
start = datetime.fromisoformat(phase["start_time"])
end = datetime.fromisoformat(phase["end_time"])
duration = end - start
assert phase["duration_time"] == str(duration)