dm_control 教程

本文为 dm_control 教程的简单翻译.

原教程链接:

https://colab.research.google.com/github/deepmind/dm_control/blob/main/tutorial.ipynb

GitHub链接: deepmind/dm_control

相关文献

在 Colab 中安装 dm_control

运行命令安装MuJoCo和 dm_control

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#@title Run to install MuJoCo and `dm_control`

import distutils.util
import subprocess
if subprocess.run('nvidia-smi').returncode:
raise RuntimeError(
'Cannot communicate with GPU. '
'Make sure you are using a GPU Colab runtime. '
'Go to the Runtime menu and select Choose runtime type.')

print('Installing dm_control...')
!pip install -q dm_control>=1.0.11

# Configure dm_control to use the EGL rendering backend (requires GPU)
%env MUJOCO_GL=egl

print('Checking that the dm_control installation succeeded...')
try:
from dm_control import suite
env = suite.load('cartpole', 'swingup')
pixels = env.physics.render()
except Exception as e:
raise e from RuntimeError(
'Something went wrong during installation. Check the shell output above '
'for more information.\n'
'If using a hosted Colab runtime, make sure you enable GPU acceleration '
'by going to the Runtime menu and selecting "Choose runtime type".')
else:
del pixels, suite

!echo Installed dm_control $(pip show dm_control | grep -Po "(?<=Version: ).+")

导入包

本教程所需的所有 dm_control 导入包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#@title All `dm_control` imports required for this tutorial

# The basic mujoco wrapper.
from dm_control import mujoco

# Access to enums and MuJoCo library functions.
from dm_control.mujoco.wrapper.mjbindings import enums
from dm_control.mujoco.wrapper.mjbindings import mjlib

# PyMJCF
from dm_control import mjcf

# Composer high level imports
from dm_control import composer
from dm_control.composer.observation import observable
from dm_control.composer import variation

# Imports for Composer tutorial example
from dm_control.composer.variation import distributions
from dm_control.composer.variation import noises
from dm_control.locomotion.arenas import floors

# Control Suite
from dm_control import suite

# Run through corridor example
from dm_control.locomotion.walkers import cmu_humanoid
from dm_control.locomotion.arenas import corridors as corridor_arenas
from dm_control.locomotion.tasks import corridors as corridor_tasks

# Soccer
from dm_control.locomotion import soccer

# Manipulation
from dm_control import manipulation

其他导入包和帮助函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#@title Other imports and helper functions

# General
import copy
import os
import itertools
from IPython.display import clear_output
import numpy as np

# Graphics-related
import matplotlib
import matplotlib.animation as animation
import matplotlib.pyplot as plt
from IPython.display import HTML
import PIL.Image
# Internal loading of video libraries.

# Use svg backend for figure rendering
%config InlineBackend.figure_format = 'svg'

# Font sizes
SMALL_SIZE = 8
MEDIUM_SIZE = 10
BIGGER_SIZE = 12
plt.rc('font', size=SMALL_SIZE) # controls default text sizes
plt.rc('axes', titlesize=SMALL_SIZE) # fontsize of the axes title
plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels
plt.rc('xtick', labelsize=SMALL_SIZE) # fontsize of the tick labels
plt.rc('ytick', labelsize=SMALL_SIZE) # fontsize of the tick labels
plt.rc('legend', fontsize=SMALL_SIZE) # legend fontsize
plt.rc('figure', titlesize=BIGGER_SIZE) # fontsize of the figure title

# Inline video helper function
if os.environ.get('COLAB_NOTEBOOK_TEST', False):
# We skip video generation during tests, as it is quite expensive.
display_video = lambda *args, **kwargs: None
else:
def display_video(frames, framerate=30):
height, width, _ = frames[0].shape
dpi = 70
orig_backend = matplotlib.get_backend()
matplotlib.use('Agg') # Switch to headless 'Agg' to inhibit figure rendering.
fig, ax = plt.subplots(1, 1, figsize=(width / dpi, height / dpi), dpi=dpi)
matplotlib.use(orig_backend) # Switch back to the original backend.
ax.set_axis_off()
ax.set_aspect('equal')
ax.set_position([0, 0, 1, 1])
im = ax.imshow(frames[0])
def update(frame):
im.set_data(frame)
return [im]
interval = 1000/framerate
anim = animation.FuncAnimation(fig=fig, func=update, frames=frames,
interval=interval, blit=True, repeat=False)
return HTML(anim.to_html5_video())

# Seed numpy's global RNG so that cell outputs are deterministic. We also try to
# use RandomState instances that are local to a single cell wherever possible.
np.random.seed(42)

模型定义, 编译和渲染

我们首先描述 MuJoCo 物理模拟库的一些基本概念, 但推荐使用官方文档了解详细信息.

静态模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#@title A static model {vertical-output: true}

static_model = """
<mujoco>
<worldbody>
<light name="top" pos="0 0 1"/>
<geom name="red_box" type="box" size=".2 .2 .2" rgba="1 0 0 1"/>
<geom name="green_sphere" pos=".2 .2 .2" size=".1" rgba="0 1 0 1"/>
</worldbody>
</mujoco>
"""
physics = mujoco.Physics.from_xml_string(static_model)
pixels = physics.render()
PIL.Image.fromarray(pixels)

