mirror of
https://github.com/saymrwulf/stable-baselines3.git
synced 2026-05-14 20:58:03 +00:00
* Add `has_attr` for `VecEnv` * Add special case for gymnasium<1.0 * Update changelog.rst * Update black version
687 lines
23 KiB
Python
687 lines
23 KiB
Python
import collections
|
|
import functools
|
|
import itertools
|
|
import multiprocessing
|
|
import os
|
|
import warnings
|
|
from typing import Optional
|
|
|
|
import gymnasium as gym
|
|
import numpy as np
|
|
import pytest
|
|
from gymnasium import spaces
|
|
|
|
from stable_baselines3 import PPO
|
|
from stable_baselines3.common.env_util import make_vec_env
|
|
from stable_baselines3.common.monitor import Monitor
|
|
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv, VecFrameStack, VecNormalize, VecVideoRecorder
|
|
|
|
# moviepy is an optional dependency: record whether it can be imported so that
# the video recorder test can be skipped when it is missing.
try:
    import moviepy  # noqa: F401

    have_moviepy = True
except ImportError:
    have_moviepy = False

# Number of sub-environments created for the vectorized environments under test
N_ENVS = 3
# Vectorized environment implementations the parametrized tests run against
VEC_ENV_CLASSES = [DummyVecEnv, SubprocVecEnv]
# Wrappers applied on top of the vectorized environment (None: no wrapper)
VEC_ENV_WRAPPERS = [None, VecNormalize, VecFrameStack]
|
|
|
|
|
|
class CustomGymEnv(gym.Env):
    """
    Custom gym environment for testing purposes.

    The same space is used for actions and observations. Observations are
    sampled from that space at every reset/step and an episode is truncated
    once ``ep_length`` steps have been taken.
    """

    def __init__(self, space, render_mode: str = "rgb_array"):
        """
        :param space: Space used both as action and observation space
        :param render_mode: Render mode; only ``"rgb_array"`` makes ``render()`` return a frame
        """
        self.action_space = space
        self.observation_space = space
        self.render_mode = render_mode
        self.ep_length = 4
        self.current_step = 0
        # Options received by the last call to ``reset()``
        self.current_options: Optional[dict] = None

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        """
        Start a new episode.

        :param seed: When given, forwarded to ``seed()`` before sampling the first state
        :param options: Stored as-is in ``current_options``
        :return: The sampled state and an empty info dict
        """
        if seed is not None:
            self.seed(seed)
        self.current_options = options
        self.current_step = 0
        self._choose_next_state()
        return self.state, {}

    def step(self, action):
        """
        Take one step; the action is ignored.

        :return: ``(state, reward, terminated, truncated, info)`` where the reward is random,
            ``terminated`` is always ``False`` and ``truncated`` is set once ``ep_length`` is reached
        """
        # The reward is drawn before the next state is sampled
        reward = float(np.random.rand())
        self._choose_next_state()
        self.current_step += 1
        time_limit_reached = self.current_step >= self.ep_length
        return self.state, reward, False, time_limit_reached, {}

    def _choose_next_state(self):
        """Sample a new state from the observation space."""
        sampled_state = self.observation_space.sample()
        self.state = sampled_state

    def render(self):
        """Return a blank (4, 4, 3) array in ``"rgb_array"`` mode, ``None`` otherwise."""
        if self.render_mode != "rgb_array":
            return None
        return np.zeros((4, 4, 3))

    def seed(self, seed=None):
        """Seed the global NumPy RNG and the observation space; do nothing when ``seed`` is ``None``."""
        if seed is None:
            return
        np.random.seed(seed)
        self.observation_space.seed(seed)

    @staticmethod
    def custom_method(dim_0=1, dim_1=1):
        """
        Dummy method to test call to custom method
        from VecEnv

        :param dim_0: Size of the first dimension
        :param dim_1: Size of the second dimension
        :return: Array of ones of shape ``(dim_0, dim_1)``
        """
        shape = (dim_0, dim_1)
        return np.ones(shape)
