mirrored 15 minutes ago
0
Dunjie Lufeat: add gpt-5.4 computer-use runner (#450) * feat: add gpt-5.4 computer-use runner * fix: trim unrelated gpt54 debug logging * chore: default gpt54 reasoning effort to xhighb778119
from __future__ import annotations

import argparse
import datetime
import json
import logging
import os
import signal
import sys
import time
from multiprocessing import Manager, Process, current_process
from typing import List

# Add project root to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))

import lib_run_single
from desktop_env.desktop_env import DesktopEnv
from mm_agents.gpt54_agent import GPT54Agent


active_environments = []
processes = []
is_terminating = False

if os.path.exists(".env"):
    from dotenv import load_dotenv

    load_dotenv()


def config() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Run end-to-end evaluation on OSWorld with GPT-5.4 computer use"
    )

    parser.add_argument("--path_to_vm", type=str, default=None)
    parser.add_argument("--headless", action="store_true", help="Run in headless machine")
    parser.add_argument("--action_space", type=str, default="pyautogui", help="Action type")
    parser.add_argument(
        "--observation_type",
        choices=["screenshot"],
        default="screenshot",
        help="Observation type",
    )
    parser.add_argument("--sleep_after_execution", type=float, default=0.0)
    parser.add_argument("--max_steps", type=int, default=15)

    parser.add_argument("--max_trajectory_length", type=int, default=3)
    parser.add_argument("--test_config_base_dir", type=str, default="evaluation_examples")

    parser.add_argument("--model", type=str, default="gpt-5.4")
    parser.add_argument("--temperature", type=float, default=1.0)
    parser.add_argument("--top_p", type=float, default=0.9)
    parser.add_argument("--max_tokens", type=int, default=1500)
    parser.add_argument(
        "--reasoning_effort",
        type=str,
        choices=["none", "low", "medium", "high", "xhigh"],
        default="xhigh",
    )
    parser.add_argument("--stop_token", type=str, default=None)

    parser.add_argument("--domain", type=str, default="all")
    parser.add_argument(
        "--test_all_meta_path", type=str, default="evaluation_examples/test_all.json"
    )

    parser.add_argument("--result_dir", type=str, default="./results")
    parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to run in parallel")
    parser.add_argument(
        "--log_level",
        type=str,
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="INFO",
        help="Set the logging level",
    )
    parser.add_argument("--region", type=str, default="us-east-1", help="AWS region for the VM")
    parser.add_argument(
        "--provider_name",
        type=str,
        default="docker",
        choices=["aws", "virtualbox", "vmware", "docker", "azure"],
        help="Provider name",
    )
    parser.add_argument("--client_password", type=str, default="", help="Client password")
    parser.add_argument("--screen_width", type=int, default=1920, help="Screen width")
    parser.add_argument("--screen_height", type=int, default=1080, help="Screen height")
    return parser.parse_args()


args = config()

logger = logging.getLogger()
log_level = getattr(logging, args.log_level.upper())
logger.setLevel(log_level)

datetime_str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")

file_handler = logging.FileHandler(os.path.join("logs", f"normal-{datetime_str}.log"), encoding="utf-8")
debug_handler = logging.FileHandler(os.path.join("logs", f"debug-{datetime_str}.log"), encoding="utf-8")
stdout_handler = logging.StreamHandler(sys.stdout)

file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(log_level)

formatter = logging.Formatter(
    fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
)
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)

stdout_handler.addFilter(logging.Filter("desktopenv"))

logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)

logger = logging.getLogger("desktopenv.experiment")


def distribute_tasks(test_all_meta: dict) -> List[tuple]:
    all_tasks = []
    for domain, examples in test_all_meta.items():
        for example_id in examples:
            all_tasks.append((domain, example_id))
    return all_tasks


def process_signal_handler(signum, frame, env_idx):
    logger.info("Process %d received signal %s. Shutting down...", env_idx + 1, signum)
    local_vars = frame.f_locals
    frame_active_envs = local_vars.get("active_environments", [])
    for env in frame_active_envs:
        if env is not None:
            try:
                env.close()
            except Exception as exc:
                logger.error("Process %d error closing environment: %s", env_idx + 1, exc)
    sys.exit(0)


