2020-04-21 18:41:58 +00:00
|
|
|
import numpy as np
|
|
|
|
|
from gym import spaces
|
|
|
|
|
|
2020-05-05 13:02:35 +00:00
|
|
|
from stable_baselines3.common.preprocessing import is_image_space
|
2020-08-05 10:12:02 +00:00
|
|
|
from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvStepReturn, VecEnvWrapper
|
2020-04-23 12:56:05 +00:00
|
|
|
|
|
|
|
|
|
2020-04-21 18:41:58 +00:00
|
|
|
class VecTransposeImage(VecEnvWrapper):
    """
    Re-order channels, from HxWxC to CxHxW.
    It is required for PyTorch convolution layers.

    :param venv: The vectorized environment to wrap. Its observation space
        must be accepted by ``is_image_space``.
    """

    def __init__(self, venv: VecEnv):
        assert is_image_space(venv.observation_space), "The observation space must be an image"

        # Advertise the channel-first space to whatever consumes this wrapper.
        observation_space = self.transpose_space(venv.observation_space)
        super().__init__(venv, observation_space=observation_space)

    @staticmethod
    def transpose_space(observation_space: spaces.Box) -> spaces.Box:
        """
        Transpose an observation space (re-order channels).

        :param observation_space: Image space whose shape is laid out
            channel-last (HxWxC).
        :return: A new ``Box`` with the channel-first shape (CxHxW),
            bounds ``[0, 255]`` and the same dtype as the input space.
        """
        assert is_image_space(observation_space), "The observation space must be an image"

        # Channel-last input: (height, width, channels).
        height, width, channels = observation_space.shape
        new_shape = (channels, height, width)
        return spaces.Box(low=0, high=255, shape=new_shape, dtype=observation_space.dtype)

    @staticmethod
    def transpose_image(image: np.ndarray) -> np.ndarray:
        """
        Transpose an image or batch of images (re-order channels).

        :param image: Either a single image (3 dimensions, channel axis last)
            or a batch of images (any other rank is treated as a 4-dimensional
            batch with the batch axis first).
        :return: The array with the channel axis moved in front of the two
            spatial axes; the batch axis, if any, stays first.
        """
        if image.ndim == 3:
            # Single image: HxWxC -> CxHxW
            return np.transpose(image, (2, 0, 1))
        # Batch of images: NxHxWxC -> NxCxHxW
        return np.transpose(image, (0, 3, 1, 2))

    def step_wait(self) -> VecEnvStepReturn:
        """
        Wait for the wrapped environments' step and transpose the observations.

        The ``"terminal_observation"`` entry of an info dict is also
        transposed (in place) for every environment flagged as done.

        :return: ``(observations, rewards, dones, infos)`` where the
            observations have been re-ordered by ``transpose_image``.
        """
        observations, rewards, dones, infos = self.venv.step_wait()

        # Transpose the terminal observations
        for idx, done in enumerate(dones):
            info = infos[idx]
            if done and "terminal_observation" in info:
                info["terminal_observation"] = self.transpose_image(info["terminal_observation"])

        return self.transpose_image(observations), rewards, dones, infos

    def reset(self) -> np.ndarray:
        """
        Reset all environments

        :return: The observations returned by the wrapped environments' reset,
            re-ordered by ``transpose_image``.
        """
        return self.transpose_image(self.venv.reset())

    def close(self) -> None:
        """
        Close the wrapped vectorized environment.
        """
        self.venv.close()
|