A continuation of this post:
natsutan.hatenablog.com
It now works reasonably well.
Improvements
The EE orientation is handled as a quaternion, and when computing Δθ I convert it to degrees and add the action values. The scaling factor at that point was still at quaternion scale, so a single action changed the orientation by less than 1 deg. After rescaling so that one action can change it by up to about 3 deg, the orientation now lines up properly.
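A minimal sketch of the idea, assuming quat_to_xyz / xyz_to_quat work in degrees as described above (orientation_target, action_ori, and the default value are illustrative, not the exact code in ur_env.py):

from genesis.utils.geom import quat_to_xyz, xyz_to_quat

def orientation_target(ee_quat, action_ori, action_scale_deg=3.0):
    """Turn the orientation part of an action into a quaternion target for IK."""
    ee_deg = quat_to_xyz(ee_quat)                        # current EE orientation as Euler angles (deg)
    target_deg = ee_deg + action_ori * action_scale_deg  # actions are clipped to [-1, 1], so at most a few deg per step
    return xyz_to_quat(target_deg)                       # back to a quaternion for the IK target
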
Shortened the maximum episode length and reduced num_steps_per_env. When this value is large, the arm keeps going even after it heads off in a strange direction, so the number of junk episodes grows. Books and AI assistants suggest 1000-2000 for a simple task, but given how the rollouts work I think a few times the best-case episode length is the right ballpark.
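For scale, the episode and rollout lengths line up here: episode_length_s = 2.0 s with dt = 0.02 s gives math.ceil(2.0 / 0.02) = 100 simulation steps per episode, and num_steps_per_env is also 100, i.e. roughly one full episode per rollout.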
Made the NN smaller. Since the task is simple, I kept the number of layers the same and halved the width of every layer.
Results
Success is defined as reaching a point slightly above the box with the EE pointing downward.
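Concretely, this is the check done in _reward_ee_both further down; here is a simplified NumPy-style sketch (pose_ok and its arguments are placeholders for the current and target EE pose):

import numpy as np

def pose_ok(ee_pos, ee_quat, target_pos, target_quat):
    pos_error = np.sum((np.asarray(ee_pos) - np.asarray(target_pos)) ** 2)  # squared distance to the goal point
    dot = np.clip(np.dot(ee_quat, target_quat), -1.0, 1.0)
    angle = 2.0 * np.arccos(abs(dot))                                       # orientation error in rad
    # thresholds from reward_cfg: pos_error_threshold = 0.1, quat_angle_threshold_deg = 20
    return pos_error < 0.1 and angle < np.deg2rad(20.0)
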
Adding trajectory generation would probably make the motion a bit smoother.
I think the jitter at the very end happens because lingering near the goal yields a higher total reward than taking the shortest path to it, so moving around randomly near the goal ends up scoring higher.
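Rough numbers with the reward settings below: a step spent inside both thresholds collects about 5 + 5 from the position term, 1 + 3 from the orientation term, and 30 from the combined bonus, all multiplied by dt = 0.02, i.e. roughly 0.88 per step. Since reaching the goal does not end the episode, it runs its full 100 steps either way, and nothing in the reward penalizes motion, so drifting around inside the tolerance band costs nothing compared to holding still.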
Source
I should probably move this to GitHub soon...
ur_tran.py
import argparse
import os
import pickle
import shutil
from importlib import metadata
try:
try:
if metadata.version("rsl-rl"):
raise ImportError
except metadata.PackageNotFoundError:
if metadata.version("rsl-rl-lib") != "2.2.4":
raise ImportError
except (metadata.PackageNotFoundError, ImportError) as e:
raise ImportError("Please uninstall 'rsl_rl' and install 'rsl-rl-lib==2.2.4'.") from e
from rsl_rl.runners import OnPolicyRunner
import genesis as gs
from ur_env import UREnv
def get_train_cfg(exp_name, max_iterations):
train_cfg_dict = {
"algorithm": {
"class_name": "PPO",
"clip_param": 0.2,
"desired_kl": 0.01,
"entropy_coef": 0.01,
"gamma": 0.99,
"lam": 0.95,
"learning_rate": 0.001,
"max_grad_norm": 1.0,
"num_learning_epochs": 5,
"num_mini_batches": 4,
"schedule": "adaptive",
"use_clipped_value_loss": True,
"value_loss_coef": 1.0,
},
"init_member_classes": {},
"policy": {
"activation": "elu",
"actor_hidden_dims": [256, 128, 64],
"critic_hidden_dims": [256, 128, 64],
"init_noise_std": 1.0,
"class_name": "ActorCritic",
},
"runner": {
"checkpoint": -1,
"experiment_name": exp_name,
"load_run": -1,
"log_interval": 1,
"max_iterations": max_iterations,
"record_interval": -1,
"resume": False,
"resume_path": None,
"run_name": "",
},
"runner_class_name": "OnPolicyRunner",
"num_steps_per_env": 100,
"save_interval": 100,
"empirical_normalization": None,
"seed": 1,
}
return train_cfg_dict
def get_cfgs():
env_cfg = {
"num_actions": 6,
"default_joint_angles": {
'shoulder_pan_joint' : -0.0,
'shoulder_lift_joint' : -0.9,
'elbow_joint' : -0.5,
'wrist_1_joint' : -1.4,
'wrist_2_joint' : -1.3,
'wrist_3_joint' : -0.3,
'robotiq_85_left_knuckle_joint' : 0.04,
'robotiq_85_right_knuckle_joint' : 0.04,
'robotiq_85_left_inner_knuckle_joint' : 0.04,
'robotiq_85_right_inner_knuckle_joint' : 0.04,
'robotiq_85_left_finger_tip_joint' : 0.04,
'robotiq_85_right_finger_tip_joint' : 0.04,
},
"joint_names": [
'shoulder_pan_joint',
'shoulder_lift_joint',
'elbow_joint',
'wrist_1_joint',
'wrist_2_joint',
'wrist_3_joint',
'robotiq_85_left_knuckle_joint',
'robotiq_85_right_knuckle_joint',
'robotiq_85_left_inner_knuckle_joint',
'robotiq_85_right_inner_knuckle_joint',
'robotiq_85_left_finger_tip_joint',
'robotiq_85_right_finger_tip_joint',
],
"kp": [4500, 4500, 3500, 3500, 2000, 2000, 100, 100, 100, 100, 100, 100,],
"kd": [450, 450, 350, 350, 200, 200, 10, 10, 10, 10, 10, 10],
"force_limit_l": [-87, -87, -87, -87, -87, -87, -12, -12, -12, -100, -100, -100],
"force_limit_u": [ 87, 87, 87, 87, 87, 87, 12, 12, 12, 100, 100, 100],
"base_init_pos": [ -0, -0.9, -0.5, -1.4, -1.3, -0.3, 0.04, 0.04, 0.04, 0.04, 0.04, 0.04],
"base_init_quat": [0.70710678, -0.70710678, 0.0, 0.0],
"episode_length_s": 2.0,
"action_scale": 0.05,
"action_scale_deg": 3.0,
"simulate_action_latency": True,
"clip_actions": 1.0,
}
obs_cfg = {
"num_obs": 13,
}
reward_cfg = {
"target_pos" : [0.65, 0.0, 0.15],
"target_quat" : [-0.5, 0.5, -0.5, 0.5],
"pos_error_threshold": 0.1,
"quat_angle_threshold_deg": 20,
"quat_max_angle_deg": 120,
"quat_bonus": 3.0,
"pos_bonus": 5.0,
"both_bonus": 30.0,
"bad_pose_penalty": -100.0,
"reward_scales": {
"ee_pos" : 1.0,
"ee_quat" : 1.0,
"ee_both" : 1.0,
"ee_x" : 1.0,
"ee_quat_max" : 1.0,
"collision" : 1.0,
}
}
command_cfg = {
"num_commands": 6,
}
return env_cfg, obs_cfg, reward_cfg, command_cfg
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--exp_name", type=str, default="ur-pick")
parser.add_argument("-B", "--num_envs", type=int, default=4096)
parser.add_argument("--max_iterations", type=int, default=101)
args = parser.parse_args()
gs.init(logging_level="warning")
log_dir = f"logs/{args.exp_name}"
env_cfg, obs_cfg, reward_cfg, command_cfg = get_cfgs()
train_cfg = get_train_cfg(args.exp_name, args.max_iterations)
if os.path.exists(log_dir):
shutil.rmtree(log_dir)
os.makedirs(log_dir, exist_ok=True)
pickle.dump(
[env_cfg, obs_cfg, reward_cfg, command_cfg, train_cfg],
open(f"{log_dir}/cfgs.pkl", "wb"),
)
env = UREnv(
num_envs=args.num_envs, env_cfg=env_cfg, obs_cfg=obs_cfg, reward_cfg=reward_cfg, command_cfg=command_cfg,
show_viewer=True
)
runner = OnPolicyRunner(env, train_cfg, log_dir, device=gs.device)
runner.learn(num_learning_iterations=args.max_iterations, init_at_random_ep_len=True)
if __name__ == "__main__":
main()
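To kick off training (assuming Genesis and rsl-rl-lib==2.2.4 are installed), the script can be run with its defaults or with explicit options, e.g.: python ur_tran.py -e ur-pick -B 4096 --max_iterations 101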
ur_env.py
import torch
import math
import socket
import json
import numpy as np
import genesis as gs
from genesis.utils.geom import quat_to_xyz, transform_by_quat, inv_quat, transform_quat_by_quat, xyz_to_quat
def gs_rand_float(lower, upper, shape, device):
return (upper - lower) * torch.rand(size=shape, device=device) + lower
HOST = '172.27.55.153'
PORT = 50009
def ompl_waypoints(start, goal, num_waypoint):
start_list = [float(x) for x in start]
goal_list = [float(x) for x in goal]
data_to_send = {
"qpos_start": start_list,
"qpos_goal": goal_list,
"num_waypoint": num_waypoint
}
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
print(f"Connecting to server {HOST}:{PORT} ...")
s.connect((HOST, PORT))
message = json.dumps(data_to_send).encode()
s.sendall(message)
response_data = recv_all(s)
response = json.loads(response_data.decode())
print("Received:", response)
return response.get("waypoint")
def recv_all(sock):
""" サーバーが送信を完了 or ソケット閉じるまで、繰り返し受信する """
buffers = []
while True:
chunk = sock.recv(4096)
if not chunk:
break
buffers.append(chunk)
return b"".join(buffers)
class UREnv:
def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_viewer=False):
self.num_envs = num_envs
self.num_obs = obs_cfg["num_obs"]
self.num_privileged_obs = None
self.num_actions = env_cfg["num_actions"]
self.num_commands = command_cfg["num_commands"]
self.device = gs.device
self.simulate_action_latency = True
self.dt = 0.02
self.max_episode_length = math.ceil(env_cfg["episode_length_s"] / self.dt)
self.env_cfg = env_cfg
self.obs_cfg = obs_cfg
self.reward_cfg = reward_cfg
self.command_cfg = command_cfg
self.reward_scales = reward_cfg["reward_scales"]
        self.reached_goal = torch.zeros((self.num_envs,), device=gs.device, dtype=torch.bool)  # per-env early-termination flag set by the penalty terms
self.scene = gs.Scene(
sim_options=gs.options.SimOptions(dt=self.dt, substeps=2),
viewer_options=gs.options.ViewerOptions(
max_FPS=int(0.5 / self.dt),
camera_pos=(3, -1, 1.5),
camera_lookat=(0, 0, 0.5),
camera_fov=30,
),
vis_options=gs.options.VisOptions(rendered_envs_idx=list(range(1))),
rigid_options=gs.options.RigidOptions(
dt=self.dt,
constraint_solver=gs.constraint_solver.Newton,
enable_collision=True,
enable_joint_limit=True,
),
show_viewer=show_viewer,
)
self.plane = self.scene.add_entity(
gs.morphs.Plane(),
)
self.cube = self.scene.add_entity(
gs.morphs.Box(
size=(0.04, 0.04, 0.04),
pos=(0.65, 0.0, 0.02),
)
)
self.robot = self.scene.add_entity(
gs.morphs.URDF(
file="D:/home/myproj/genesis/UR5/asset/ur5/ur5_robotiq85.urdf",
fixed=True,
),
)
self.scene.build(n_envs=num_envs)
self.motors_dof_idx = list(np.arange(6))
self.all_dof_idx = list(np.arange(12))
self.robot.set_dofs_kp(self.env_cfg["kp"], self.all_dof_idx)
self.robot.set_dofs_kv(self.env_cfg["kd"], self.all_dof_idx)
self.robot.set_dofs_force_range(
self.env_cfg["force_limit_l"],
self.env_cfg["force_limit_u"]
)
qpos = self.env_cfg["base_init_pos"]
num_envs = self.num_envs
qpos = torch.tensor(qpos, device=gs.device, dtype=gs.tc_float).repeat(num_envs, 1)
self.robot.set_qpos(qpos, envs_idx=list(range(num_envs)))
self.scene.step()
self.reward_functions, self.episode_sums = dict(), dict()
for name in self.reward_scales.keys():
self.reward_scales[name] *= self.dt
self.reward_functions[name] = getattr(self, "_reward_" + name)
self.episode_sums[name] = torch.zeros((self.num_envs,), device=gs.device, dtype=gs.tc_float)
self.obs_buf = torch.zeros((self.num_envs, self.num_obs), device=gs.device, dtype=gs.tc_float)
self.rew_buf = torch.zeros((self.num_envs,), device=gs.device, dtype=gs.tc_float)
self.reset_buf = torch.ones((self.num_envs,), device=gs.device, dtype=gs.tc_int)
self.episode_length_buf = torch.zeros((self.num_envs,), device=gs.device, dtype=gs.tc_int)
self.actions = torch.zeros((self.num_envs, self.num_actions), device=gs.device, dtype=gs.tc_float)
self.last_actions = torch.zeros_like(self.actions)
self.dof_pos = torch.zeros_like(self.actions)
self.default_dof_pos = torch.tensor(
[self.env_cfg["default_joint_angles"][name] for name in self.env_cfg["joint_names"]],
device=gs.device,
dtype=gs.tc_float,
)
self.extras = dict()
self.extras["observations"] = dict()
def _resample_commands(self, envs_idx):
pass
def step(self, actions):
self.actions = torch.clip(actions, -self.env_cfg["clip_actions"], self.env_cfg["clip_actions"])
qpos_all = self.robot.get_dofs_position(self.all_dof_idx)
pos, quat = self.robot.forward_kinematics(qpos_all)
eepos = pos[:, 6]
eequat = quat[:, 6]
ee_deg = quat_to_xyz(eequat)
        # scale the clipped actions into an EE position delta and an Euler-angle delta
        delta_pos = self.actions * self.env_cfg["action_scale"]
        target_eepos = eepos + delta_pos[:, 0:3]
        # orientation delta in degrees (scaled again by action_scale_deg), converted back to a quaternion for IK
        target_eedeg = ee_deg + delta_pos[:, 3:6] * self.env_cfg["action_scale_deg"]
        target_eequat = xyz_to_quat(target_eedeg)
target_dof_pos = self.robot.inverse_kinematics(
link=self.robot.get_link("wrist_3_link"),
pos=target_eepos,
quat=target_eequat,
respect_joint_limit=True,
dofs_idx_local=self.motors_dof_idx,
)
self.robot.set_qpos(target_dof_pos[:,0:6], self.motors_dof_idx)
self.robot.zero_all_dofs_velocity()
self.scene.step()
self.episode_length_buf += 1
self.dof_pos[:] = self.robot.get_dofs_position(self.motors_dof_idx)
self.reset_buf = self.episode_length_buf > self.max_episode_length
self.reset_buf |= self.reached_goal
time_out_idx = (self.episode_length_buf > self.max_episode_length).nonzero(as_tuple=False).flatten()
self.extras["time_outs"] = torch.zeros_like(self.reset_buf, device=gs.device, dtype=gs.tc_float)
self.extras["time_outs"][time_out_idx] = 1.0
self.reset_idx(self.reset_buf.nonzero(as_tuple=False).flatten())
self.rew_buf[:] = 0.0
for name, reward_func in self.reward_functions.items():
rew = reward_func() * self.reward_scales[name]
self.rew_buf += rew
self.episode_sums[name] += rew
qpos = self.robot.get_dofs_position(self.all_dof_idx)
pos, quat = self.robot.forward_kinematics(qpos)
eepos = pos[:,6]
eequat = quat[:,6]
self.obs_buf = torch.cat(
[
eepos,
eequat,
self.actions,
],
axis=-1,
)
self.last_actions[:] = self.actions[:]
self.extras["observations"]["critic"] = self.obs_buf
return self.obs_buf, self.rew_buf, self.reset_buf, self.extras
def get_observations(self):
self.extras["observations"]["critic"] = self.obs_buf
return self.obs_buf, self.extras
def get_privileged_observations(self):
return None
def reset_idx(self, envs_idx):
if len(envs_idx) == 0:
return
self.dof_pos[envs_idx] = self.default_dof_pos[0:6]
qpos = self.env_cfg["base_init_pos"]
num_envs = self.num_envs
qpos = torch.tensor(qpos, device=gs.device, dtype=gs.tc_float).repeat(len(envs_idx), 1)
self.robot.set_qpos(qpos, envs_idx=envs_idx)
self.robot.zero_all_dofs_velocity(envs_idx)
self.last_actions[envs_idx] = 0.0
self.episode_length_buf[envs_idx] = 0
self.reset_buf[envs_idx] = False
self.reached_goal[envs_idx] = False
self.extras["episode"] = {}
for key in self.episode_sums.keys():
self.extras["episode"]["rew_" + key] = (
torch.mean(self.episode_sums[key][envs_idx]).item() / self.env_cfg["episode_length_s"]
)
self.episode_sums[key][envs_idx] = 0.0
self._resample_commands(envs_idx)
def reset(self):
self.reset_buf[:] = True
self.reset_idx(torch.arange(self.num_envs, device=gs.device))
return self.obs_buf, None
def _reward_ee_pos(self):
qpos = self.robot.get_dofs_position(self.all_dof_idx)
pos, quat = self.robot.forward_kinematics(qpos)
eepos = pos[:,6].cpu().numpy()
target_pos = np.array(self.reward_cfg["target_pos"])
target_pos = np.tile(target_pos, (self.num_envs, 1))
pos_error = np.power(eepos - target_pos, 2).sum(axis=1)
reward = 5.0 / (1.0 + pos_error)
pos_error_ok = pos_error < self.reward_cfg["pos_error_threshold"]
reward[pos_error_ok] += self.reward_cfg["pos_bonus"]
return torch.tensor(reward, device=gs.device, dtype=gs.tc_float)
def _reward_ee_quat(self):
qpos = self.robot.get_dofs_position(self.all_dof_idx)
_, quat = self.robot.forward_kinematics(qpos)
eequat = quat[:, 6]
target_quat = torch.tensor(
self.reward_cfg["target_quat"], device=gs.device, dtype=gs.tc_float
).repeat(self.num_envs, 1)
dot = torch.sum(eequat * target_quat, dim=1).clamp(-1.0, 1.0)
angle = 2.0 * torch.arccos(torch.abs(dot))
angle_threshold_deg = self.reward_cfg["quat_angle_threshold_deg"]
angle_threshold_rad = torch.tensor(angle_threshold_deg * np.pi / 180.0, device=gs.device)
reward = 1.0 / (1.0 + angle)
mask = angle < angle_threshold_rad
reward[mask] += self.reward_cfg["quat_bonus"]
return reward
def _reward_ee_both(self):
qpos = self.robot.get_dofs_position(self.all_dof_idx)
pos, quat = self.robot.forward_kinematics(qpos)
eepos = pos[:, 6]
eequat = quat[:, 6]
target_pos = torch.tensor(
self.reward_cfg["target_pos"], device=gs.device, dtype=gs.tc_float
).repeat(self.num_envs, 1)
target_quat = torch.tensor(
self.reward_cfg["target_quat"], device=gs.device, dtype=gs.tc_float
).repeat(self.num_envs, 1)
pos_error = torch.sum((eepos - target_pos) ** 2, dim=1)
pos_threshold = self.reward_cfg["pos_error_threshold"]
dot = torch.sum(eequat * target_quat, dim=1).clamp(-1.0, 1.0)
angle = 2.0 * torch.arccos(torch.abs(dot))
quat_threshold_deg = self.reward_cfg["quat_angle_threshold_deg"]
quat_threshold_rad = quat_threshold_deg * np.pi / 180.0
pos_ok = pos_error < pos_threshold
quat_ok = angle < quat_threshold_rad
both_ok = pos_ok & quat_ok
reward = torch.zeros(self.num_envs, device=gs.device, dtype=gs.tc_float)
reward[both_ok] += self.reward_cfg["both_bonus"]
return reward
def _reward_ee_x(self):
qpos = self.robot.get_dofs_position(self.all_dof_idx)
pos, quat = self.robot.forward_kinematics(qpos)
eepos = pos[:,6].cpu().numpy()
pos_x_err = eepos[:, 0] < 0.0
self.reached_goal[pos_x_err] = True
self.rew_buf[pos_x_err] = self.reward_cfg["bad_pose_penalty"]
reward = np.zeros(self.num_envs, dtype=np.float32)
reward[pos_x_err] = self.reward_cfg["bad_pose_penalty"]
return torch.tensor(reward, device=gs.device, dtype=gs.tc_float)
def _reward_ee_quat_max(self):
qpos = self.robot.get_dofs_position(self.all_dof_idx)
_, quat = self.robot.forward_kinematics(qpos)
eequat = quat[:, 6]
target_quat = torch.tensor(
self.reward_cfg["target_quat"], device=gs.device, dtype=gs.tc_float
).repeat(self.num_envs, 1)
dot = torch.sum(eequat * target_quat, dim=1).clamp(-1.0, 1.0)
angle = 2.0 * torch.arccos(torch.abs(dot))
max_angle_deg = self.reward_cfg["quat_max_angle_deg"]
max_angle_rad = torch.tensor(max_angle_deg * np.pi / 180.0, device=gs.device)
quat_err = angle > max_angle_rad
self.reached_goal[quat_err] = True
reward = torch.zeros(self.num_envs, device=gs.device, dtype=gs.tc_float)
reward[quat_err] = self.reward_cfg["bad_pose_penalty"]
return reward
def _reward_collision(self):
contacts = self.robot.get_contacts(with_entity=self.plane)
try:
collision_idx = contacts["geom_a"][:,0] == 0
except IndexError:
            collision_idx = torch.zeros(self.num_envs, device=gs.device, dtype=torch.bool)  # no usable contact data: treat as no collision
self.reached_goal[collision_idx] = True
reward = torch.zeros(self.num_envs, device=gs.device, dtype=gs.tc_float)
reward[collision_idx] = -1.0
return reward