static_model 是用 MuJoCo 基于 XML 的 MJCF 建模语言编写的. from_xml_string() 方法调用模型编译器, 该编译器实例化库的内部数据结构. 这些可以通过 physics 对象访问, 见下文.

添加自由度和仿真, 高级渲染

这是一个完全合理的模型, 但如果我们模拟它, 除了时间推进, 什么都不会发生. 这是因为这个模型没有自由度(DOFs). 我们通过给物体添加关节(joints)来添加自由度, 指定它们如何相对于它们的 parents 移动. 让我们添加一个铰链关节并重新渲染, 并可视化关节轴.

带关节的子 body

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#@title A child body with a joint { vertical-output: true }

swinging_body = """
<mujoco>
<worldbody>
<light name="top" pos="0 0 1"/>
<body name="box_and_sphere" euler="0 0 -30">
<joint name="swing" type="hinge" axis="1 -1 0" pos="-.2 -.2 -.2"/>
<geom name="red_box" type="box" size=".2 .2 .2" rgba="1 0 0 1"/>
<geom name="green_sphere" pos=".2 .2 .2" size=".1" rgba="0 1 0 1"/>
</body>
</worldbody>
</mujoco>
"""
physics = mujoco.Physics.from_xml_string(swinging_body)
# Visualize the joint axis.
scene_option = mujoco.wrapper.core.MjvOption()
scene_option.flags[enums.mjtVisFlag.mjVIS_JOINT] = True
pixels = physics.render(scene_option=scene_option)
PIL.Image.fromarray(pixels)

运动的(有惯性的)物体叫做 bodies. Body 的子 joint 指定了该 body 如何相对于它的 parent 移动, 在本例中 box_and_sphere 关于worldbody.

注意, body 的坐标系是用 euler 指令旋转(rotated)的, 它的附属几何体和关节也随之旋转. 这是为了强调 MJCF 中位置和方向指令的 local-to-parent-frame 的性质.

我们来做个视频, 了解一下动力学, 看看物体在重力作用下的摆动.

制作视频

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#@title Making a video {vertical-output: true}

duration = 2 # (seconds)
framerate = 30 # (Hz)

# Visualize the joint axis
scene_option = mujoco.wrapper.core.MjvOption()
scene_option.flags[enums.mjtVisFlag.mjVIS_JOINT] = True

# Simulate and display video.
frames = []
physics.reset() # Reset state and time
while physics.data.time < duration:
physics.step()
if len(frames) < physics.data.time * framerate:
pixels = physics.render(scene_option=scene_option)
frames.append(pixels)
display_video(frames, framerate)

注意我们是如何采样视频帧的. 因为物理模拟时间步长通常比帧速率小得多(默认时间步长是2毫秒), 所以我们不会在每一步之后都渲染.

渲染选项

与关节可视化一样, 附加的渲染选项作为参数公开给 render 方法.

启用透明度和框架可视化

1
2
3
4
5
6
7
#@title Enable transparency and frame visualization {vertical-output: true}

scene_option = mujoco.wrapper.core.MjvOption()
scene_option.frame = enums.mjtFrame.mjFRAME_GEOM
scene_option.flags[enums.mjtVisFlag.mjVIS_TRANSPARENT] = True
pixels = physics.render(scene_option=scene_option)
PIL.Image.fromarray(pixels)

深度渲染

1
2
3
4
5
6
7
8
9
10
11
#@title Depth rendering {vertical-output: true}

# depth is a float array, in meters.
depth = physics.render(depth=True)
# Shift nearest values to the origin.
depth -= depth.min()
# Scale by 2 mean distances of near rays.
depth /= 2*depth[depth <= 1].mean()
# Scale to [0, 255]
pixels = 255*np.clip(depth, 0, 1)
PIL.Image.fromarray(pixels.astype(np.uint8))

分割渲染

1
2
3
4
5
6
7
8
9
10
11
12
#@title Segmentation rendering {vertical-output: true}

seg = physics.render(segmentation=True)
# Display the contents of the first channel, which contains object
# IDs. The second channel, seg[:, :, 1], contains object types.
geom_ids = seg[:, :, 0]
# Infinity is mapped to -1
geom_ids = geom_ids.astype(np.float64) + 1
# Scale to [0, 1]
geom_ids = geom_ids / geom_ids.max()
pixels = 255*geom_ids
PIL.Image.fromarray(pixels.astype(np.uint8))

从世界到相机坐标系投影

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#@title Projecting from world to camera coordinates {vertical-output: true}

# Get the world coordinates of the box corners
box_pos = physics.named.data.geom_xpos['red_box']
box_mat = physics.named.data.geom_xmat['red_box'].reshape(3, 3)
box_size = physics.named.model.geom_size['red_box']
offsets = np.array([-1, 1]) * box_size[:, None]
xyz_local = np.stack(itertools.product(*offsets)).T
xyz_global = box_pos[:, None] + box_mat @ xyz_local

# Camera matrices multiply homogenous [x, y, z, 1] vectors.
corners_homogeneous = np.ones((4, xyz_global.shape[1]), dtype=float)
corners_homogeneous[:3, :] = xyz_global

# Get the camera matrix.
camera = mujoco.Camera(physics)
camera_matrix = camera.matrix