|
|
|
|
|
|
def test_vecenv_func_checker():
    """Each function in ``env_fns`` must return its own instance: sharing one environment raises an error."""
    shared_env = CustomGymEnv(spaces.Box(low=np.zeros(2), high=np.ones(2)))

    def return_shared_env():
        # Every call hands back the very same environment object
        return shared_env

    env_fns = [return_shared_env] * N_ENVS
    with pytest.raises(ValueError):
        DummyVecEnv(env_fns)

    shared_env.close()
|
|
|
|
|
|
@pytest.mark.parametrize("vec_env_class", VEC_ENV_CLASSES)
@pytest.mark.parametrize("vec_env_wrapper", VEC_ENV_WRAPPERS)
def test_vecenv_custom_calls(vec_env_class, vec_env_wrapper):
    """
    Test access to methods/attributes of vectorized environments.

    Exercises ``seed``, ``render``, ``has_attr``/``get_attr``/``set_attr``
    (with and without ``indices``), ``env_method`` and ``set_options``
    for every vec env class, with and without a wrapper.
    """

    def make_env():
        return CustomGymEnv(spaces.Box(low=np.zeros(2), high=np.ones(2)))

    vec_env = vec_env_class([make_env for _ in range(N_ENVS)])

    if vec_env_wrapper is not None:
        if vec_env_wrapper == VecFrameStack:
            # VecFrameStack is the only wrapper that needs an extra argument
            vec_env = vec_env_wrapper(vec_env, n_stack=2)
        else:
            vec_env = vec_env_wrapper(vec_env)

    # Test seed method
    vec_env.seed(0)

    # Test render method call
    array_explicit_mode = vec_env.render(mode="rgb_array")
    # test render without argument (new gym API style)
    array_implicit_mode = vec_env.render()
    assert np.array_equal(array_implicit_mode, array_explicit_mode)

    # test warning if you try different render mode
    with pytest.warns(UserWarning):
        vec_env.render(mode="something_else")

    # we need a X server to test the "human" mode (uses OpenCV)
    # vec_env.render(mode="human")

    # Set a new attribute, on the last wrapper and on the env
    assert not vec_env.has_attr("dummy")
    # Set value for the last wrapper only
    vec_env.set_attr("dummy", 12)
    assert vec_env.get_attr("dummy") == [12] * N_ENVS
    if vec_env_class == DummyVecEnv:
        assert vec_env.envs[0].dummy == 12

    assert not vec_env.has_attr("dummy2")
    # Set the value on the original env
    # `set_wrapper_attr` doesn't exist before v1.0
    # Compare the parsed major version: comparing raw version strings is
    # lexicographic and only works by accident.
    gym_major_version = int(gym.__version__.split(".")[0])
    if gym_major_version >= 1:
        vec_env.env_method("set_wrapper_attr", "dummy2", 2)
        assert vec_env.get_attr("dummy2") == [2] * N_ENVS
        if vec_env_class == DummyVecEnv:
            assert vec_env.envs[0].unwrapped.dummy2 == 2

    env_method_results = vec_env.env_method("custom_method", 1, indices=None, dim_1=2)
    setattr_results = []
    # Set current_step to an arbitrary value
    for env_idx in range(N_ENVS):
        setattr_results.append(vec_env.set_attr("current_step", env_idx, indices=env_idx))
    # Retrieve the value for each environment
    assert vec_env.has_attr("current_step")
    getattr_results = vec_env.get_attr("current_step")

    assert len(env_method_results) == N_ENVS
    assert len(setattr_results) == N_ENVS
    assert len(getattr_results) == N_ENVS

    for env_idx in range(N_ENVS):
        assert (env_method_results[env_idx] == np.ones((1, 2))).all()
        assert setattr_results[env_idx] is None
        assert getattr_results[env_idx] == env_idx

    # Call env_method on a subset of the VecEnv
    env_method_subset = vec_env.env_method("custom_method", 1, indices=[0, 2], dim_1=3)
    assert (env_method_subset[0] == np.ones((1, 3))).all()
    assert (env_method_subset[1] == np.ones((1, 3))).all()
    assert len(env_method_subset) == 2

    # Test to change value for all the environments
    setattr_result = vec_env.set_attr("current_step", 42, indices=None)
    getattr_result = vec_env.get_attr("current_step")
    assert setattr_result is None
    assert getattr_result == [42 for _ in range(N_ENVS)]

    # Additional tests for setattr that does not affect all the environments
    vec_env.reset()
    setattr_result = vec_env.set_attr("current_step", 12, indices=[0, 1])
    getattr_result = vec_env.get_attr("current_step")
    getattr_result_subset = vec_env.get_attr("current_step", indices=[0, 1])
    assert setattr_result is None
    assert getattr_result == [12 for _ in range(2)] + [0 for _ in range(N_ENVS - 2)]
    assert getattr_result_subset == [12, 12]
    assert vec_env.get_attr("current_step", indices=[0, 2]) == [12, 0]

    vec_env.reset()
    # Change value only for first and last environment
    setattr_result = vec_env.set_attr("current_step", 12, indices=[0, -1])
    getattr_result = vec_env.get_attr("current_step")
    assert setattr_result is None
    assert getattr_result == [12] + [0 for _ in range(N_ENVS - 2)] + [12]
    assert vec_env.get_attr("current_step", indices=[-1]) == [12]

    # Checks that options are correctly passed
    assert vec_env.get_attr("current_options")[0] is None
    # Same options for all envs
    options = {"hello": 1}
    vec_env.set_options(options)
    assert vec_env.get_attr("current_options")[0] is None
    # Only effective at reset
    vec_env.reset()
    assert vec_env.get_attr("current_options") == [options] * N_ENVS
    vec_env.reset()
    # Options are reset
    assert vec_env.get_attr("current_options")[0] is None
    # Use a list of options, different for the first env
    options = [{"hello": 1}] * N_ENVS
    options[0] = {"other_option": 2}
    vec_env.set_options(options)
    vec_env.reset()
    assert vec_env.get_attr("current_options") == options

    vec_env.close()