def run_env_tasks(task_queue, args: argparse.Namespace, shared_scores: list):
    active_environments = []
    env = None
    try:
        screen_size = (args.screen_width, args.screen_height)
        env_kwargs = dict(
            path_to_vm=args.path_to_vm,
            action_space=args.action_space,
            provider_name=args.provider_name,
            screen_size=screen_size,
            headless=args.headless,
            os_type="Ubuntu",
            require_a11y_tree=False,
            enable_proxy=True,
            client_password=args.client_password,
        )

        if args.provider_name == "aws":
            from desktop_env.providers.aws.manager import IMAGE_ID_MAP

            region = args.region
            ami_id = IMAGE_ID_MAP[region].get(screen_size, IMAGE_ID_MAP[region][(1920, 1080)])
            env_kwargs["region"] = region
            env_kwargs["snapshot_name"] = ami_id

        env = DesktopEnv(**env_kwargs)
        active_environments.append(env)
        agent = GPT54Agent(
            env=env,
            model=args.model,
            max_tokens=args.max_tokens,
            top_p=args.top_p,
            temperature=args.temperature,
            action_space=args.action_space,
            observation_type=args.observation_type,
            max_trajectory_length=args.max_trajectory_length,
            client_password=args.client_password,
            provider_name=args.provider_name,
            screen_width=args.screen_width,
            screen_height=args.screen_height,
            sleep_after_execution=args.sleep_after_execution,
            reasoning_effort=args.reasoning_effort,
        )
        logger.info("Process %s started.", current_process().name)

        while True:
            try:
                item = task_queue.get(timeout=5)
            except Exception:
                break

            domain, example_id = item
            try:
                config_file = os.path.join(
                    args.test_config_base_dir, f"examples/{domain}/{example_id}.json"
                )
                with open(config_file, "r", encoding="utf-8") as f:
                    example = json.load(f)

                logger.info("[%s][Domain]: %s", current_process().name, domain)
                logger.info("[%s][Example ID]: %s", current_process().name, example_id)
                logger.info("[%s][Instruction]: %s", current_process().name, example["instruction"])

                example_result_dir = os.path.join(
                    args.result_dir,
                    args.action_space,
                    args.observation_type,
                    args.model,
                    domain,
                    example_id,
                )
                os.makedirs(example_result_dir, exist_ok=True)

                try:
                    lib_run_single.run_single_example_gpt54(
                        agent,
                        env,
                        example,
                        args.max_steps,
                        example["instruction"],
                        args,
                        example_result_dir,
                        shared_scores,
                    )
                except Exception as exc:
                    import traceback

                    logger.error(
                        "Exception in %s %s/%s: %s",
                        current_process().name,
                        domain,
                        example_id,
                        exc,
                    )
                    logger.error(traceback.format_exc())
                    try:
                        if hasattr(env, "controller") and env.controller is not None:
                            env.controller.end_recording(
                                os.path.join(example_result_dir, "recording.mp4")
                            )
                    except Exception as record_exc:
                        logger.error("Failed to end recording: %s", record_exc)
                    with open(os.path.join(example_result_dir, "traj.jsonl"), "a", encoding="utf-8") as f:
                        f.write(json.dumps({"Error": f"{domain}/{example_id} - {exc}"}))
                        f.write("\n")
            except Exception as exc:
                import traceback

                logger.error("Task-level error in %s: %s", current_process().name, exc)
                logger.error(traceback.format_exc())
    except Exception as exc:
        import traceback

        logger.error("Process-level error in %s: %s", current_process().name, exc)
        logger.error(traceback.format_exc())
    finally:
        logger.info("%s cleaning up environment...", current_process().name)
        try:
            if env:
                env.close()
                logger.info("%s environment closed successfully", current_process().name)
        except Exception as exc:
            logger.error("%s error during environment cleanup: %s", current_process().name, exc)


def signal_handler(signum, frame):
    global is_terminating, active_environments, processes

    if is_terminating:
        return

    is_terminating = True
    logger.info("Received signal %s. Gracefully shutting down...", signum)

    for env in active_environments:
        try:
            env.close()
        except Exception as exc:
            logger.error("Error closing environment: %s", exc)

    for process in processes:
        if process.is_alive():
            try:
                process.terminate()
            except Exception as exc:
                logger.error("Error sending termination signal to process: %s", exc)

    time.sleep(1)

    for process in processes:
        if process.is_alive():
            try:
                import signal as os_signal

                os.kill(process.pid, os_signal.SIGKILL)
            except Exception as exc:
                logger.error("Error forcefully terminating process: %s", exc)

    logger.info("Shutdown complete. Exiting.")
    sys.exit(0)