# Project world coordinates into pixel space. See:
# https://en.wikipedia.org/wiki/3D_projection#Mathematical_formula
xs, ys, s = camera_matrix @ corners_homogeneous
# x and y are in the pixel coordinate system.
x = xs / s
y = ys / s

# Render the camera view and overlay the projected corner coordinates.
pixels = camera.render()
fig, ax = plt.subplots(1, 1)
ax.imshow(pixels)
ax.plot(x, y, '+', c='w')
ax.set_axis_off()

MuJoCo基础和命名索引

mjModel

MuJoCo 的 mjModel, 封装在 physics.model, 包含模型描述, 包括默认的初始状态和其他不是状态函数的固定量, 例如几何体在其父坐标系中的位置. 几何体 boxsphere 相对于它们的父体 box_and_sphere 的 (x, y, z) 偏移量由 model.geom_pos 给出:

1
physics.model.geom_pos

array([[0. , 0. , 0. ],
[0.2, 0.2, 0.2]])

model.opt 结构包含全局量, 如

1
2
print('timestep', physics.model.opt.timestep)
print('gravity', physics.model.opt.gravity)

timestep 0.002
gravity [ 0. 0. -9.81]

mjData

mjData, 封装在 physics.data,包含依赖于它的状态和数量. 状态由时间, 广义位置和广义速度组成. 它们分别为 data.time, data.qposdata.qvel.

让我们打印我们之前提到的摆体的状态:

1
print(physics.data.time, physics.data.qpos, physics.data.qvel)

2.0000000000000013 [-0.05319467] [-1.36253678]

physics.data 还包含状态函数, 例如物体在世界坐标系中的笛卡尔位置. 两个几何体的 (x, y, z) 位置在 data.geom_xpos 中:

1
print(physics.data.geom_xpos)

[[ 0.00988513 0.00264871 -0.01532153]
[ 0.29297533 0.0785025 0.16935694]]

命名索引

使用 named 封装器使上述数组的语义更加清晰, 它将名称分配给行, 并将类型名称分配给列.

1
print(physics.named.data.geom_xpos)

FieldIndexer(geom_xpos):
x y z
0 red_box [ 0.00989 0.00265 -0.0153 ]
1 green_sphere [ 0.293 0.0785 0.169 ]

注意 model.geom_posdata.geom_xpos 具有相似的语义, 但含义非常不同.

1
print(physics.named.model.geom_pos)

FieldIndexer(geom_pos):
x y z
0 red_box [ 0 0 0 ]
1 green_sphere [ 0.2 0.2 0.2 ]

名称字符串可用于索引到相关的量, 使代码更具可读性和稳健性.

1
physics.named.data.geom_xpos['green_sphere', 'z']

0.16935693635325777

关节名称可以用于在构型空间中索引相关量(以字母 q 开头):

1
physics.named.data.qpos['swing']

array([-0.05319467])

我们可以将 NumPy 切片操作与命名索引混合使用. 例如, 我们可以使用 box 的名称("red_box")作为 geom_rgba 数组行索引来设置 box 的颜色.

通过命名索引更改颜色

1
2
3
4
5
6
#@title Changing colors using named indexing{vertical-output: true}

random_rgb = np.random.rand(3)
physics.named.model.geom_rgba['red_box', :3] = random_rgb
pixels = physics.render()
PIL.Image.fromarray(pixels)

注意 physics.model 量不会被引擎改变, 我们可以在时间步之间自行改变它们. 然而, 通常不建议这样做, 首选的方法是使用 PyMJCF 库在 XML 级别修改模型, 见下文.

通过 reset_context() 设置状态

为了使作为状态函数的 data 量与状态同步, 需要调用 MuJoCo 的 mj_step1() . 这是由 reset_context() 上下文促成的, 请参阅技术报告2.1节的深入讨论.

1
2
3
4
5
6
7
physics.named.data.qpos['swing'] = np.pi
print('Without reset_context, spatial positions are not updated:',
physics.named.data.geom_xpos['green_sphere', ['z']])
with physics.reset_context():
physics.named.data.qpos['swing'] = np.pi
print('After reset_context, positions are up-to-date:',
physics.named.data.geom_xpos['green_sphere', ['z']])

Without reset_context, spatial positions are not updated: [0.16935694]
After reset_context, positions are up-to-date: [-0.6]

自由体:自反转的“tippe-top”

自由体是具有一个 free 关节的物体, 具有 6 个运动自由度: 3 个平移和 3 个旋转. 我们可以给 box_and_sphere 体一个自由关节, 然后看着它下落, 但是让我们看一些更有趣的东西. “tippe top”是一个旋转的玩具, 它可以翻转过自己的头部(维基百科). 我们将其建模如下:

"Tippe-top" 模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#@title The "tippe-top" model{vertical-output: true}