|
|
|
|
|
|
class StepEnv(gym.Env):
    """
    Gym environment for testing that terminal observation is inserted correctly.

    The observation returned by ``step()`` is the step counter *before* the step
    was taken, and the episode is truncated after ``max_steps`` steps.
    """

    def __init__(self, max_steps):
        """
        :param max_steps: Number of steps after which the episode is truncated
        """
        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(np.array([0]), np.array([999]), dtype="int")
        self.current_step = 0
        self.max_steps = max_steps

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        """Reset the step counter and return the initial observation with an empty info dict."""
        self.current_step = 0
        initial_obs = np.array([self.current_step], dtype="int")
        return initial_obs, {}

    def step(self, action):
        """Increment the step counter; the action is ignored and the reward is always zero."""
        step_before_update = self.current_step
        self.current_step = step_before_update + 1
        out_of_steps = self.current_step >= self.max_steps
        obs = np.array([step_before_update], dtype="int")
        return obs, 0.0, False, out_of_steps, {}
|
|
|
|
|
|
@pytest.mark.parametrize("vec_env_class", VEC_ENV_CLASSES)
@pytest.mark.parametrize("vec_env_wrapper", VEC_ENV_WRAPPERS)
def test_vecenv_terminal_obs(vec_env_class, vec_env_wrapper):
    """Check that 'terminal_observation' is added to the info dict
    exactly when an episode ends."""
    # Each env gets a different episode length: 5, 6, 7, ...
    episode_lengths = [5 + env_idx for env_idx in range(N_ENVS)]
    env_fns = [functools.partial(StepEnv, length) for length in episode_lengths]
    vec_env = vec_env_class(env_fns)

    if vec_env_wrapper == VecFrameStack:
        vec_env = vec_env_wrapper(vec_env, n_stack=2)
    elif vec_env_wrapper is not None:
        vec_env = vec_env_wrapper(vec_env)

    noop_actions = np.zeros((N_ENVS,), dtype="int")
    last_obs_batch = vec_env.reset()
    for step_num in range(1, max(episode_lengths) + 1):
        obs_batch, _, done_batch, info_batch = vec_env.step(noop_actions)
        for batch in (obs_batch, done_batch, info_batch):
            assert len(batch) == N_ENVS

        per_env = zip(last_obs_batch, obs_batch, done_batch, info_batch, episode_lengths)
        for prev_obs, obs, done, info, episode_length in per_env:
            assert done == (step_num == episode_length)
            if not done:
                assert "terminal_observation" not in info
                continue

            terminal_obs = info["terminal_observation"]

            # rough ordering checks, valid for every wrapper
            # (VecNormalize included)
            assert np.all(prev_obs < terminal_obs)
            assert np.all(obs < prev_obs)

            if not isinstance(vec_env, VecNormalize):
                # exact checks, impossible with VecNormalize
                # because it modifies the observation values
                assert np.all(prev_obs + 1 == terminal_obs)
                assert np.all(obs == 0)

        last_obs_batch = obs_batch

    vec_env.close()
