{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# For tips on running notebooks in Google Colab, see\n# https://codelin.vip/beginner/colab\n%matplotlib inline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pendulum: Writing your environment and transforms with TorchRL\n==============================================================\n\n**Author**: [Vincent Moens](https://github.com/vmoens)\n\nCreating an environment (a simulator or an interface to a physical\ncontrol system) is an integral part of reinforcement learning and\ncontrol engineering.\n\nTorchRL provides a set of tools to do this in multiple contexts. This\ntutorial demonstrates how to use PyTorch and TorchRL to code a pendulum\nsimulator from the ground up. It is loosely inspired by the Pendulum-v1\nimplementation from [OpenAI-Gym/Farama-Gymnasium control\nlibrary](https://github.com/Farama-Foundation/Gymnasium).\n\n![Simple\nPendulum](https://pytorch.org/tutorials/_static/img/pendulum.gif){.align-center}\n\nKey learnings:\n\n- How to design an environment in TorchRL:\n\n    - Writing specs (input, observation and reward);\n    - Implementing behavior: seeding, reset and step.\n\n- Transforming your environment inputs and outputs, and writing your\n  own transforms;\n\n- How to use `~tensordict.TensorDict`{.interpreted-text role=\"class\"}\n  to carry arbitrary data structures through the codebase.\n\nIn the process, we will touch on three crucial components of TorchRL:\n\n- [environments](https://pytorch.org/rl/stable/reference/envs.html)\n\n- [transforms](https://pytorch.org/rl/stable/reference/envs.html#transforms)\n\n- [models (policy and value\n  function)](https://pytorch.org/rl/stable/reference/modules.html)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To give a sense of what can be achieved with TorchRL\\'s environments, we\nwill be designing a *stateless* environment. 
While stateful environments\nkeep track of the latest physical state encountered and rely on this to\nsimulate the state-to-state transition, stateless environments expect\nthe current state to be provided to them at each step, along with the\naction undertaken. TorchRL supports both types of environments, but\nstateless environments are more generic and hence cover a broader range\nof features of the environment API in TorchRL.\n\nModeling stateless environments gives users full control over the input\nand outputs of the simulator: one can reset an experiment at any stage\nor actively modify the dynamics from the outside. However, it assumes\nthat we have some control over a task, which may not always be the case:\nsolving a problem where we cannot control the current state is more\nchallenging but has a much wider set of applications.\n\nAnother advantage of stateless environments is that they can enable\nbatched execution of transition simulations. If the backend and the\nimplementation allow it, an algebraic operation can be executed\nseamlessly on scalars, vectors, or tensors. 
This tutorial gives such\nexamples.\n\nThis tutorial will be structured as follows:\n\n- We will first get acquainted with the environment properties: its\n shape (`batch_size`), its methods (mainly\n `~torchrl.envs.EnvBase.step`{.interpreted-text role=\"meth\"},\n `~torchrl.envs.EnvBase.reset`{.interpreted-text role=\"meth\"} and\n `~torchrl.envs.EnvBase.set_seed`{.interpreted-text role=\"meth\"}) and\n finally its specs.\n- After having coded our simulator, we will demonstrate how it can be\n used during training with transforms.\n- We will explore new avenues that follow from the TorchRL\\'s API,\n including: the possibility of transforming inputs, the vectorized\n execution of the simulation and the possibility of backpropagation\n through the simulation graph.\n- Finally, we will train a simple policy to solve the system we\n implemented.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from collections import defaultdict\nfrom typing import Optional\n\nimport numpy as np\nimport torch\nimport tqdm\nfrom tensordict import TensorDict, TensorDictBase\nfrom tensordict.nn import TensorDictModule\nfrom torch import nn\n\nfrom torchrl.data import BoundedTensorSpec, CompositeSpec, UnboundedContinuousTensorSpec\nfrom torchrl.envs import (\n CatTensors,\n EnvBase,\n Transform,\n TransformedEnv,\n UnsqueezeTransform,\n)\nfrom torchrl.envs.transforms.transforms import _apply_to_composite\nfrom torchrl.envs.utils import check_env_specs, step_mdp\n\nDEFAULT_X = np.pi\nDEFAULT_Y = 1.0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There are four things you must take care of when designing a new\nenvironment class:\n\n- `EnvBase._reset`{.interpreted-text role=\"meth\"}, which codes for the\n resetting of the simulator at a (potentially random) initial state;\n- `EnvBase._step`{.interpreted-text role=\"meth\"} which codes for the\n state transition dynamic;\n- 
`EnvBase._set_seed`{.interpreted-text role=\"meth\"} which\n  implements the seeding mechanism;\n- the environment specs.\n\nLet us first describe the problem at hand: we would like to model a\nsimple pendulum over which we can control the torque applied on its\nfixed point. Our goal is to place the pendulum in the upward position\n(angular position at 0 by convention) and have it stand still in\nthat position. To design our dynamic system, we need to define two\nequations: the motion equation following an action (the torque applied)\nand the reward equation that will constitute our objective function.\n\nFor the motion equation, we will update the angular velocity following:\n\n$$\\dot{\\theta}_{t+1} = \\dot{\\theta}_t + (3 * g / (2 * L) * \\sin(\\theta_t) + 3 / (m * L^2) * u) * dt$$\n\nwhere $\\dot{\\theta}$ is the angular velocity in rad/sec, $g$ is the\ngravitational acceleration, $L$ is the pendulum length, $m$ is its mass,\n$\\theta$ is its angular position and $u$ is the torque. The angular\nposition is then updated according to\n\n$$\\theta_{t+1} = \\theta_{t} + \\dot{\\theta}_{t+1} dt$$\n\nWe define our reward as\n\n$$r = -(\\theta^2 + 0.1 * \\dot{\\theta}^2 + 0.001 * u^2)$$\n\nwhich will be maximized when the angle is close to 0 (pendulum in upward\nposition), the angular velocity is close to 0 (no motion) and the torque\nis 0 too.\n\nCoding the effect of an action: `~torchrl.envs.EnvBase._step`{.interpreted-text role=\"func\"}\n============================================================================================\n\nThe step method is the first thing to consider, as it will encode the\nsimulation that is of interest to us. 
In TorchRL, the\n`~torchrl.envs.EnvBase`{.interpreted-text role=\"class\"} class has a\n`EnvBase.step`{.interpreted-text role=\"meth\"} method that receives a\n`tensordict.TensorDict`{.interpreted-text role=\"class\"} instance with an\n`\"action\"` entry indicating what action is to be taken.\n\nTo facilitate reading from and writing to that `tensordict`, and to make\nsure that the keys are consistent with what\\'s expected from the\nlibrary, the simulation part has been delegated to a private abstract\nmethod `_step`{.interpreted-text role=\"meth\"} which reads input data\nfrom a `tensordict`, and writes a *new* `tensordict` with the output\ndata.\n\nThe `_step`{.interpreted-text role=\"func\"} method should do the\nfollowing:\n\n> 1. Read the input keys (such as `\"action\"`) and execute the\n> simulation based on these;\n> 2. Retrieve observations, done state and reward;\n> 3. Write the set of observation values along with the reward and done\n> state at the corresponding entries in a new\n> `TensorDict`{.interpreted-text role=\"class\"}.\n\nNext, the `~torchrl.envs.EnvBase.step`{.interpreted-text role=\"meth\"}\nmethod will merge the output of\n`~torchrl.envs.EnvBase._step`{.interpreted-text role=\"meth\"} into the input\n`tensordict` to enforce input/output consistency.\n\nTypically, for stateful environments, this will look like this:\n\n``` {.}\n>>> tensordict = policy(env.reset())\n>>> print(tensordict)\nTensorDict(\n fields={\n action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),\n done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),\n observation: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False)},\n batch_size=torch.Size([]),\n device=cpu,\n is_shared=False)\n>>> env.step(tensordict)\n>>> print(tensordict)\nTensorDict(\n fields={\n action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),\n done: Tensor(shape=torch.Size([1]), device=cpu, 
dtype=torch.bool, is_shared=False),\n next: TensorDict(\n fields={\n done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),\n observation: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),\n reward: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False)},\n batch_size=torch.Size([]),\n device=cpu,\n is_shared=False),\n observation: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False)},\n batch_size=torch.Size([]),\n device=cpu,\n is_shared=False)\n```\n\nNotice that the root `tensordict` has not changed, the only modification\nis the appearance of a new `\"next\"` entry that contains the new\ninformation.\n\nIn the Pendulum example, our `_step`{.interpreted-text role=\"meth\"}\nmethod will read the relevant entries from the input `tensordict` and\ncompute the position and velocity of the pendulum after the force\nencoded by the `\"action\"` key has been applied onto it. We compute the\nnew angular position of the pendulum `\"new_th\"` as the result of the\nprevious position `\"th\"` plus the new velocity `\"new_thdot\"` over a time\ninterval `dt`.\n\nSince our goal is to turn the pendulum up and maintain it still in that\nposition, our `cost` (negative reward) function is lower for positions\nclose to the target and low speeds. Indeed, we want to discourage\npositions that are far from being \\\"upward\\\" and/or speeds that are far\nfrom 0.\n\nIn our example, `EnvBase._step`{.interpreted-text role=\"meth\"} is\nencoded as a static method since our environment is stateless. 
In\nstateful settings, the `self` argument is needed as the state needs to\nbe read from the environment.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def _step(tensordict):\n th, thdot = tensordict[\"th\"], tensordict[\"thdot\"] # th := theta\n\n g_force = tensordict[\"params\", \"g\"]\n mass = tensordict[\"params\", \"m\"]\n length = tensordict[\"params\", \"l\"]\n dt = tensordict[\"params\", \"dt\"]\n u = tensordict[\"action\"].squeeze(-1)\n u = u.clamp(-tensordict[\"params\", \"max_torque\"], tensordict[\"params\", \"max_torque\"])\n costs = angle_normalize(th) ** 2 + 0.1 * thdot**2 + 0.001 * (u**2)\n\n new_thdot = (\n thdot\n + (3 * g_force / (2 * length) * th.sin() + 3.0 / (mass * length**2) * u) * dt\n )\n new_thdot = new_thdot.clamp(\n -tensordict[\"params\", \"max_speed\"], tensordict[\"params\", \"max_speed\"]\n )\n new_th = th + new_thdot * dt\n reward = -costs.view(*tensordict.shape, 1)\n done = torch.zeros_like(reward, dtype=torch.bool)\n out = TensorDict(\n {\n \"th\": new_th,\n \"thdot\": new_thdot,\n \"params\": tensordict[\"params\"],\n \"reward\": reward,\n \"done\": done,\n },\n tensordict.shape,\n )\n return out\n\n\ndef angle_normalize(x):\n return ((x + torch.pi) % (2 * torch.pi)) - torch.pi" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Resetting the simulator: `~torchrl.envs.EnvBase._reset`{.interpreted-text role=\"func\"}\n======================================================================================\n\nThe second method we need to care about is the\n`~torchrl.envs.EnvBase._reset`{.interpreted-text role=\"meth\"} method.\nLike `~torchrl.envs.EnvBase._step`{.interpreted-text role=\"meth\"}, it\nshould write the observation entries and possibly a done state in the\n`tensordict` it outputs (if the done state is omitted, it will be filled\nas `False` by the parent method\n`~torchrl.envs.EnvBase.reset`{.interpreted-text role=\"meth\"}). 
In some\ncontexts, it is required that the `_reset` method receives a command\nfrom the function that called it (for example, in multi-agent settings\nwe may want to indicate which agents need to be reset). This is why the\n`~torchrl.envs.EnvBase._reset`{.interpreted-text role=\"meth\"} method\nalso expects a `tensordict` as input, although it may well be empty\nor `None`.\n\nThe parent `EnvBase.reset`{.interpreted-text role=\"meth\"} does some\nsimple checks, as `EnvBase.step`{.interpreted-text role=\"meth\"}\ndoes, such as making sure that a `\"done\"` state is returned in the\noutput `tensordict` and that the shapes match what is expected from the\nspecs.\n\nFor us, the only important thing to consider is whether the output of\n`EnvBase._reset`{.interpreted-text role=\"meth\"} contains all the\nexpected observations. Once more, since we are working with a stateless\nenvironment, we pass the configuration of the pendulum in a nested\n`tensordict` named `\"params\"`.\n\nIn this example, we do not pass a done state as this is not mandatory\nfor `_reset`{.interpreted-text role=\"meth\"} and our environment is\nnon-terminating, so we always expect it to be `False`.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def _reset(self, tensordict):\n    if tensordict is None or tensordict.is_empty():\n        # if no ``tensordict`` is passed, we generate a single set of hyperparameters\n        # Otherwise, we assume that the input ``tensordict`` contains all the relevant\n        # parameters to get started.\n        tensordict = self.gen_params(batch_size=self.batch_size)\n\n    high_th = torch.tensor(DEFAULT_X, device=self.device)\n    high_thdot = torch.tensor(DEFAULT_Y, device=self.device)\n    low_th = -high_th\n    low_thdot = -high_thdot\n\n    # for non batch-locked environments, the input ``tensordict`` shape dictates the number\n    # of simulators run simultaneously. In other contexts, the initial\n    # random state's shape will depend upon the environment batch-size instead.\n    th = (\n        torch.rand(tensordict.shape, generator=self.rng, device=self.device)\n        * (high_th - low_th)\n        + low_th\n    )\n    thdot = (\n        torch.rand(tensordict.shape, generator=self.rng, device=self.device)\n        * (high_thdot - low_thdot)\n        + low_thdot\n    )\n    out = TensorDict(\n        {\n            \"th\": th,\n            \"thdot\": thdot,\n            \"params\": tensordict[\"params\"],\n        },\n        batch_size=tensordict.shape,\n    )\n    return out" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Environment metadata: `env.*_spec`\n==================================\n\nThe specs define the input and output domain of the environment. It is\nimportant that the specs accurately define the tensors that will be\nreceived at runtime, as they are often used to carry information about\nenvironments in multiprocessing and distributed settings. They can also\nbe used to instantiate lazily defined neural networks and test scripts\nwithout actually querying the environment (which can be costly with\nreal-world physical systems for instance).\n\nThere are four specs that we must code in our environment:\n\n- `EnvBase.observation_spec`{.interpreted-text role=\"obj\"}: This will\n  be a `~torchrl.data.CompositeSpec`{.interpreted-text role=\"class\"}\n  instance where each key is an observation (a\n  `CompositeSpec`{.interpreted-text role=\"class\"} can be viewed as a\n  dictionary of specs).\n- `EnvBase.action_spec`{.interpreted-text role=\"obj\"}: It can be any\n  type of spec, but it is required that it corresponds to the\n  `\"action\"` entry in the input `tensordict`;\n- `EnvBase.reward_spec`{.interpreted-text role=\"obj\"}: provides\n  information about the reward space;\n- `EnvBase.done_spec`{.interpreted-text role=\"obj\"}: provides\n  information about the space of the done flag.\n\nTorchRL specs are organized in two general containers: `input_spec`\nwhich contains the specs of the information that the step 
function reads\n(divided between `action_spec` containing the action and `state_spec`\ncontaining all the rest), and `output_spec` which encodes the specs that\nthe step outputs (`observation_spec`, `reward_spec` and `done_spec`). In\ngeneral, you should not interact directly with `output_spec` and\n`input_spec` but only with their content: `observation_spec`,\n`reward_spec`, `done_spec`, `action_spec` and `state_spec`. The reason\nis that the specs are organized in a non-trivial way within\n`output_spec` and `input_spec` and neither of these should be directly\nmodified.\n\nIn other words, the `observation_spec` and related properties are\nconvenient shortcuts to the content of the output and input spec\ncontainers.\n\nTorchRL offers multiple `~torchrl.data.TensorSpec`{.interpreted-text\nrole=\"class\"}\n[subclasses](https://pytorch.org/rl/stable/reference/data.html#tensorspec)\nto encode the environment\\'s input and output characteristics.\n\nSpecs shape\n-----------\n\nThe leading dimensions of the environment specs must match the environment\nbatch-size. This is done to enforce that every component of an\nenvironment (including its transforms) has an accurate representation\nof the expected input and output shapes. 
This is something that should\nbe accurately coded in stateful settings.\n\nFor non batch-locked environments, such as the one in our example (see\nbelow), this is irrelevant as the environment batch size will most\nlikely be empty.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def _make_spec(self, td_params):\n    # Under the hood, this will populate self.output_spec[\"observation\"]\n    self.observation_spec = CompositeSpec(\n        th=BoundedTensorSpec(\n            low=-torch.pi,\n            high=torch.pi,\n            shape=(),\n            dtype=torch.float32,\n        ),\n        thdot=BoundedTensorSpec(\n            low=-td_params[\"params\", \"max_speed\"],\n            high=td_params[\"params\", \"max_speed\"],\n            shape=(),\n            dtype=torch.float32,\n        ),\n        # we need to add the ``params`` to the observation specs, as we want\n        # to pass it at each step during a rollout\n        params=make_composite_from_td(td_params[\"params\"]),\n        shape=(),\n    )\n    # since the environment is stateless, we expect the previous output as input.\n    # For this, ``EnvBase`` expects some state_spec to be available\n    self.state_spec = self.observation_spec.clone()\n    # the action spec is automatically wrapped in input_spec when\n    # `self.action_spec = spec` is assigned\n    self.action_spec = BoundedTensorSpec(\n        low=-td_params[\"params\", \"max_torque\"],\n        high=td_params[\"params\", \"max_torque\"],\n        shape=(1,),\n        dtype=torch.float32,\n    )\n    self.reward_spec = UnboundedContinuousTensorSpec(shape=(*td_params.shape, 1))\n\n\ndef make_composite_from_td(td):\n    # custom function to convert a ``tensordict`` into a similar spec structure\n    # of unbounded values.\n    composite = CompositeSpec(\n        {\n            key: make_composite_from_td(tensor)\n            if isinstance(tensor, TensorDictBase)\n            else UnboundedContinuousTensorSpec(\n                dtype=tensor.dtype, device=tensor.device, shape=tensor.shape\n            )\n            for key, tensor in td.items()\n        },\n        shape=td.shape,\n    )\n    return composite" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Reproducible experiments: seeding\n=================================\n\nSeeding an environment is a common operation when initializing an\nexperiment. The only goal of `EnvBase._set_seed`{.interpreted-text\nrole=\"func\"} is to set the seed of the contained simulator. If possible,\nthis operation should not call `reset()` or interact with the\nenvironment execution. The parent `EnvBase.set_seed`{.interpreted-text\nrole=\"func\"} method incorporates a mechanism that allows seeding\nmultiple environments with a different pseudo-random and reproducible\nseed.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def _set_seed(self, seed: Optional[int]):\n    rng = torch.manual_seed(seed)\n    self.rng = rng" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Wrapping things together: the `~torchrl.envs.EnvBase`{.interpreted-text role=\"class\"} class\n===========================================================================================\n\nWe can finally put together the pieces and design our environment class.\nThe specs initialization needs to be performed during the environment\nconstruction, so we must take care of calling the\n`_make_spec`{.interpreted-text role=\"func\"} method within\n`PendulumEnv.__init__`{.interpreted-text role=\"func\"}.\n\nWe add a static method `PendulumEnv.gen_params`{.interpreted-text\nrole=\"meth\"} which deterministically generates a set of hyperparameters\nto be used during execution:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def gen_params(g=10.0, batch_size=None) -> TensorDictBase:\n    \"\"\"Returns a ``tensordict`` containing the physical parameters such as gravitational force and torque or speed limits.\"\"\"\n    if batch_size is None:\n        batch_size = []\n    td = TensorDict(\n        {\n            \"params\": TensorDict(\n                {\n                    \"max_speed\": 8,\n                    \"max_torque\": 2.0,\n                    \"dt\": 0.05,\n                    \"g\": g,\n                    \"m\": 1.0,\n                    \"l\": 1.0,\n                },\n                [],\n            )\n        },\n        [],\n    )\n    if batch_size:\n        td = td.expand(batch_size).contiguous()\n    return td" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We define the environment as non-`batch_locked` by setting the\nattribute of the same name to `False`. This means that we will **not**\nrequire the input `tensordict` to have a batch size that matches the\none of the environment.\n\nThe following code will just put together the pieces we have coded\nabove.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "class PendulumEnv(EnvBase):\n    metadata = {\n        \"render_modes\": [\"human\", \"rgb_array\"],\n        \"render_fps\": 30,\n    }\n    batch_locked = False\n\n    def __init__(self, td_params=None, seed=None, device=\"cpu\"):\n        if td_params is None:\n            td_params = self.gen_params()\n\n        super().__init__(device=device, batch_size=[])\n        self._make_spec(td_params)\n        if seed is None:\n            seed = torch.empty((), dtype=torch.int64).random_().item()\n        self.set_seed(seed)\n\n    # Helpers: _make_spec and gen_params\n    gen_params = staticmethod(gen_params)\n    _make_spec = _make_spec\n\n    # Mandatory methods: _step, _reset and _set_seed\n    _reset = _reset\n    _step = staticmethod(_step)\n    _set_seed = _set_seed" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Testing our environment\n=======================\n\nTorchRL provides a simple function\n`~torchrl.envs.utils.check_env_specs`{.interpreted-text role=\"func\"} to\ncheck that a (transformed) environment has an input/output structure\nthat matches the one dictated by its specs. 
Let us try it out:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "env = PendulumEnv()\ncheck_env_specs(env)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can have a look at our specs to have a visual representation of the\nenvironment signature:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print(\"observation_spec:\", env.observation_spec)\nprint(\"state_spec:\", env.state_spec)\nprint(\"reward_spec:\", env.reward_spec)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can execute a couple of commands too to check that the output\nstructure matches what is expected.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "td = env.reset()\nprint(\"reset tensordict\", td)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can run the `env.rand_step`{.interpreted-text role=\"func\"} to\ngenerate an action randomly from the `action_spec` domain. A\n`tensordict` containing the hyperparameters and the current state\n**must** be passed since our environment is stateless. In stateful\ncontexts, `env.rand_step()` works perfectly too.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "td = env.rand_step(td)\nprint(\"random step tensordict\", td)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Transforming an environment\n===========================\n\nWriting environment transforms for stateless simulators is slightly more\ncomplicated than for stateful ones: transforming an output entry that\nneeds to be read at the following iteration requires to apply the\ninverse transform before calling `meth.step`{.interpreted-text\nrole=\"func\"} at the next step. 
This is an ideal scenario to showcase all\nthe features of TorchRL\\'s transforms!\n\nFor instance, in the following transformed environment we `unsqueeze`\nthe entries `[\"th\", \"thdot\"]` to be able to stack them along the last\ndimension. We also pass them as `in_keys_inv` to squeeze them back to\ntheir original shape once they are passed as input in the next\niteration.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "env = TransformedEnv(\n env,\n # ``Unsqueeze`` the observations that we will concatenate\n UnsqueezeTransform(\n dim=-1,\n in_keys=[\"th\", \"thdot\"],\n in_keys_inv=[\"th\", \"thdot\"],\n ),\n)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Writing custom transforms\n=========================\n\nTorchRL\\'s transforms may not cover all the operations one wants to\nexecute after an environment has been executed. Writing a transform does\nnot require much effort. As for the environment design, there are two\nsteps in writing a transform:\n\n- Getting the dynamics right (forward and inverse);\n- Adapting the environment specs.\n\nA transform can be used in two settings: on its own, it can be used as a\n`~torch.nn.Module`{.interpreted-text role=\"class\"}. It can also be used\nappended to a\n`~torchrl.envs.transforms.TransformedEnv`{.interpreted-text\nrole=\"class\"}. 
The structure of the class allows customizing the\nbehavior in the different contexts.\n\nA `~torchrl.envs.transforms.Transform`{.interpreted-text role=\"class\"}\nskeleton can be summarized as follows:\n\n``` {.}\nclass Transform(nn.Module):\n    def forward(self, tensordict):\n        ...\n    def _apply_transform(self, tensordict):\n        ...\n    def _step(self, tensordict):\n        ...\n    def _call(self, tensordict):\n        ...\n    def inv(self, tensordict):\n        ...\n    def _inv_apply_transform(self, tensordict):\n        ...\n```\n\nThere are three entry points (`forward`{.interpreted-text role=\"func\"},\n`_step`{.interpreted-text role=\"func\"} and `inv`{.interpreted-text\nrole=\"func\"}) which all receive\n`tensordict.TensorDict`{.interpreted-text role=\"class\"} instances. The\nfirst two will eventually go through the keys indicated by\n`~torchrl.envs.transforms.Transform.in_keys`{.interpreted-text\nrole=\"obj\"} and call\n`~torchrl.envs.transforms.Transform._apply_transform`{.interpreted-text\nrole=\"meth\"} on each of these. The results will be written in the\nentries pointed to by `Transform.out_keys`{.interpreted-text role=\"obj\"} if\nprovided (if not, the `in_keys` will be updated with the transformed\nvalues). If inverse transforms need to be executed, a similar data flow\nwill be executed but with the `Transform.inv`{.interpreted-text\nrole=\"func\"} and `Transform._inv_apply_transform`{.interpreted-text\nrole=\"func\"} methods and across the `in_keys_inv` and `out_keys_inv`\nlists of keys. The following figure summarizes this flow for environments\nand replay buffers.\n\n> Transform API\n\nIn some cases, a transform will not work on a subset of keys in a\nunitary manner, but will execute some operation on the parent\nenvironment or work with the entire input `tensordict`. 
In those cases,\nthe `_call`{.interpreted-text role=\"func\"} and\n`forward`{.interpreted-text role=\"func\"} methods should be overridden,\nand the `_apply_transform`{.interpreted-text role=\"func\"} method can be\nskipped.\n\nLet us code new transforms that will compute the `sine` and `cosine`\nvalues of the position angle, as these values are more useful to us to\nlearn a policy than the raw angle value:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "class SinTransform(Transform):\n    def _apply_transform(self, obs: torch.Tensor) -> torch.Tensor:\n        return obs.sin()\n\n    # The transform must also modify the data at reset time\n    def _reset(\n        self, tensordict: TensorDictBase, tensordict_reset: TensorDictBase\n    ) -> TensorDictBase:\n        return self._call(tensordict_reset)\n\n    # _apply_to_composite will execute the observation spec transform across all\n    # in_keys/out_keys pairs and write the result in the observation_spec which\n    # is of type ``Composite``\n    @_apply_to_composite\n    def transform_observation_spec(self, observation_spec):\n        return BoundedTensorSpec(\n            low=-1,\n            high=1,\n            shape=observation_spec.shape,\n            dtype=observation_spec.dtype,\n            device=observation_spec.device,\n        )\n\n\nclass CosTransform(Transform):\n    def _apply_transform(self, obs: torch.Tensor) -> torch.Tensor:\n        return obs.cos()\n\n    # The transform must also modify the data at reset time\n    def _reset(\n        self, tensordict: TensorDictBase, tensordict_reset: TensorDictBase\n    ) -> TensorDictBase:\n        return self._call(tensordict_reset)\n\n    # _apply_to_composite will execute the observation spec transform across all\n    # in_keys/out_keys pairs and write the result in the observation_spec which\n    # is of type ``Composite``\n    @_apply_to_composite\n    def transform_observation_spec(self, observation_spec):\n        return BoundedTensorSpec(\n            low=-1,\n            high=1,\n            shape=observation_spec.shape,\n            dtype=observation_spec.dtype,\n            device=observation_spec.device,\n        )\n\n\nt_sin = SinTransform(in_keys=[\"th\"], out_keys=[\"sin\"])\nt_cos = CosTransform(in_keys=[\"th\"], out_keys=[\"cos\"])\nenv.append_transform(t_sin)\nenv.append_transform(t_cos)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We then concatenate the observations into an \\\"observation\\\" entry.\n`del_keys=False` ensures that we keep these values for the next\niteration.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "cat_transform = CatTensors(\n    in_keys=[\"sin\", \"cos\", \"thdot\"], dim=-1, out_key=\"observation\", del_keys=False\n)\nenv.append_transform(cat_transform)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once more, let us check that our environment specs match what is\nreceived:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "check_env_specs(env)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Executing a rollout\n===================\n\nExecuting a rollout is a succession of simple steps:\n\n- reset the environment\n- while some condition is not met:\n    - compute an action given a policy\n    - execute a step given this action\n    - collect the data\n    - make a `MDP` step\n- gather the data and return\n\nThese operations have been conveniently wrapped in the\n`~torchrl.envs.EnvBase.rollout`{.interpreted-text role=\"meth\"} method,\nof which we provide a simplified version below.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def simple_rollout(steps=100):\n    # preallocate:\n    data = TensorDict({}, [steps])\n    # reset\n    _data = env.reset()\n    for i in range(steps):\n        _data[\"action\"] = env.action_spec.rand()\n        _data = env.step(_data)\n        data[i] = _data\n        _data = step_mdp(_data, keep_other=True)\n    return data\n\n\nprint(\"data from rollout:\", simple_rollout(100))" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "Batching computations\n=====================\n\nThe last aspect of our tutorial left to explore is the ability to\nbatch computations in TorchRL. Because our environment does not make any\nassumptions regarding the input data shape, we can seamlessly execute it\nover batches of data. Even better: for non-batch-locked environments\nsuch as our Pendulum, we can change the batch size on the fly without\nrecreating the environment. To do this, we just generate parameters with\nthe desired shape.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "batch_size = 10  # number of environments to be executed in batch\ntd = env.reset(env.gen_params(batch_size=[batch_size]))\nprint(\"reset (batch size of 10)\", td)\ntd = env.rand_step(td)\nprint(\"rand step (batch size of 10)\", td)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Executing a rollout with a batch of data requires us to reset the\nenvironment outside of the rollout function, since we need to define the\nbatch\\_size dynamically and this is not supported by\n`~torchrl.envs.EnvBase.rollout`{.interpreted-text role=\"meth\"}:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "rollout = env.rollout(\n    3,\n    auto_reset=False,  # we're executing the reset outside of the ``rollout`` call\n    tensordict=env.reset(env.gen_params(batch_size=[batch_size])),\n)\nprint(\"rollout of len 3 (batch size of 10):\", rollout)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Training a simple policy\n========================\n\nIn this example, we will train a simple policy using the reward as a\ndifferentiable objective: its negative serves as the loss. We will take\nadvantage of the fact that our dynamic system is fully differentiable to\nbackpropagate through the trajectory return and adjust the weights of\nour policy to maximize this value directly. 
Of course, in many settings\nthe assumptions we make here, such as a differentiable\nsystem and full access to the underlying mechanics, do not hold.\n\nStill, this is a very simple example that showcases how a training loop\ncan be coded with a custom environment in TorchRL.\n\nLet us first write the policy network:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "torch.manual_seed(0)\nenv.set_seed(0)\n\nnet = nn.Sequential(\n    nn.LazyLinear(64),\n    nn.Tanh(),\n    nn.LazyLinear(64),\n    nn.Tanh(),\n    nn.LazyLinear(64),\n    nn.Tanh(),\n    nn.LazyLinear(1),\n)\npolicy = TensorDictModule(\n    net,\n    in_keys=[\"observation\"],\n    out_keys=[\"action\"],\n)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "and our optimizer:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "optim = torch.optim.Adam(policy.parameters(), lr=2e-3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Training loop\n=============\n\nWe will successively:\n\n- generate a trajectory\n- average the rewards\n- backpropagate through the graph defined by these operations\n- clip the gradient norm and make an optimization step\n- repeat\n\nAt the end of the training loop, we should have a final reward close to\n0, which indicates that the pendulum is upright and still, as desired.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "batch_size = 32\npbar = tqdm.tqdm(range(20_000 // batch_size))\nscheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, 20_000)\nlogs = defaultdict(list)\n\nfor _ in pbar:\n    init_td = env.reset(env.gen_params(batch_size=[batch_size]))\n    rollout = env.rollout(100, policy, tensordict=init_td, auto_reset=False)\n    traj_return = rollout[\"next\", \"reward\"].mean()\n    (-traj_return).backward()\n    gn = torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)\n    
optim.step()\n    optim.zero_grad()\n    pbar.set_description(\n        f\"reward: {traj_return: 4.4f}, \"\n        f\"last reward: {rollout[..., -1]['next', 'reward'].mean(): 4.4f}, gradient norm: {gn: 4.4f}\"\n    )\n    logs[\"return\"].append(traj_return.item())\n    logs[\"last_reward\"].append(rollout[..., -1][\"next\", \"reward\"].mean().item())\n    scheduler.step()\n\n\ndef plot():\n    import matplotlib\n    from matplotlib import pyplot as plt\n\n    is_ipython = \"inline\" in matplotlib.get_backend()\n    if is_ipython:\n        from IPython import display\n\n    with plt.ion():\n        plt.figure(figsize=(10, 5))\n        plt.subplot(1, 2, 1)\n        plt.plot(logs[\"return\"])\n        plt.title(\"returns\")\n        plt.xlabel(\"iteration\")\n        plt.subplot(1, 2, 2)\n        plt.plot(logs[\"last_reward\"])\n        plt.title(\"last reward\")\n        plt.xlabel(\"iteration\")\n        if is_ipython:\n            display.display(plt.gcf())\n            display.clear_output(wait=True)\n        plt.show()\n\n\nplot()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Conclusion\n==========\n\nIn this tutorial, we have learned how to code a stateless environment\nfrom scratch. 
We covered the following topics:\n\n- The four essential components that need to be taken care of when\n  coding an environment (`step`, `reset`, seeding and building specs).\n  We saw how these methods and classes interact with the\n  `~tensordict.TensorDict`{.interpreted-text role=\"class\"} class;\n- How to test that an environment is properly coded using\n  `~torchrl.envs.utils.check_env_specs`{.interpreted-text\n  role=\"func\"};\n- How to append transforms in the context of stateless environments\n  and how to write custom transformations;\n- How to train a policy on a fully differentiable simulator.\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.12" } }, "nbformat": 4, "nbformat_minor": 0 }