tippe_top = """
<mujoco model="tippe top">
<option integrator="RK4"/>
<asset>
<texture name="grid" type="2d" builtin="checker" rgb1=".1 .2 .3"
rgb2=".2 .3 .4" width="300" height="300"/>
<material name="grid" texture="grid" texrepeat="8 8" reflectance=".2"/>
</asset>
<worldbody>
<geom size=".2 .2 .01" type="plane" material="grid"/>
<light pos="0 0 .6"/>
<camera name="closeup" pos="0 -.1 .07" xyaxes="1 0 0 0 1 2"/>
<body name="top" pos="0 0 .02">
<freejoint/>
<geom name="ball" type="sphere" size=".02" />
<geom name="stem" type="cylinder" pos="0 0 .02" size="0.004 .008"/>
<geom name="ballast" type="box" size=".023 .023 0.005" pos="0 0 -.015"
contype="0" conaffinity="0" group="3"/>
</body>
</worldbody>
<keyframe>
<key name="spinning" qpos="0 0 0.02 1 0 0 0" qvel="0 0 0 0 1 200" />
</keyframe>
</mujoco>
"""
physics = mujoco.Physics.from_xml_string(tippe_top)
PIL.Image.fromarray(physics.render(camera_id='closeup'))

注意这个模型定义的几个新特性:

  1. 这是使用 <freejoint/> 标签添加的自由关节, 它类似于 <joint type="free"/>, 但禁用了摩擦或刚度等非物理属性.
  2. 我们使用 <option/> 标签将积分器设置为更精确的 4 阶 Runge Kutta.
  3. 我们在 <asset/> 标签中定义地板的网格材质, 并在 "floor” 几何体中引用它.
  4. 我们使用一种不可见的, 无碰撞的 box 几何体, 称为 ballast , 来降低顶部的质心. 低质心是发生翻转行为所必需的(与直觉相反).
  5. 我们将初始旋转状态保存为关键帧. 它绕 Z 轴有很高的旋转速度, 但朝向并不完全重合于世界坐标轴.
  6. 我们在模型中定义了一个 <camera>, 然后使用 camera_id 参数对 update_scene() 进行渲染. 让我们来检查一下现在的状态:
1
2
print('positions', physics.data.qpos)
print('velocities', physics.data.qvel)

positions [0. 0. 0.02 1. 0. 0. 0. ]
velocities [0. 0. 0. 0. 0. 0.]

速度很容易解释, 6 个 0, 每个自由度占一个. 长度7的位置呢? 我们可以看到身体最初的 2 厘米高度; 后面的四个数字是三维朝向, 由单位四元数定义. 这些规范化的四元向量, 它们保留了朝向群的拓扑结构, 这是 data.qposdata.qvel 更长的原因: 三维朝向用 4 个数字表示, 角速度用 3 个数字表示.

"Tippe-top" 视频

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#@title Video of the tippe-top {vertical-output: true}

duration = 7 # (seconds)
framerate = 60 # (Hz)

# Simulate and display video.
frames = []
physics.reset(0) # Reset to keyframe 0 (load a saved state).
while physics.data.time < duration:
physics.step()
if len(frames) < (physics.data.time) * framerate:
pixels = physics.render(camera_id='closeup')
frames.append(pixels)

display_video(frames, framerate)

physics.data 测量值

physics.data 结构包含仿真产生的所有动态变量和中间结果. 这些将在每个时间步上发生更改.

下面我们仿真 2000 个时间步, 并绘制球体的状态和高度作为时间的函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#@title Measuring values {vertical-output: true}

timevals = []
angular_velocity = []
stem_height = []

# Simulate and save data
physics.reset(0)
while physics.data.time < duration:
physics.step()
timevals.append(physics.data.time)
angular_velocity.append(physics.data.qvel[3:6].copy())
stem_height.append(physics.named.data.geom_xpos['stem', 'z'])

dpi = 100
width = 480
height = 640
figsize = (width / dpi, height / dpi)
_, ax = plt.subplots(2, 1, figsize=figsize, dpi=dpi, sharex=True)

ax[0].plot(timevals, angular_velocity)
ax[0].set_title('angular velocity')
ax[0].set_ylabel('radians / second')

ax[1].plot(timevals, stem_height)
ax[1].set_xlabel('time (seconds)')
ax[1].set_ylabel('meters')
_ = ax[1].set_title('stem height')

PyMJCF 教程

这个库为 MuJoCo 基于 XML 的 MJCF 物理建模语言提供了一个 Python 对象模型. 这个库的目标是允许用户在 Python 中轻松地与 MJCF 模型交互和修改, 类似于 JavaScript DOM 对 HTML 的作用.

这个库的一个关键特性是能够轻松地将多个单独的 MJCF 模型组合成一个更大的模型. 自动消除来自不同模型或同一模型的多个实例的重复名称的歧义.

一个典型的用例是当我们想要机器人具有可变数量的关节时. 这是对运动学的一个根本性改变, 需要编译一个新的 XML 描述符和新的二进制模型.

下面的代码片段实现了这个场景, 并提供了这个库用例的快速示例.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Leg(object):
"""A 2-DoF leg with position actuators."""
def __init__(self, length, rgba):
self.model = mjcf.RootElement()

# Defaults:
self.model.default.joint.damping = 2
self.model.default.joint.type = 'hinge'
self.model.default.geom.type = 'capsule'
self.model.default.geom.rgba = rgba # Continued below...

# Thigh:
self.thigh = self.model.worldbody.add('body')
self.hip = self.thigh.add('joint', axis=[0, 0, 1])
self.thigh.add('geom', fromto=[0, 0, 0, length, 0, 0], size=[length/4])