|
|
|
|
|
|
# Named spaces used by the space-related tests, in a fixed order
SPACES = collections.OrderedDict(
    discrete=spaces.Discrete(2),
    multidiscrete=spaces.MultiDiscrete([2, 3]),
    multibinary=spaces.MultiBinary(3),
    continuous=spaces.Box(low=np.zeros(2), high=np.ones(2)),
)
|
|
|
|
|
|
def check_vecenv_spaces(vec_env_class, space, obs_assert):
    """
    Helper method to check observation spaces in vectorized environments.

    Builds ``N_ENVS`` copies of ``CustomGymEnv`` and steps them with random
    actions until at least one episode is over, checking every batch of observations.

    :param vec_env_class: Callable building a vec env from a list of env constructors
    :param space: Space used for both the actions and the observations of the envs
    :param obs_assert: Callable called with each batch of observations, expected to assert on it
    """

    def make_env():
        return CustomGymEnv(space)

    vec_env = vec_env_class([make_env for _ in range(N_ENVS)])
    # Close the vec env even if a check fails, so that nothing
    # (e.g. subprocess workers) is left running after a failed test.
    try:
        obs = vec_env.reset()
        obs_assert(obs)

        dones = [False] * N_ENVS
        while not any(dones):
            actions = [vec_env.action_space.sample() for _ in range(N_ENVS)]
            obs, _rews, dones, _infos = vec_env.step(actions)
            obs_assert(obs)
    finally:
        vec_env.close()
|
|
|
|
|
|
def check_vecenv_obs(obs, space):
    """Helper method asserting that a batch holds one observation per environment
    and that each of them belongs to ``space``."""
    n_observations = obs.shape[0]
    assert n_observations == N_ENVS
    for single_obs in obs:
        assert space.contains(single_obs)
|
|
|
|
|
|
@pytest.mark.parametrize("vec_env_class,space", itertools.product(VEC_ENV_CLASSES, SPACES.values()))
def test_vecenv_single_space(vec_env_class, space):
    """Check the observations of every vec env class for each non-composite space."""
    # Bind the expected space so that the checker only needs the observations
    obs_assert = functools.partial(check_vecenv_obs, space=space)
    check_vecenv_spaces(vec_env_class, space, obs_assert)
|
|
|
|
|
|
class _UnorderedDictSpace(spaces.Dict):
    """Same as ``spaces.Dict``, except that samples are converted to a plain ``dict``."""

    def sample(self):
        parent_sample = super().sample()
        return dict(parent_sample)
