Multi-robot PettingZoo Environment#
- enki_env.parallel_env(scenario: Scenario, config: dict[str, GroupConfig], time_step: float = 0.1, physics_substeps: int = 3, max_duration: float = -1, render_mode: str | None = None, render_fps: float = 10.0, render_kwargs: dict[str, Any] = {}, notebook: bool | None = None, terminate_on: Literal['any', 'all'] | None = 'all', success_info: bool = True, default_success: bool | None = None) ParallelEnv[str, dict[str, ndarray[tuple[Any, ...], dtype[float64]]], ndarray[tuple[Any, ...], dtype[float64]]]#
Helper function that creates a parallel environment, passing all arguments to
enki_env.ParallelEnkiEnv.- Parameters:
scenario – The scenario that generates worlds at
gymnasium.Env.reset().config – The configuration for all groups. Robots with a
pyenki.PhysicalObject.nameequal to the group will be assigned to the group and use its configuration. The group with name “” will catch the remaining robots, if it appears inconfig.time_step – The time step of the simulation [s].
max_duration – The maximum duration of the episodes [s].
physics_substeps – The number of physics sub-steps for each simulation step, see
pyenki.World.step().render_mode – The render mode (one of
None,rgb_arrayorhuman).render_fps – The render fps (only relevant when
render_mode="human".render_kwargs – The render keywords arguments arguments forwarded to
pyenki.viewer.render()when rendering an environment.notebook – Whether to use a notebook-compatible renderer. If
None, it will select it if we are running a notebook.terminate_on – Whether to terminate the episode as soon as the first robot terminates (
"any") or whether to wait for all agents to terminate before removing all of them at once ("all". If set toNone, it will terminate robots independently from each other and remove them from the environment before the episode terminates.success_info – Whether to include key
"is_success"in the final info dictionary for each robot. It will be included only if it has been set by one ofenki_env.GroupConfig.terminationsor ifdefault_successis notNone.default_success – The value associated with
"is_success"in the final info dictionary when, at the end of the episode, the robot has not been yet terminated.
- class enki_env.ParallelEnkiEnvSpec#
Holds the parameters passed to the constructor of
enki_env.ParallelEnkiEnv.
- class enki_env.ParallelEnkiEnv(scenario: Scenario, config: dict[str, GroupConfig], time_step: float = 0.1, physics_substeps: int = 3, max_duration: float = -1, render_mode: str | None = None, render_fps: float = 10.0, render_kwargs: dict[str, Any] = {}, notebook: bool | None = None, terminate_on: Literal['any', 'all'] | None = 'all', success_info: bool = True, default_success: bool | None = None)#
Bases:
ParallelEnv[str,dict[str,ndarray[tuple[Any, …],dtype[float64]]],ndarray[tuple[Any, …],dtype[float64]]]A
pettingzoo.utils.env.ParallelEnvthat exposes robots in apyenki.World.Observations, rewards, and information returned by
gymnasium.Env.reset()andgymnasium.Env.step(), are generated from the robot sensors and internal state usingenki_env.GroupConfig.observation,enki_env.GroupConfig.reward, andenki_env.GroupConfig.info. Termination criteria are specifiedenki_env.GroupConfig.terminations. Actions are actuated according toenki_env.GroupConfig.action.Rendering is performed:
by a
pyenki.viewer.WorldViewifrender_mode="human"and we are not running in a Jupyter notebook.by a
pyenki.buffer.EnkiRemoteFrameBufferifrender_mode="human"and we are running in a Jupyter notebook.by
pyenki.viewer.render()ifrender_mode="rgb_array".
To create an environment, you need to first
define a scenario with a least one robot, e.g.
import pyenki import enki_env class MyScenario(enki_env.BaseScenario): def init(self, world: pyenki.World) -> None: thymio = pyenki.Thymio2() world.add_object(thymio) epuck = pyenki.EPuck() x = world.random_generator.uniform(10, 20) epuck.position = (x, 0) world.add_object(epuck) return world
define a configuration, e.g., the default configuration associated with the groups
configs = {'thymio': enki_env.ThymioConfig(), 'e-puck': enki_env.EPuckConfig()}
Then, you can call the factory function, customizing the other parameters as you see fit
env = enki_env.parallel_env(MyScenario(), configs, max_duration=10)
- __init__(scenario: Scenario, config: dict[str, GroupConfig], time_step: float = 0.1, physics_substeps: int = 3, max_duration: float = -1, render_mode: str | None = None, render_fps: float = 10.0, render_kwargs: dict[str, Any] = {}, notebook: bool | None = None, terminate_on: Literal['any', 'all'] | None = 'all', success_info: bool = True, default_success: bool | None = None)#
Constructs a new instance.
- Parameters:
scenario – The scenario that generates worlds at
gymnasium.Env.reset().config – The configuration for all groups. Robots with a
pyenki.PhysicalObject.nameequal to the group will be assigned to the group and use its configuration. The group with name “” will catch the remaining robots, if it appears inconfig.time_step – The time step of the simulation [s].
max_duration – The maximum duration of the episodes [s].
physics_substeps – The number of physics sub-steps for each simulation step, see
pyenki.World.step().render_mode – The render mode (one of
None,rgb_arrayorhuman).render_fps – The render fps (only relevant when
render_mode="human".render_kwargs – The render keywords arguments arguments forwarded to
pyenki.viewer.render()when rendering an environment.notebook – Whether to use a notebook-compatible renderer. If
None, it will select it if we are running a notebook.terminate_on – Whether to terminate the episode as soon as the first robot terminates (
"any") or whether to wait for all agents to terminate before removing all of them at once ("all". If set toNone, it will terminate robots independently from each other and remove them from the environment before the episode terminates.success_info – Whether to include key
"is_success"in the final info dictionary for each robot. It will be included only if it has been set by one ofenki_env.GroupConfig.terminationsor ifdefault_successis notNone.default_success – The value associated with
"is_success"in the final info dictionary when, at the end of the episode, the robot has not been yet terminated.
- display_in_notebook() None#
Displays the environment in a notebook using a an interactive
pyenki.buffer.EnkiRemoteFrameBuffer.Requires
render_mode="human"and a notebook.
- make_world(policies: dict[str, Predictor] = {}, seed: int = 0, deterministic: bool = True, cutoff: float = 0) pyenki.World#
Generates a world using the scenario and assign a policy to the robot controllers for each group specified in
policies.- Parameters:
policies – The policies assigned to groups.
seed – The random seed.
deterministic – Whether to evaluate the policy deterministically.
cutoff – When the absolute value of actions is below this threshold, they will be set to zero.
- Returns:
The world
- rollout(policies: Mapping[str, TypeAliasForwardRef('Predictor')] = {}, max_steps: int = -1, seed: int = 0, deterministic: bool = True, cutoff: float = 0) dict[str, Rollout]#
Performs a rollout of an episode
- Parameters:
policies – The policies assigned to groups. If a group misses a policy, it will randomly generate actions.
max_steps – The maximum number of steps to perform.
seed – The random seed.
deterministic – Whether to evaluate the policies deterministically.
cutoff – When the absolute value of actions is below this threshold, they will be set to zero.
- Returns:
A dictionary, keyed by group, with the data collected during the rollout.
- property spec: ParallelEnkiEnvSpec#
The parameters used to construct the environment.