# Hip:
self.shin = self.thigh.add('body', pos=[length, 0, 0])
self.knee = self.shin.add('joint', axis=[0, 1, 0])
self.shin.add('geom', fromto=[0, 0, 0, 0, 0, -length], size=[length/5])

# Position actuators:
self.model.actuator.add('position', joint=self.hip, kp=10)
self.model.actuator.add('position', joint=self.knee, kp=10)

Leg 类描述了一个抽象的铰接腿, 具有两个关节和相应的 PD 执行器.

注意:

  • MJCF 属性直接对应于 add() 方法的参数.
  • 当引用元素时, 例如当指定执行器连接的关节时, 使用 MJCF 元素本身, 而不是名称字符串.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
BODY_RADIUS = 0.1
BODY_SIZE = (BODY_RADIUS, BODY_RADIUS, BODY_RADIUS / 2)
random_state = np.random.RandomState(42)

def make_creature(num_legs):
"""Constructs a creature with `num_legs` legs."""
rgba = random_state.uniform([0, 0, 0, 1], [1, 1, 1, 1])
model = mjcf.RootElement()
model.compiler.angle = 'radian' # Use radians.

# Make the torso geom.
model.worldbody.add(
'geom', name='torso', type='ellipsoid', size=BODY_SIZE, rgba=rgba)

# Attach legs to equidistant sites on the circumference.
for i in range(num_legs):
theta = 2 * i * np.pi / num_legs
hip_pos = BODY_RADIUS * np.array([np.cos(theta), np.sin(theta), 0])
hip_site = model.worldbody.add('site', pos=hip_pos, euler=[0, 0, theta])
leg = Leg(length=BODY_RADIUS, rgba=rgba)
hip_site.attach(leg.model)

return model

make_creature 函数使用 PyMJCF 的 attach() 方法从程序上将腿附加到躯干上. 请注意, 在这个阶段, 躯干和臀部附属节点都是 worldbody 的子节点, 因为它们的父节点还没有实例化. 我们现在将制作一个带有方格地板和两盏灯的 arena, 并将我们的生物放置在网格中.

六只生物

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#@title Six Creatures on a floor.{vertical-output: true}

arena = mjcf.RootElement()
chequered = arena.asset.add('texture', type='2d', builtin='checker', width=300,
height=300, rgb1=[.2, .3, .4], rgb2=[.3, .4, .5])
grid = arena.asset.add('material', name='grid', texture=chequered,
texrepeat=[5, 5], reflectance=.2)
arena.worldbody.add('geom', type='plane', size=[2, 2, .1], material=grid)
for x in [-2, 2]:
arena.worldbody.add('light', pos=[x, -1, 3], dir=[-x, 1, -2])

# Instantiate 6 creatures with 3 to 8 legs.
creatures = [make_creature(num_legs=num_legs) for num_legs in range(3, 9)]

# Place them on a grid in the arena.
height = .15
grid = 5 * BODY_RADIUS
xpos, ypos, zpos = np.meshgrid([-grid, 0, grid], [0, grid], [height])
for i, model in enumerate(creatures):
# Place spawn sites on a grid.
spawn_pos = (xpos.flat[i], ypos.flat[i], zpos.flat[i])
spawn_site = arena.worldbody.add('site', pos=spawn_pos, group=3)
# Attach to the arena at the spawn sites, with a free joint.
spawn_site.attach(model).add('freejoint')

# Instantiate the physics and render.
physics = mjcf.Physics.from_mjcf_model(arena)
PIL.Image.fromarray(physics.render())

多腿生物, 随时准备漫步! 让我们引入一些控制并观察它们的移动. 我们将生成一个固定频率和随机相位的正弦开环控制信号, 记录视频帧和躯干几何体的水平位置, 以绘制运动轨迹.

运动视频

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#@title Video of the movement{vertical-output: true}
#@test {"timeout": 600}

duration = 10 # (Seconds)
framerate = 30 # (Hz)
video = []
pos_x = []
pos_y = []
torsos = [] # List of torso geom elements.
actuators = [] # List of actuator elements.
for creature in creatures:
torsos.append(creature.find('geom', 'torso'))
actuators.extend(creature.find_all('actuator'))

# Control signal frequency, phase, amplitude.
freq = 5
phase = 2 * np.pi * random_state.rand(len(actuators))
amp = 0.9

# Simulate, saving video frames and torso locations.
physics.reset()
while physics.data.time < duration:
# Inject controls and step the physics.
physics.bind(actuators).ctrl = amp * np.sin(freq * physics.data.time + phase)
physics.step()

# Save torso horizontal positions using bind().
pos_x.append(physics.bind(torsos).xpos[:, 0].copy())
pos_y.append(physics.bind(torsos).xpos[:, 1].copy())

# Save video frames.
if len(video) < physics.data.time * framerate:
pixels = physics.render()
video.append(pixels.copy())

display_video(video, framerate)

运动轨迹

1
2
3
4
5
6
#@title Movement trajectories{vertical-output: true}

creature_colors = physics.bind(torsos).rgba[:, :3]
fig, ax = plt.subplots(figsize=(4, 4))
ax.set_prop_cycle(color=creature_colors)
_ = ax.plot(pos_x, pos_y, linewidth=4)