|
|
|
|
|
|
@pytest.mark.parametrize("vec_env_class", VEC_ENV_CLASSES)
def test_vecenv_dict_spaces(vec_env_class):
    """Test dictionary observation spaces with vectorized environments."""
    space = spaces.Dict(SPACES)
    expected_subspaces = space.spaces

    def obs_assert(obs):
        assert isinstance(obs, dict)
        assert obs.keys() == expected_subspaces.keys()
        for name in obs:
            check_vecenv_obs(obs[name], expected_subspaces[name])

    check_vecenv_spaces(vec_env_class, space, obs_assert)

    # Plain dict observations must be accepted as well by vec_env_class
    # (and converted to OrderedDict)
    plain_dict_space = _UnorderedDictSpace(SPACES)
    check_vecenv_spaces(vec_env_class, plain_dict_space, obs_assert)
|
|
|
|
|
|
@pytest.mark.parametrize("vec_env_class", VEC_ENV_CLASSES)
def test_vecenv_tuple_spaces(vec_env_class):
    """Test tuple observation spaces with vectorized environments."""
    inner_spaces = tuple(SPACES.values())
    space = spaces.Tuple(inner_spaces)

    def obs_assert(obs):
        assert isinstance(obs, tuple)
        assert len(obs) == len(space.spaces)
        # One batch of values per inner space, in the same order
        for batch, expected_space in zip(obs, space.spaces):
            check_vecenv_obs(batch, expected_space)

    check_vecenv_spaces(vec_env_class, space, obs_assert)
|
|
|
|
|
|
def test_subproc_start_method():
    """Run SubprocVecEnv with the default and every available start method, then with an invalid one."""
    # Only test thread-safe methods. Others may deadlock tests! (gh/428)
    # Note: adding unsafe `fork` method as we are now using PyTorch
    candidate_methods = {"forkserver", "spawn", "fork"}
    supported_methods = candidate_methods.intersection(multiprocessing.get_all_start_methods())
    # None: let SubprocVecEnv pick the start method
    start_methods = [None, *supported_methods]
    space = spaces.Discrete(2)
    obs_assert = functools.partial(check_vecenv_obs, space=space)

    for start_method in start_methods:
        vec_env_class = functools.partial(SubprocVecEnv, start_method=start_method)
        check_vecenv_spaces(vec_env_class, space, obs_assert)

    invalid_vec_env_class = functools.partial(SubprocVecEnv, start_method="illegal_method")
    with pytest.raises(ValueError, match="cannot find context for 'illegal_method'"):
        check_vecenv_spaces(invalid_vec_env_class, space, obs_assert)
|
|
|
|
|
|
class CustomWrapperA(VecNormalize):
    """VecNormalize subclass that only adds the ``var_a`` attribute."""

    def __init__(self, venv):
        super().__init__(venv)
        self.var_a = "a"
|
|
|
|
|
|
class CustomWrapperB(VecNormalize):
    """VecNormalize subclass adding the ``var_b`` attribute and two helper methods."""

    def __init__(self, venv):
        super().__init__(venv)
        self.var_b = "b"

    def func_b(self):
        """Return the value of ``var_b``."""
        return self.var_b

    def name_test(self):
        """Return the class of the instance the method is bound to."""
        return self.__class__
|
|
|
|
|
|
class CustomWrapperBB(CustomWrapperB):
    """CustomWrapperB subclass that additionally defines ``var_bb``."""

    def __init__(self, venv):
        super().__init__(venv)
        self.var_bb = "bb"