def test(args: argparse.Namespace, test_all_meta: dict) -> None:
    global processes

    logger.info("Args: %s", args)
    all_tasks = distribute_tasks(test_all_meta)
    logger.info("Total tasks: %d", len(all_tasks))

    with Manager() as manager:
        shared_scores = manager.list()
        task_queue = manager.Queue()
        for item in all_tasks:
            task_queue.put(item)

        processes = []
        for index in range(args.num_envs):
            process = Process(
                target=run_env_tasks,
                args=(task_queue, args, shared_scores),
                name=f"EnvProcess-{index + 1}",
            )
            process.daemon = True
            process.start()
            processes.append(process)
            logger.info("Started process %s with PID %s", process.name, process.pid)

        try:
            while True:
                alive_count = 0
                for index, process in enumerate(processes):
                    if not process.is_alive():
                        logger.warning("Process %s died, restarting...", process.name)
                        new_process = Process(
                            target=run_env_tasks,
                            args=(task_queue, args, shared_scores),
                            name=f"EnvProcess-Restart-{index + 1}",
                        )
                        new_process.daemon = True
                        new_process.start()
                        processes[index] = new_process
                        logger.info("Restarted process %s with PID %s", new_process.name, new_process.pid)
                    else:
                        alive_count += 1

                if task_queue.empty():
                    logger.info("All tasks finished.")
                    break
                if alive_count == 0:
                    logger.error("All processes died, exiting.")
                    break
                time.sleep(5)

            for process in processes:
                process.join()
        except KeyboardInterrupt:
            logger.info("Main process received KeyboardInterrupt. Initiating graceful shutdown...")
            raise
        except Exception as exc:
            logger.error("Unexpected error while waiting for processes: %s", exc, exc_info=True)
            for process in processes:
                if process.is_alive():
                    try:
                        process.terminate()
                    except Exception as term_exc:
                        logger.error("Error terminating process %s: %s", process.name, term_exc)
            raise

        scores = list(shared_scores)

    logger.info("Average score: %s", sum(scores) / len(scores) if scores else 0)


def get_unfinished(action_space, use_model, observation_type, result_dir, total_file_json):
    target_dir = os.path.join(result_dir, action_space, observation_type, use_model)

    if not os.path.exists(target_dir):
        return total_file_json

    finished = {}
    for domain in os.listdir(target_dir):
        finished[domain] = []
        domain_path = os.path.join(target_dir, domain)
        if os.path.isdir(domain_path):
            for example_id in os.listdir(domain_path):
                if example_id == "onboard":
                    continue
                example_path = os.path.join(domain_path, example_id)
                if os.path.isdir(example_path):
                    if "result.txt" not in os.listdir(example_path):
                        for file_name in os.listdir(example_path):
                            os.remove(os.path.join(example_path, file_name))
                    else:
                        finished[domain].append(example_id)

    if not finished:
        return total_file_json

    for domain, examples in finished.items():
        if domain in total_file_json:
            total_file_json[domain] = [
                example_id for example_id in total_file_json[domain] if example_id not in examples
            ]

    return total_file_json


def get_result(action_space, use_model, observation_type, result_dir, total_file_json):
    target_dir = os.path.join(result_dir, action_space, observation_type, use_model)
    if not os.path.exists(target_dir):
        print("New experiment, no result yet.")
        return None

    all_result = []
    for domain in os.listdir(target_dir):
        domain_path = os.path.join(target_dir, domain)
        if os.path.isdir(domain_path):
            for example_id in os.listdir(domain_path):
                example_path = os.path.join(domain_path, example_id)
                if os.path.isdir(example_path) and "result.txt" in os.listdir(example_path):
                    try:
                        all_result.append(float(open(os.path.join(example_path, "result.txt"), "r", encoding="utf-8").read()))
                    except Exception:
                        all_result.append(0.0)

    if not all_result:
        print("New experiment, no result yet.")
        return None

    print("Current Success Rate:", sum(all_result) / len(all_result) * 100, "%")
    return all_result


if __name__ == "__main__":
    os.environ["TOKENIZERS_PARALLELISM"] = "false"

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        args = config()

        path_to_args = os.path.join(
            args.result_dir,
            args.action_space,
            args.observation_type,
            args.model,
            "args.json",
        )
        os.makedirs(os.path.dirname(path_to_args), exist_ok=True)
        with open(path_to_args, "w", encoding="utf-8") as f:
            json.dump(vars(args), f, indent=4)

        with open(args.test_all_meta_path, "r", encoding="utf-8") as f:
            test_all_meta = json.load(f)

        if args.domain != "all":
            test_all_meta = {args.domain: test_all_meta[args.domain]}

        test_file_list = get_unfinished(
            args.action_space,
            args.model,
            args.observation_type,
            args.result_dir,
            test_all_meta,
        )

        left_info = ""
        for domain in test_file_list:
            left_info += f"{domain}: {len(test_file_list[domain])}\n"
        logger.info("Left tasks:\n%s", left_info)

        get_result(
            args.action_space,
            args.model,
            args.observation_type,
            args.result_dir,
            test_all_meta,
        )
        test(args, test_file_list)
    except KeyboardInterrupt:
        logger.info("Main process received KeyboardInterrupt.")
    except Exception as exc:
        logger.error("Unexpected error in main process: %s", exc, exc_info=True)
        signal_handler(signal.SIGTERM, None)
    finally:
        logger.info("Main process final cleanup...")
        for env in active_environments:
            if env is not None:
                try:
                    env.close()
                except Exception:
                    pass