上图显示了 creature 位置对应的运动轨迹. 注意如何使用 physics.bind(torsos) 来访问 xposrgba 值. 一旦由 from_mjcf_model() 实例化了 Physics, bind() 方法将公开 mjcf 元素的关联 mjDatamjModel 字段, 从而提供对仿真中所有量的统一访问.

Composer 教程

在本教程中, 我们将创建一个任务, 要求我们的 “creature” 以规定的力来按地板上的一个变色按钮. 我们首先将角色设定为 composer.Entity:

Creature

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#@title The `Creature` class
class Creature(composer.Entity):
"""A multi-legged creature derived from `composer.Entity`."""
def _build(self, num_legs):
self._model = make_creature(num_legs)

def _build_observables(self):
return CreatureObservables(self)

@property
def mjcf_model(self):
return self._model

@property
def actuators(self):
return tuple(self._model.find_all('actuator'))

# Add simple observable features for joint angles and velocities.
class CreatureObservables(composer.Observables):

@composer.observable
def joint_positions(self):
all_joints = self._entity.mjcf_model.find_all('joint')
return observable.MJCFFeature('qpos', all_joints)

@composer.observable
def joint_velocities(self):
all_joints = self._entity.mjcf_model.find_all('joint')
return observable.MJCFFeature('qvel', all_joints)

Creature 实体包括关节角度和速度的通用 Observables. 因为 find_all() 是在 Creature 的 MJCF 模型上调用的, 它只会返回生物的腿部关节, 而不是它将连接到世界的“自由”关节. 注意, Composer 实体应该重写 _build_build_observables 方法, 而不是 __init__. 在基类中实现 __init__ 调用 _build_build_observables, 按此顺序,以确保实体的 MJCF 模型在其 observables 之前创建. 这是一种设计选择, 允许用户将 observables 引用为属性(entity.observables.foo), 同时仍然清晰地表明哪些属性是 observables. 有状态的 Button 类派生自 composer.Entity, 并实现 initialize_episodeafter_substep 回调函数.

Button

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#@title The `Button` class
NUM_SUBSTEPS = 25 # The number of physics substeps per control timestep.

class Button(composer.Entity):
"""A button Entity which changes colour when pressed with certain force."""
def _build(self, target_force_range=(5, 10)):
self._min_force, self._max_force = target_force_range
self._mjcf_model = mjcf.RootElement()
self._geom = self._mjcf_model.worldbody.add(
'geom', type='cylinder', size=[0.25, 0.02], rgba=[1, 0, 0, 1])
self._site = self._mjcf_model.worldbody.add(
'site', type='cylinder', size=self._geom.size*1.01, rgba=[1, 0, 0, 0])
self._sensor = self._mjcf_model.sensor.add('touch', site=self._site)
self._num_activated_steps = 0

def _build_observables(self):
return ButtonObservables(self)

@property
def mjcf_model(self):
return self._mjcf_model
# Update the activation (and colour) if the desired force is applied.
def _update_activation(self, physics):
current_force = physics.bind(self.touch_sensor).sensordata[0]
self._is_activated = (current_force >= self._min_force and
current_force <= self._max_force)
physics.bind(self._geom).rgba = (
[0, 1, 0, 1] if self._is_activated else [1, 0, 0, 1])
self._num_activated_steps += int(self._is_activated)

def initialize_episode(self, physics, random_state):
self._reward = 0.0
self._num_activated_steps = 0
self._update_activation(physics)

def after_substep(self, physics, random_state):
self._update_activation(physics)

@property
def touch_sensor(self):
return self._sensor

@property
def num_activated_steps(self):
return self._num_activated_steps

class ButtonObservables(composer.Observables):
"""A touch sensor which averages contact force over physics substeps."""
@composer.observable
def touch_force(self):
return observable.MJCFFeature('sensordata', self._entity.touch_sensor,
buffer_size=NUM_SUBSTEPS, aggregator='mean')

注意按钮如何在指定力按下期间计算 sub-steps 的数量. 它还暴露了施加在按钮上的力的 Observable, 其值是物理时间步长的读数平均值.

我们导入了一些 variation 模块和一个 arena factory:

使用 composer.variation 的随机初始化器

1
2
3
4
5
6
7
8
9
10
11
12
#@title Random initialiser using `composer.variation`

class UniformCircle(variation.Variation):
"""A uniformly sampled horizontal point on a circle of radius `distance`."""
def __init__(self, distance):
self._distance = distance
self._heading = distributions.Uniform(0, 2*np.pi)

def __call__(self, initial_value=None, current_value=None, random_state=None):
distance, heading = variation.evaluate(
(self._distance, self._heading), random_state=random_state)
return (distance*np.cos(heading), distance*np.sin(heading), 0)

PressWithSpecificForce 任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#@title The `PressWithSpecificForce` task

class PressWithSpecificForce(composer.Task):

def __init__(self, creature):
self._creature = creature
self._arena = floors.Floor()
self._arena.add_free_entity(self._creature)
self._arena.mjcf_model.worldbody.add('light', pos=(0, 0, 4))
self._button = Button()
self._arena.attach(self._button)

# Configure initial poses
self._creature_initial_pose = (0, 0, 0.15)
button_distance = distributions.Uniform(0.5, .75)
self._button_initial_pose = UniformCircle(button_distance)