|
|
|
|
|
|
def test_vecenv_wrapper_getattr():
    """Test attribute and method lookup through a stack of vec env wrappers."""

    def make_env():
        return CustomGymEnv(spaces.Box(low=np.zeros(2), high=np.ones(2)))

    vec_env = DummyVecEnv([make_env for _ in range(N_ENVS)])
    wrapped = CustomWrapperA(CustomWrapperBB(vec_env))
    assert wrapped.var_a == "a"
    assert wrapped.var_b == "b"
    assert wrapped.var_bb == "bb"
    assert wrapped.func_b() == "b"
    # The method is bound to the inner CustomWrapperBB, not to the outer wrapper
    assert wrapped.name_test() == CustomWrapperBB

    double_wrapped = CustomWrapperA(CustomWrapperB(wrapped))
    _ = double_wrapped.var_a  # should not raise as it is directly defined here
    with pytest.raises(AttributeError):  # should raise due to ambiguity
        _ = double_wrapped.var_b
    with pytest.raises(AttributeError):  # should raise as does not exist
        _ = double_wrapped.nonexistent_attribute

    # Clean up, as the other tests of this file do
    double_wrapped.close()
|
|
|
|
|
|
def test_framestack_vecenv():
    """Test that framestack environment stacks on desired axis"""

    image_space_shape = [12, 8, 3]
    # Actions have the same shape as observations (CustomGymEnv uses one space for both)
    zero_acts = np.zeros([N_ENVS, *image_space_shape])

    transposed_image_space_shape = image_space_shape[::-1]
    transposed_zero_acts = np.zeros([N_ENVS, *transposed_image_space_shape])

    def make_image_env():
        return CustomGymEnv(
            spaces.Box(
                low=np.zeros(image_space_shape),
                high=np.ones(image_space_shape) * 255,
                dtype=np.uint8,
            )
        )

    def make_transposed_image_env():
        return CustomGymEnv(
            spaces.Box(
                low=np.zeros(transposed_image_space_shape),
                high=np.ones(transposed_image_space_shape) * 255,
                dtype=np.uint8,
            )
        )

    def make_non_image_env():
        return CustomGymEnv(spaces.Box(low=np.zeros((2,)), high=np.ones((2,))))

    vec_env = DummyVecEnv([make_image_env for _ in range(N_ENVS)])
    vec_env = VecFrameStack(vec_env, n_stack=2)
    obs, _, _, _ = vec_env.step(zero_acts)
    vec_env.close()

    # Should be stacked on the last dimension
    assert obs.shape[-1] == (image_space_shape[-1] * 2)

    # Try automatic stacking on first dimension now
    vec_env = DummyVecEnv([make_transposed_image_env for _ in range(N_ENVS)])
    vec_env = VecFrameStack(vec_env, n_stack=2)
    obs, _, _, _ = vec_env.step(transposed_zero_acts)
    vec_env.close()

    # Should be stacked on the first dimension (note the transposing in make_transposed_image_env)
    assert obs.shape[1] == (image_space_shape[-1] * 2)

    # Try forcing dimensions
    vec_env = DummyVecEnv([make_image_env for _ in range(N_ENVS)])
    vec_env = VecFrameStack(vec_env, n_stack=2, channels_order="last")
    obs, _, _, _ = vec_env.step(zero_acts)
    vec_env.close()

    # Should be stacked on the last dimension
    assert obs.shape[-1] == (image_space_shape[-1] * 2)

    vec_env = DummyVecEnv([make_image_env for _ in range(N_ENVS)])
    vec_env = VecFrameStack(vec_env, n_stack=2, channels_order="first")
    obs, _, _, _ = vec_env.step(zero_acts)
    vec_env.close()

    # Should be stacked on the first dimension
    assert obs.shape[1] == (image_space_shape[0] * 2)

    # Test invalid channels_order
    vec_env = DummyVecEnv([make_image_env for _ in range(N_ENVS)])
    with pytest.raises(AssertionError):
        VecFrameStack(vec_env, n_stack=2, channels_order="not_valid")
    # The wrapper could not be created: close the underlying vec env directly
    vec_env.close()

    # Test that it works with non-image envs when no channels_order is given
    vec_env = DummyVecEnv([make_non_image_env for _ in range(N_ENVS)])
    vec_env = VecFrameStack(vec_env, n_stack=2)
    vec_env.close()
