Multi-robot PettingZoo Environment#

enki_env.parallel_env(scenario: Scenario, config: dict[str, GroupConfig], time_step: float = 0.1, physics_substeps: int = 3, max_duration: float = -1, render_mode: str | None = None, render_fps: float = 10.0, render_kwargs: dict[str, Any] = {}, notebook: bool | None = None, terminate_on: Literal['any', 'all'] | None = 'all', success_info: bool = True, default_success: bool | None = None) ParallelEnv[str, dict[str, ndarray[tuple[Any, ...], dtype[float64]]], ndarray[tuple[Any, ...], dtype[float64]]]#

Helper function that creates a parallel environment, passing all arguments to enki_env.ParallelEnkiEnv.

Parameters:
  • scenario – The scenario that generates worlds at gymnasium.Env.reset().

  • config – The configuration for all groups. Robots with a pyenki.PhysicalObject.name equal to the group will be assigned to the group and use its configuration. The group with name “” will catch the remaining robots, if it appears in config.

  • time_step – The time step of the simulation [s].

  • max_duration – The maximum duration of the episodes [s].

  • physics_substeps – The number of physics sub-steps for each simulation step, see pyenki.World.step().

  • render_mode – The render mode (one of None, rgb_array or human).

  • render_fps – The render fps (only relevant when render_mode="human".

  • render_kwargs – The render keywords arguments arguments forwarded to pyenki.viewer.render() when rendering an environment.

  • notebook – Whether to use a notebook-compatible renderer. If None, it will select it if we are running a notebook.

  • terminate_on – Whether to terminate the episode as soon as the first robot terminates ("any") or whether to wait for all agents to terminate before removing all of them at once ("all". If set to None, it will terminate robots independently from each other and remove them from the environment before the episode terminates.

  • success_info – Whether to include key "is_success" in the final info dictionary for each robot. It will be included only if it has been set by one of enki_env.GroupConfig.terminations or if default_success is not None.

  • default_success – The value associated with "is_success" in the final info dictionary when, at the end of the episode, the robot has not been yet terminated.

class enki_env.ParallelEnkiEnvSpec#

Holds the parameters passed to the constructor of enki_env.ParallelEnkiEnv.

class enki_env.ParallelEnkiEnv(scenario: Scenario, config: dict[str, GroupConfig], time_step: float = 0.1, physics_substeps: int = 3, max_duration: float = -1, render_mode: str | None = None, render_fps: float = 10.0, render_kwargs: dict[str, Any] = {}, notebook: bool | None = None, terminate_on: Literal['any', 'all'] | None = 'all', success_info: bool = True, default_success: bool | None = None)#

Bases: ParallelEnv[str, dict[str, ndarray[tuple[Any, …], dtype[float64]]], ndarray[tuple[Any, …], dtype[float64]]]

A pettingzoo.utils.env.ParallelEnv that exposes robots in a pyenki.World.

Observations, rewards, and information returned by gymnasium.Env.reset() and gymnasium.Env.step(), are generated from the robot sensors and internal state using enki_env.GroupConfig.observation, enki_env.GroupConfig.reward, and enki_env.GroupConfig.info. Termination criteria are specified enki_env.GroupConfig.terminations. Actions are actuated according to enki_env.GroupConfig.action.

Rendering is performed:

To create an environment, you need to first

  1. define a scenario with a least one robot, e.g.

    import pyenki
    import enki_env
    
    class MyScenario(enki_env.BaseScenario):
    
        def init(self, world: pyenki.World) -> None:
            thymio = pyenki.Thymio2()
            world.add_object(thymio)
            epuck = pyenki.EPuck()
            x = world.random_generator.uniform(10, 20)
            epuck.position = (x, 0)
            world.add_object(epuck)
            return world
    
  2. define a configuration, e.g., the default configuration associated with the groups

    configs = {'thymio': enki_env.ThymioConfig(),
               'e-puck': enki_env.EPuckConfig()}
    

Then, you can call the factory function, customizing the other parameters as you see fit

env = enki_env.parallel_env(MyScenario(), configs, max_duration=10)
__init__(scenario: Scenario, config: dict[str, GroupConfig], time_step: float = 0.1, physics_substeps: int = 3, max_duration: float = -1, render_mode: str | None = None, render_fps: float = 10.0, render_kwargs: dict[str, Any] = {}, notebook: bool | None = None, terminate_on: Literal['any', 'all'] | None = 'all', success_info: bool = True, default_success: bool | None = None)#

Constructs a new instance.

Parameters:
  • scenario – The scenario that generates worlds at gymnasium.Env.reset().

  • config – The configuration for all groups. Robots with a pyenki.PhysicalObject.name equal to the group will be assigned to the group and use its configuration. The group with name “” will catch the remaining robots, if it appears in config.

  • time_step – The time step of the simulation [s].

  • max_duration – The maximum duration of the episodes [s].

  • physics_substeps – The number of physics sub-steps for each simulation step, see pyenki.World.step().

  • render_mode – The render mode (one of None, rgb_array or human).

  • render_fps – The render fps (only relevant when render_mode="human".

  • render_kwargs – The render keywords arguments arguments forwarded to pyenki.viewer.render() when rendering an environment.

  • notebook – Whether to use a notebook-compatible renderer. If None, it will select it if we are running a notebook.

  • terminate_on – Whether to terminate the episode as soon as the first robot terminates ("any") or whether to wait for all agents to terminate before removing all of them at once ("all". If set to None, it will terminate robots independently from each other and remove them from the environment before the episode terminates.

  • success_info – Whether to include key "is_success" in the final info dictionary for each robot. It will be included only if it has been set by one of enki_env.GroupConfig.terminations or if default_success is not None.

  • default_success – The value associated with "is_success" in the final info dictionary when, at the end of the episode, the robot has not been yet terminated.

display_in_notebook() None#

Displays the environment in a notebook using a an interactive pyenki.buffer.EnkiRemoteFrameBuffer.

Requires render_mode="human" and a notebook.

property group_map: dict[str, list[str]]#

The names of the robots belonging to each group.

make_world(policies: dict[str, Predictor] = {}, seed: int = 0, deterministic: bool = True, cutoff: float = 0) pyenki.World#

Generates a world using the scenario and assign a policy to the robot controllers for each group specified in policies.

Parameters:
  • policies – The policies assigned to groups.

  • seed – The random seed.

  • deterministic – Whether to evaluate the policy deterministically.

  • cutoff – When the absolute value of actions is below this threshold, they will be set to zero.

Returns:

The world

rollout(policies: Mapping[str, TypeAliasForwardRef('Predictor')] = {}, max_steps: int = -1, seed: int = 0, deterministic: bool = True, cutoff: float = 0) dict[str, Rollout]#

Performs a rollout of an episode

Parameters:
  • policies – The policies assigned to groups. If a group misses a policy, it will randomly generate actions.

  • max_steps – The maximum number of steps to perform.

  • seed – The random seed.

  • deterministic – Whether to evaluate the policies deterministically.

  • cutoff – When the absolute value of actions is below this threshold, they will be set to zero.

Returns:

A dictionary, keyed by group, with the data collected during the rollout.

property spec: ParallelEnkiEnvSpec#

The parameters used to construct the environment.