# Configure variators
self._mjcf_variator = variation.MJCFVariator()
self._physics_variator = variation.PhysicsVariator()

# Configure and enable observables
pos_corrptor = noises.Additive(distributions.Normal(scale=0.01))
self._creature.observables.joint_positions.corruptor = pos_corrptor
self._creature.observables.joint_positions.enabled = True
vel_corruptor = noises.Multiplicative(distributions.LogNormal(sigma=0.01))
self._creature.observables.joint_velocities.corruptor = vel_corruptor
self._creature.observables.joint_velocities.enabled = True
self._button.observables.touch_force.enabled = True

def to_button(physics):
button_pos, _ = self._button.get_pose(physics)
return self._creature.global_vector_to_local_frame(physics, button_pos)

self._task_observables = {}
self._task_observables['button_position'] = observable.Generic(to_button)

for obs in self._task_observables.values():
obs.enabled = True

self.control_timestep = NUM_SUBSTEPS * self.physics_timestep

@property
def root_entity(self):
return self._arena

@property
def task_observables(self):
return self._task_observables

def initialize_episode_mjcf(self, random_state):
self._mjcf_variator.apply_variations(random_state)

def initialize_episode(self, physics, random_state):
self._physics_variator.apply_variations(physics, random_state)
creature_pose, button_pose = variation.evaluate(
(self._creature_initial_pose, self._button_initial_pose),
random_state=random_state)
self._creature.set_pose(physics, position=creature_pose)
self._button.set_pose(physics, position=button_pose)

def get_reward(self, physics):
return self._button.num_activated_steps / NUM_SUBSTEPS

实例化环境

1
2
3
4
5
6
7
8
#@title Instantiating an environment{vertical-output: true}

creature = Creature(num_legs=4)
task = PressWithSpecificForce(creature)
env = composer.Environment(task, random_state=np.random.RandomState(42))

env.reset()
PIL.Image.fromarray(env.physics.render())

The Control Suite

Control Suite 是一组稳定的, 经过良好测试的任务, 旨在作为持续控制 learning agents 的基准(benchmark). 任务是使用基本的 MuJoCo 封装接口编写的. 标准化的行动(action), 观察(observation)和奖励(reward)结构使 suit-wide 的基准测试变得简单, 学习曲线也易于解释. 为了便于进行基准测试, Control Suite 域不应该被修改. 有关基准测试的详细信息, 请参阅我们的原始文章.

解决的基准测试任务的视频可以在这里找到.

该 suite 为迭代任务提供了方便的模块级元组:

遍历任务

1
2
3
4
5
#@title Iterating over tasks{vertical-output: true}

max_len = max(len(d) for d, _ in suite.BENCHMARKING)
for domain, task in suite.BENCHMARKING:
print(f'{domain:<{max_len}} {task}')

加载和仿真一个 suite 任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#@title Loading and simulating a `suite` task{vertical-output: true}

# Load the environment
random_state = np.random.RandomState(42)
env = suite.load('hopper', 'stand', task_kwargs={'random': random_state})

# Simulate episode with random actions
duration = 4 # Seconds
frames = []
ticks = []
rewards = []
observations = []

spec = env.action_spec()
time_step = env.reset()

while env.physics.data.time < duration:

action = random_state.uniform(spec.minimum, spec.maximum, spec.shape)
time_step = env.step(action)

camera0 = env.physics.render(camera_id=0, height=200, width=200)
camera1 = env.physics.render(camera_id=1, height=200, width=200)
frames.append(np.hstack((camera0, camera1)))
rewards.append(time_step.reward)
observations.append(copy.deepcopy(time_step.observation))
ticks.append(env.physics.data.time)

html_video = display_video(frames, framerate=1./env.control_timestep())

# Show video and plot reward and observations
num_sensors = len(time_step.observation)

_, ax = plt.subplots(1 + num_sensors, 1, sharex=True, figsize=(4, 8))
ax[0].plot(ticks, rewards)
ax[0].set_ylabel('reward')
ax[-1].set_xlabel('time')

for i, key in enumerate(time_step.observation):
data = np.asarray([observations[j][key] for j in range(len(observations))])
ax[i+1].plot(ticks, data, label=key)
ax[i+1].set_ylabel(key)

html_video

在 Control Suite 中可视化每个域的一个任务的初始状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#@title Visualizing an initial state of one task per domain in the Control Suite
domains_tasks = {domain: task for domain, task in suite.ALL_TASKS}
random_state = np.random.RandomState(42)
num_domains = len(domains_tasks)
n_col = num_domains // int(np.sqrt(num_domains))
n_row = num_domains // n_col + int(0 < num_domains % n_col)
_, ax = plt.subplots(n_row, n_col, figsize=(12, 12))
for a in ax.flat:
a.axis('off')
a.grid(False)

print(f'Iterating over all {num_domains} domains in the Suite:')
for j, [domain, task] in enumerate(domains_tasks.items()):
print(domain, task)

env = suite.load(domain, task, task_kwargs={'random': random_state})
timestep = env.reset()
pixels = env.physics.render(height=200, width=200, camera_id=0)

ax.flat[j].imshow(pixels)
ax.flat[j].set_title(domain + ': ' + task)

clear_output()

运动

人物模型在有障碍物的走廊上奔跑

