diff --git a/src/agents/__main__.py b/src/agents/__main__.py
index 392171e..fabb3f6 100644
--- a/src/agents/__main__.py
+++ b/src/agents/__main__.py
@@ -157,7 +157,6 @@ def run_eval_post_training(
         summary="max",
     )
     eval_cfgs = [EvalConfig(**cfg) for cfg in json.loads(eval_cfgs)]
-    kwargs = json.loads(kwargs)
     for idx, env in enumerate(eval_cfgs):
         wandb.define_metric(
             f"{env.env_id}/success",
@@ -197,7 +196,7 @@ def run_eval_post_training(
     # spawn n processes and run in parallel
-    agent_cfgs = [AgentConfig(**json.loads(agent_cfg)) for _ in range(steps)]
+    agent_cfgs = [AgentConfig(**json.loads(agent_cfg)) for _ in steps]
     for idx in range(len(steps)):
         agent_cfgs[idx].port += idx
     with Pool(n_processes) as p:
@@ -231,7 +230,7 @@ def run_eval_post_training(
         per_env_results_last_reward,
         per_env_results_rewards,
         eval_cfgs,
-        agent_cfg=agent_cfg,
+        agent_cfg=agent_cfgs[0],
         out=output_path,
     )
     wandb.log_artifact(path, type="file", name="results", aliases=[f"step_{step}"])
diff --git a/src/agents/evaluator_envs.py b/src/agents/evaluator_envs.py
index 9aa1c87..ecd7e8a 100644
--- a/src/agents/evaluator_envs.py
+++ b/src/agents/evaluator_envs.py
@@ -88,7 +88,7 @@ def step(self, action: Act) -> tuple[Obs, float, bool, bool, dict]:
         return self.translate_obs(obs), reward, success, truncated, info
 
     def reset(self, seed: int | None = None, options: dict[str, Any] | None = None) -> tuple[Obs, dict[str, Any]]:
-        obs, info = self.env.reset(seed=seed, options=options)
+        obs, info = self.env.reset()
         return self.translate_obs(obs), info
 
     @property
@@ -212,11 +212,11 @@ class AgentConfig:
 
 
 def single_eval(env: EvaluatorEnv, agent: Agent, max_steps: int) -> tuple[list[float], list[float], list[float]]:
-    logging.info(f"Starting evaluation of {env.env.unwrapped.spec.id}")
+    logging.debug(f"Starting evaluation of {env.env.unwrapped.spec.id}")
     obs, _ = env.reset(options={})
-    logging.info(f"Reset env {env.env.unwrapped.spec.id}")
+    logging.debug(f"Reset env {env.env.unwrapped.spec.id}")
     agent.reset(obs, env.language_instruction)
-    logging.info(f"Reset agent {env.env.unwrapped.spec.id}")
+    logging.debug(f"Reset agent {env.env.unwrapped.spec.id}")
     done = False
     truncated = False
     step = 0.0
@@ -230,7 +230,7 @@ def single_eval(env: EvaluatorEnv, agent: Agent, max_steps: int) -> tuple[list[float], list[float], list[float]]:
         rewards.append(reward)
 
     env.reset(options={})
-    logging.info(
+    logging.debug(
         f"Finished evaluation of {env.env.unwrapped.spec.id} with {step} steps and reward {reward}, success {done}"
     )
     # success, last reward and number of steps
@@ -241,22 +241,21 @@
 
 
 def create_env_agent(agent_config: AgentConfig, cfg: EvalConfig, seed: int) -> tuple[EvaluatorEnv, RemoteAgent]:
-    logging.info(f"retrieving env {cfg.env_id} and agent")
+    logging.debug(f"retrieving env {cfg.env_id} and agent")
     if cfg.env_id not in per_process_cache:
         logging.info(f"env {cfg.env_id} not available, creating new env and agent")
         env = EvaluatorEnv.make(cfg.env_id, seed=seed, **cfg.env_kwargs)
         logging.info("done creating env")
-        agent = RemoteAgent(agent_config.host, agent_config.port, agent_config.model)
+        agent = RemoteAgent(agent_config.host, agent_config.port, agent_config.agent_name)
         logging.info("done creating agent")
         per_process_cache[cfg.env_id] = (env, agent)
     return per_process_cache[cfg.env_id]
 
 
-def per_process(args: tuple[int, list[EvalConfig], int, AgentConfig]) -> tuple[float, float, float]:
-    logging.info(f"Starting process {args}")
+def run_episode(args: tuple[int, list[EvalConfig], int, AgentConfig]) -> tuple[float, float, float]:
     i, cfgs, episodes, agent_cfg = args
     cfg = cfgs[i // episodes]
     env, agent = create_env_agent(agent_cfg, cfg, seed=i)
     # busy wait for server to finish initialization
     while not agent.is_initialized():
         logging.info("Waiting for agent to initialize...")
@@ -273,11 +272,11 @@ def multi_eval(
     # with process
     # with Pool(n_processes) as p:
     #     args = [(i, cfgs, episodes, client_cfg) for i in range(len(cfgs) * episodes)]
-    #     single_results = p.map(per_process, args)
+    #     single_results = p.map(run_episode, args)
 
     # without process
     args = [(i, cfgs, episodes, agent_cfg) for i in range(len(cfgs) * episodes)]
-    single_results = [per_process(arg) for arg in tqdm(args)]
+    single_results = [run_episode(arg) for arg in tqdm(args)]
 
     single_results_last_reward = np.array([(i[0], i[1][-1], i[2]) for i in single_results])
@@ -402,19 +401,17 @@ def run_eval_post_training(
     episodes: int = 100,
     n_processes: int | None = None,
     n_gpus: int = 1,
-    cmd=None,
+    python_path: str = "python",
 ):
-    if cmd is None:
-        cmd = ["python"]
-
-    slurm.sbatch(
+    eval_cmd = "{} {}".format(
+        python_path,
         shlex.join(
-            cmd
-            + [
+            [
                 "-m",
                 "agents",
                 "run-eval-post-training",
-                f"--agent-cfg={json.dumps(asdict(agent_cfg))}" f"--episodes={episodes}",
+                f"--agent-cfg={json.dumps(asdict(agent_cfg))}",
+                f"--episodes={episodes}",
                 f"--n-processes={n_processes}",
                 f"--eval-cfgs={json.dumps([asdict(cfg) for cfg in eval_cfgs])}",
                 f"--wandb-group={wandb_group.replace(':', '_') if wandb_group else ''}",
@@ -424,11 +421,13 @@ def run_eval_post_training(
                 f"--wandb-name={wandb_name}",
                 f"--n-gpus={n_gpus}",
                 f"--steps={json.dumps(checkpoint_steps)}",
-                f"--run-path={output_path}",
+                f"--output-path={output_path}",
             ]
-        ),
+        )
     )
+
+    slurm.sbatch(eval_cmd)
 
 
 def write_results(
     results: np.ndarray,
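
A note on the create_env_agent caching: per_process_cache is a module-level dict, so each process builds the env/agent pair at most once per env_id and reuses it across episodes; with a multiprocessing.Pool every worker gets its own copy of the dict, so the cache is per process, not shared. A minimal standalone sketch of the pattern, with plain object() stand-ins in place of EvaluatorEnv and RemoteAgent:

import logging

# Module-level dict: each worker process owns its own copy, so the cache
# is per process rather than shared across the pool.
per_process_cache: dict[str, tuple[object, object]] = {}


def get_or_create(env_id: str) -> tuple[object, object]:
    if env_id not in per_process_cache:
        logging.info("env %s not cached, creating env and agent", env_id)
        # Stand-ins for EvaluatorEnv.make(...) and RemoteAgent(...); the real
        # constructors are expensive, which is what makes caching worthwhile.
        per_process_cache[env_id] = (object(), object())
    return per_process_cache[env_id]


assert get_or_create("env-a") is get_or_create("env-a")  # second call hits the cache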
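
On the sbatch command construction: --agent-cfg and --eval-cfgs carry JSON blobs containing spaces and quotes, so each argument must be shell-quoted individually. shlex.join does exactly that, and the interpreter prefix is then prepended with a plain space; quoting the joined string a second time (e.g. with shlex.quote) would make the shell hand the entire command line to python as a single argument. A standalone sketch, with a hypothetical config dict standing in for asdict(agent_cfg):

import json
import shlex

# Hypothetical stand-in for asdict(agent_cfg); any dict with spaces/quotes works.
agent_cfg = {"host": "localhost", "port": 8000, "agent_name": "demo"}

# shlex.join quotes each argument on its own, so the embedded JSON survives
# the shell intact while plain flags stay unquoted.
eval_cmd = "{} {}".format(
    "python",
    shlex.join(
        [
            "-m",
            "agents",
            "run-eval-post-training",
            f"--agent-cfg={json.dumps(agent_cfg)}",
            "--episodes=100",
        ]
    ),
)

print(eval_cmd)
# python -m agents run-eval-post-training '--agent-cfg={"host": "localhost", ...}' --episodes=100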