|
|
|
|
|
|
def test_vec_env_is_wrapped():
    """Test ``env_is_wrapped`` for SubprocVecEnv, DummyVecEnv and through a vec env wrapper."""

    # Test is_wrapped call of subproc workers
    def make_env():
        return CustomGymEnv(spaces.Box(low=np.zeros(2), high=np.ones(2)))

    def make_monitored_env():
        return Monitor(CustomGymEnv(spaces.Box(low=np.zeros(2), high=np.ones(2))))

    # One with monitor, one without
    vec_env = SubprocVecEnv([make_env, make_monitored_env])

    assert vec_env.env_is_wrapped(Monitor) == [False, True]

    vec_env.close()

    # One with monitor, one without
    vec_env = DummyVecEnv([make_env, make_monitored_env])

    assert vec_env.env_is_wrapped(Monitor) == [False, True]

    # The answer must be the same when asked through a vec env wrapper
    vec_env = VecFrameStack(vec_env, n_stack=2)
    assert vec_env.env_is_wrapped(Monitor) == [False, True]

    # Close the second vec env too, as done for the first one
    vec_env.close()
|
|
|
|
|
|
@pytest.mark.parametrize("vec_env_class", VEC_ENV_CLASSES)
def test_vec_deterministic(vec_env_class):
    """Seeding a vec env twice with the same seed must give the same observations."""

    def make_env():
        return CustomGymEnv(gym.spaces.Box(low=np.zeros(2), high=np.ones(2)))

    vec_env = vec_env_class([make_env for _ in range(N_ENVS)])

    def reset_after_seeding(seedable):
        # Seed through `seedable` but always reset the base vec env
        seedable.seed(3)
        return vec_env.reset()

    first_obs = reset_after_seeding(vec_env)
    second_obs = reset_after_seeding(vec_env)
    assert np.allclose(second_obs, first_obs)

    # Test with VecNormalize (VecEnvWrapper should call self.venv.seed())
    vec_normalize = VecNormalize(vec_env)
    first_obs = reset_after_seeding(vec_normalize)
    second_obs = reset_after_seeding(vec_normalize)
    assert np.allclose(second_obs, first_obs)
    vec_normalize.close()

    # Similar test but with make_vec_env
    vec_env_1, vec_env_2 = (
        make_vec_env("Pendulum-v1", n_envs=N_ENVS, vec_env_cls=vec_env_class, seed=0) for _ in range(2)
    )
    assert np.allclose(vec_env_1.reset(), vec_env_2.reset())
    random_actions = [vec_env_1.action_space.sample() for _ in range(N_ENVS)]
    next_obs_1 = vec_env_1.step(random_actions)[0]
    next_obs_2 = vec_env_2.step(random_actions)[0]
    assert np.allclose(next_obs_1, next_obs_2)
    vec_env_1.close()
    vec_env_2.close()
|
|
|
|
|
|
@pytest.mark.parametrize("vec_env_class", VEC_ENV_CLASSES)
def test_vec_seeding(vec_env_class):
    """Seeding without argument must give a different seed to each sub-environment."""

    def make_env():
        return CustomGymEnv(spaces.Box(low=np.zeros(2), high=np.ones(2)))

    if vec_env_class == DummyVecEnv:
        vec_env_builders = [vec_env_class]
    else:
        # For SubprocVecEnv check for all starting methods
        candidate_methods = {"forkserver", "spawn", "fork"}
        supported_methods = candidate_methods.intersection(multiprocessing.get_all_start_methods())
        vec_env_builders = [functools.partial(SubprocVecEnv, start_method=method) for method in supported_methods]

    n_envs = 3
    for build_vec_env in vec_env_builders:
        vec_env = build_vec_env([make_env] * n_envs)
        # Seed with no argument
        vec_env.seed()
        obs = vec_env.reset()
        actions = np.array([vec_env.action_space.sample() for _ in range(n_envs)])
        _, rewards, _, _ = vec_env.step(actions)
        # Seed should be different per process
        for first_idx, second_idx in ((0, 1), (1, 2)):
            assert not np.allclose(obs[first_idx], obs[second_idx])
            assert not np.allclose(rewards[first_idx], rewards[second_idx])

        vec_env.close()