作为一个使用移动基础结构来构建 RL 环境的示例, 考虑将一个人形角色放置在一个有墙壁的走廊中, 并指定一个任务, 该任务指定人形角色将通过沿着这个走廊奔跑, 使用视觉绕过墙壁障碍物来获得奖励. 我们将环境实例化为 Walker, Arena 和 Task 的组合, 如下所示. 首先, 我们构建了一个位置控制的 CMU 人形角色.

一个位置控制 cmu_humanoid

1
2
3
4
#@title A position controlled `cmu_humanoid`

walker = cmu_humanoid.CMUHumanoidPositionControlledV2020(
observable_options={'egocentric_camera': dict(enabled=True)})

接下来, 我们建造了一个走廊形状的 arena, 它被墙壁挡住.

一个带有墙壁障碍的走廊 arena

1
2
3
4
5
6
7
8
9
#@title A corridor arena with wall obstacles

arena = corridor_arenas.WallsCorridor(
wall_gap=3.,
wall_width=distributions.Uniform(2., 3.),
wall_height=distributions.Uniform(2.5, 3.5),
corridor_width=4.,
corridor_length=30.,
)

任务构造函数将 walker 放置在arena 中.

在 arena 中导航的任务

1
2
3
4
5
6
7
8
9
10
#@title A task to navigate the arena

task = corridor_tasks.RunThroughCorridor(
walker=walker,
arena=arena,
walker_spawn_position=(0.5, 0, 0),
target_velocity=3.0,
physics_timestep=0.005,
control_timestep=0.03,
)

最后, 一个奖励 agent 以特定速度跑下走廊的任务被实例化为 composer.Environment.

RunThroughCorridor 环境

1
2
3
4
5
6
7
8
9
10
11
12
13
#@title The `RunThroughCorridor` environment

env = composer.Environment(
task=task,
time_limit=10,
random_state=np.random.RandomState(42),
strip_singleton_obs_buffer_dim=True,
)
env.reset()
pixels = []
for camera_id in range(3):
pixels.append(env.physics.render(camera_id=camera_id, width=240))
PIL.Image.fromarray(np.hstack(pixels))

多智能体足球

多智能体足球环境(文章)建立在 Composer 和 Locomotion 库的基础上, 遵循 Walkers, Arena 和 Task 的一致任务结构, 其中我们引入多个 walkers, 而不是单个, 可以在同一场景中相互进行物理交互. 下面的代码片段展示了如何使用简单的 5 自由度 BoxHead walker 类型实例化一个 2-vs-2 的多代理足球环境.

2-v-2 Boxhead 足球

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#@title 2-v-2 `Boxhead` soccer

random_state = np.random.RandomState(42)
env = soccer.load(
team_size=2,
time_limit=45.,
random_state=random_state,
disable_walker_contacts=False,
walker_type=soccer.WalkerType.BOXHEAD,
)
env.reset()
pixels = []
# Select a random subset of 6 cameras (soccer envs have lots of cameras)
cameras = random_state.choice(env.physics.model.ncam, 6, replace=False)
for camera_id in cameras:
pixels.append(env.physics.render(camera_id=camera_id, width=240))
image = np.vstack((np.hstack(pixels[:3]), np.hstack(pixels[3:])))
PIL.Image.fromarray(image)

其中的 walker 可以被简单地替换为 WalkerType.ANT:

3-v-3 Ant 足球

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#@title 3-v-3 `Ant` soccer

random_state = np.random.RandomState(42)
env = soccer.load(
team_size=3,
time_limit=45.,
random_state=random_state,
disable_walker_contacts=False,
walker_type=soccer.WalkerType.ANT,
)
env.reset()

pixels = []
cameras = random_state.choice(env.physics.model.ncam, 6, replace=False)
for camera_id in cameras:
pixels.append(env.physics.render(camera_id=camera_id, width=240))
image = np.vstack((np.hstack(pixels[:3]), np.hstack(pixels[3:])))
PIL.Image.fromarray(image)

Manipulation

manipulation 模块提供了一个机械臂, 一组简单的对象, 以及为操作任务构建奖励函数的工具.

列出所有 manipulation 任务

1
2
3
4
#@title Listing all `manipulation` tasks{vertical-output: true}

# `ALL` is a tuple containing the names of all of the environments in the suite.
print('\n'.join(manipulation.ALL))

列出使用视觉的 manipulation 任务

1
2
#@title Listing `manipulation` tasks that use vision{vertical-output: true}
print('\n'.join(manipulation.get_environments_by_tag('vision')))

加载和模拟 manipulation 任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#@title Loading and simulating a `manipulation` task{vertical-output: true}

env = manipulation.load('stack_2_of_3_bricks_random_order_vision', seed=42)
action_spec = env.action_spec()

def sample_random_action():
return env.random_state.uniform(
low=action_spec.minimum,
high=action_spec.maximum,
).astype(action_spec.dtype, copy=False)

# Step the environment through a full episode using random actions and record
# the camera observations.
frames = []
timestep = env.reset()
frames.append(timestep.observation['front_close'])
while not timestep.last():
timestep = env.step(sample_random_action())
frames.append(timestep.observation['front_close'])
all_frames = np.concatenate(frames, axis=0)
display_video(all_frames, 30)