|
|
|
|
|
|
@pytest.mark.parametrize("vec_env_class", VEC_ENV_CLASSES)
def test_render(vec_env_class):
    """Test rendering with envs created in "human" and in "rgb_array" render mode."""
    # Skip if no X-Server
    if not os.environ.get("DISPLAY"):
        pytest.skip("No X-Server")

    env_id = "Pendulum-v1"
    # DummyVecEnv human render is currently
    # buggy because of gym:
    # https://github.com/carlosluis/stable-baselines3/pull/3#issuecomment-1356863808
    n_envs = 2

    def build_vec_env(render_mode):
        return make_vec_env(
            env_id,
            n_envs,
            vec_env_cls=vec_env_class,
            env_kwargs={"render_mode": render_mode},
        )

    def step_and_render(env, n_steps=10):
        for _ in range(n_steps):
            env.step([env.action_space.sample() for _ in range(n_envs)])
            env.render()

    # Human render
    vec_env = build_vec_env("human")

    vec_env.reset()
    vec_env.render()

    # Any mode other than the one of the envs triggers a warning
    for render_args, render_kwargs in ((("rgb_array",), {}), ((), {"mode": "blah"})):
        with pytest.warns(UserWarning):
            vec_env.render(*render_args, **render_kwargs)

    step_and_render(vec_env)

    vec_env.close()
    # rgb_array render, which allows human_render
    # thanks to OpenCV
    vec_env = build_vec_env("rgb_array")

    vec_env.reset()
    with warnings.catch_warnings(record=True) as record:
        vec_env.render()
        vec_env.render("rgb_array")
        vec_env.render(mode="human")

    # No warnings for using human mode
    assert len(record) == 0

    with pytest.warns(UserWarning):
        vec_env.render(mode="blah")

    step_and_render(vec_env)

    # Check that it still works with vec env wrapper
    vec_env = VecFrameStack(vec_env, 2)
    vec_env.render()
    assert vec_env.render_mode == "rgb_array"
    vec_env = VecNormalize(vec_env)
    assert vec_env.render_mode == "rgb_array"
    vec_env.render()

    vec_env.close()
|
|
|
|
|
|
@pytest.mark.skipif(not have_moviepy, reason="moviepy is not installed")
def test_video_recorder(tmp_path):
    """Train PPO for a few steps on a recorded vec env and check which video files were written."""
    env_id = "CartPole-v1"

    def should_start_recording(step):
        # Start a new recording at step 0, 65, 130, ...
        return step % 65 == 0

    # VecNormalize is added to check unwrapping works
    normalized_env = VecNormalize(make_vec_env(env_id, n_envs=1))

    # Record the video starting at the first step
    vec_env = VecVideoRecorder(
        normalized_env,
        str(tmp_path),
        record_video_trigger=should_start_recording,
        video_length=10,
        name_prefix=f"agent-{env_id}",
    )

    model = PPO("MlpPolicy", vec_env, n_steps=64, n_epochs=1, verbose=0)

    model.learn(total_timesteps=128)

    # Two recordings are expected: step 0 to 10 and step 65 to 75.
    # With reverse ordering, the most recent one comes first.
    video_files = sorted((str(video_path) for video_path in tmp_path.glob("*.mp4")), reverse=True)

    # Clean up
    vec_env.close()

    assert len(video_files) == 2
    latest_video, first_video = video_files
    assert "agent-CartPole-v1-step-65-to-step-75.mp4" in latest_video
    assert "agent-CartPole-v1-step-0-to-step-10.mp4" in first_video
|