ぱたへね

はてなダイアリーはrustの色分けができないのでこっちに来た

diffusion policyのuv環境

空調の関係でdiffusion policyの開発環境を移動させた。 せっかくなのでuvでできるようにした。

gym=0.21.0がuvでは上手く入らず、これだけはpipで手で入れる。

qiita.com

ChatGPTにgym=0.21.0をどうしても入れたいって聞いても教えてくれる。

pyproject.tomlはこれでOK。これで、pushtの学習は進んだ。

[project]  
name = "diffusion-policy"  
version = "0.1.0"  
description = "Add your description here"  
requires-python = "==3.10.16"  
dependencies = [  
    "av>=15.0.0",  
    "diffusers>=0.34.0",  
    "dill>=0.4.0",  
    "einops>=0.8.1",  
    "hydra-core==1.2.0",  
    "matplotlib>=3.10.3",  
    "numba>=0.61.2",  
    "opencv-python>=4.11.0.86",  
    "pandas>=2.3.0",  
    "pygame>=2.6.1",  
    "pymunk==6.2.1",  
    "scikit-image>=0.25.2",  
    "shapely>=2.1.1",  
    "torch>=2.7.1",  
    "tqdm>=4.67.1",  
    "wandb>=0.21.0",  
    "zarr>=2.18.3",  
]

Genesisで剛体同士をくっつける

Genesisで物体同士をくっつける処理のやり方。吸着のシミュレーション等では必須の機能です。

これはドキュメントがあるので簡単

genesis-world.readthedocs.io

add_weld_constraint()を使うことで実現できます。

今回はロボットのgripperのうち2つの指先が箱に接触したら取れたことにする処理を追加しました。

接触判定の記事はこちらです。

natsutan.hatenablog.com

サンプルの通り、linkのidxが必要になるのですが、起動時に一度取得しておけばOKです。

    self.link_cube = np.array([self.cube.get_link("box_baselink").idx], dtype=gs.np_int)
    self.link_ee_l = np.array([self.robot.get_link("robotiq_85_left_finger_tip_link").idx], dtype=gs.np_int)
    self.link_ee_r = np.array([self.robot.get_link("robotiq_85_right_finger_tip_link").idx], dtype=gs.np_int)

get_contacts()で接触の情報を取り、箱との接触情報にrobotiq_85_left_finger_tip_linkとrobotiq_85_right_finger_tip_linkがあれば、把持に成功したとしてadd_weld_constraintで指先と箱をくっつけています。

    contacts = self.robot.get_contacts(with_entity=self.cube)
    link_b = contacts["link_b"]
    if link_b.shape[0] > 0:
        link_b = link_b.cpu().tolist()
        if self.link_ee_l in link_b and self.link_ee_r in link_b:  
            self.scene.sim.rigid_solver.add_weld_constraint(self.link_ee_l, self.link_cube)
            break

実際にくっつけてみた例です。上手くいってますね。

Genesisの衝突判定

Genesisの衝突判定を調べました。

衝突判定に使うAPIはget_contacts()です。

ソースコードはここ。

github.com

箱を取る瞬間にget_contacts()が返す戻り値を調べました。

ソースコードはこうなっていて、ロボットと箱を指定してget_contacts()を呼び出しています。

contacts = self.robot.get_contacts(with_entity=self.cube)

戻り値でgeom indexというのが出てきます。これはcubeであれば、cube.idxに値が入っています。今回の環境では、self.cube.idxは1です。

戻り値

ソースコードのコメントにあるとおり、戻り値は辞書型で返ってきます。

'geom_a'

ロボットから見た接触情報です。上の画像の状態で[1, 1, 1, 1]が返ってきているので、4箇所でcubeと接触していることがわかります。

'geom_b'

geom_aの逆で箱から見た接触情報です。上の画像の状態で[18, 18, 19, 19]が返ってきています。scene.rigid_solver.geoms[18]等でアクセスすれば接触しているオブジェクトが取得できます。

これはlinkのindexが来ます。今回、[1,1,1,1]が返ってきていて箱を表しています。scene.rigid_solver.links[1]でリンクを取得出来ます。

こちらは箱から見たlink情報です。今回は[13, 13, 14, 14]が返ってきています。 self.robot.links[11].idxが13、self.robot.links[12].idxが14なので、この2つにそれぞれ2箇所接触しています。

'position'

接触している位置を4箇所返します。

tensor([[ 6.1080e-04,  3.1838e-01, -4.0293e-01],
        [-1.7104e-02, -4.4849e+00, -1.4909e+00],
        [-1.3181e+00,  7.8808e+00, -1.3610e+00],
        [-4.9474e-04,  6.4665e+00,  4.4923e-01]], device='cuda:0')

'force_a'

箱にかかっている力を返します。ワールド座標系で、4箇所それぞれの力の値が入ってます。

tensor([[ 6.1080e-04,  3.1838e-01, -4.0293e-01],
        [-1.7104e-02, -4.4849e+00, -1.4909e+00],
        [-1.3181e+00,  7.8808e+00, -1.3610e+00],
        [-4.9474e-04,  6.4665e+00,  4.4923e-01]], device='cuda:0')

'force_b'

ロボット側にかかっている力です。作用反作用の法則でforce_aのちょうど逆になっています。

tensor([[-6.1080e-04, -3.1838e-01,  4.0293e-01],
        [ 1.7104e-02,  4.4849e+00,  1.4909e+00],
        [ 1.3181e+00, -7.8808e+00,  1.3610e+00],
        [ 4.9474e-04, -6.4665e+00, -4.4923e-01]], device='cuda:0')

"valid_mask"

並列実行をしているときはどの環境が有効かのbool値が入るようですが、今回は並列実行をしていないので値はありません。

Genesisで強化学習5

もう一度ピッキングに戻ってきたが上手く行かくなっていたのでメモ。 2箇所でおかしくなっていたが、なんとか取れるようになった。

箱を固定していた

natsutan.hatenablog.com

これの試行錯誤をしていたとき、箱が転がって目標位置が変わるのが嫌だったので箱を固定していたのを忘れました。

self.cube = self.scene.add_entity(  
    gs.morphs.Box(  
        size=(0.04, 0.04, 0.04),  
        pos=self.env_cfg["base_box_pos"],  
        #fixed=True,  
    )  
)

fixed=Trueが有効だと、当然動かないので取れなかったです。

力で制御

こっちは少し面白い。 一箇所、IDEの候補の選択を間違えて力の制御であるcontrol_dofs_positionの代わりに、set_qposが使われていた。ちょうどGripperを閉じて持ち上げるところです。この状態だとgripperを閉めるのに力は使ってますが、ロボット本体の姿勢は力では無く強制的に姿勢を設定しています。

for qpos in waypoints:  
    self.robot.set_qpos(qpos[0:6], self.motors_dof_idx)  
    self.robot.control_dofs_position(self.finger_qpos, self.fingers_dof_idx)  
    self.scene.step()

このやりかただと、持ち上げるための摩擦が働かないようで、上手く持ち上げてくれません。

ここを力制御であるcontrol_dofs_positionに変えたら上手く取れるようになりました。

for qpos in waypoints:  
    self.robot.control_dofs_position(qpos[0:6], self.motors_dof_idx)  
    self.robot.control_dofs_position(self.finger_qpos, self.fingers_dof_idx)  
    self.scene.step()

方向性としては把持が成功したら別の方法でロボットと箱を拘束し、set_qposで動かしたい。

Genesisで強化学習4

これの続き

natsutan.hatenablog.com

箱の場所をランダムにしてみた。

箱の場所は、なぜか分かっていることにします。本来は、カメラの画像から推論して欲しいが、順を踏みたい。

observationに箱のx,y,zを追加します。大事な変更はこれくらい。

        self.obs_buf = torch.cat(
            [
                eepos,  # 3
                eequat,  # 4
                box_pos, # 3
                self.actions,  # 6
            ],
            axis=-1,
        )

ソースはgithubへ

github.com

実行結果、まあまあ期待通り。

Genesisで強化学習3

これの続き

natsutan.hatenablog.com

まあまあ動くようになった。

改善点

  • EEの姿勢でクオータニオンを使っていて、Δθを求めるとき、Degに直してアクションの値を加算したいた。この時の係数がクオータニオンのままで、一回のアクションで1degも変わってなかった。最大3deg位までにするとちゃんと向きが合うようになった。

  • 1エピソードの最大数を短く。num_steps_per_envを小さくした。この値が大きいと、変な方向に行ってもそのまま動作を続けるのでjunk epsodeが増えてしまう。本やらAIに聞くと簡単なタスクなら1000~2000となっているが、仕組み的にはベストケースの数倍の方が正しいと思う。

  • NNを小さく。タスクが単純なので、NNの段数はそのままで大きさを全て半分にした。

結果

箱の少し上の場所に、EE下向きで到達すればOKにしている。

軌道生成を追加すればもう少しましな動きしそう。 最後にプルプルするのは、最短距離でゴールにいくよりゴール近くで粘った方がトータルの報酬が高くなるので、ゴール近くでランダムに動く方が高得点になるんだと思う。

ソース

そろそろgithubにした方がよさそう・・・

ur_tran.py

import argparse
import os
import pickle
import shutil
from importlib import metadata

try:
    try:
        if metadata.version("rsl-rl"):
            raise ImportError
    except metadata.PackageNotFoundError:
        if metadata.version("rsl-rl-lib") != "2.2.4":
            raise ImportError
except (metadata.PackageNotFoundError, ImportError) as e:
    raise ImportError("Please uninstall 'rsl_rl' and install 'rsl-rl-lib==2.2.4'.") from e
from rsl_rl.runners import OnPolicyRunner

import genesis as gs

from ur_env import UREnv


def get_train_cfg(exp_name, max_iterations):
    train_cfg_dict = {
        "algorithm": {
            "class_name": "PPO",
            "clip_param": 0.2,
            "desired_kl": 0.01,
            "entropy_coef": 0.01,
            "gamma": 0.99,
            "lam": 0.95,
            "learning_rate": 0.001,
            "max_grad_norm": 1.0,
            "num_learning_epochs": 5,
            "num_mini_batches": 4,
            "schedule": "adaptive",
            "use_clipped_value_loss": True,
            "value_loss_coef": 1.0,
        },
        "init_member_classes": {},
        "policy": {
            "activation": "elu",
            # "actor_hidden_dims": [512, 256, 128],
            # "critic_hidden_dims": [512, 256, 128],
            "actor_hidden_dims": [256, 128, 64],
            "critic_hidden_dims": [256, 128, 64],
            "init_noise_std": 1.0,
            "class_name": "ActorCritic",
        },
        "runner": {
            "checkpoint": -1,
            "experiment_name": exp_name,
            "load_run": -1,
            "log_interval": 1,
            "max_iterations": max_iterations,
            "record_interval": -1,
            "resume": False,
            "resume_path": None,
            "run_name": "",
        },
        "runner_class_name": "OnPolicyRunner",
        "num_steps_per_env": 100,
        "save_interval": 100,
        "empirical_normalization": None,
        "seed": 1,
    }

    return train_cfg_dict


def get_cfgs():
    env_cfg = {
        "num_actions": 6,  # Δx、Δθ
        # joint/link names
        "default_joint_angles": {  # [rad]
            'shoulder_pan_joint' : -0.0,
            'shoulder_lift_joint' : -0.9,
            'elbow_joint'  : -0.5,
            'wrist_1_joint' :  -1.4,
            'wrist_2_joint' : -1.3,
            'wrist_3_joint' : -0.3,
            'robotiq_85_left_knuckle_joint' : 0.04,
            'robotiq_85_right_knuckle_joint' : 0.04,
            'robotiq_85_left_inner_knuckle_joint' : 0.04,
            'robotiq_85_right_inner_knuckle_joint' : 0.04,
            'robotiq_85_left_finger_tip_joint' : 0.04,
            'robotiq_85_right_finger_tip_joint' : 0.04,
        },
        "joint_names": [
            'shoulder_pan_joint',
            'shoulder_lift_joint',
            'elbow_joint',
            'wrist_1_joint',
            'wrist_2_joint',
            'wrist_3_joint',
            'robotiq_85_left_knuckle_joint',
            'robotiq_85_right_knuckle_joint',
            'robotiq_85_left_inner_knuckle_joint',
            'robotiq_85_right_inner_knuckle_joint',
            'robotiq_85_left_finger_tip_joint',
            'robotiq_85_right_finger_tip_joint',
        ],
        # PD
        "kp": [4500, 4500, 3500, 3500, 2000, 2000, 100, 100, 100, 100, 100, 100,],
        "kd": [450,   450,  350,  350,  200,  200, 10, 10, 10, 10, 10, 10],
        "force_limit_l": [-87, -87, -87, -87, -87, -87, -12, -12, -12, -100, -100, -100],
        "force_limit_u": [ 87,  87,  87,  87,  87,  87,  12,  12,  12,  100,  100,  100],
        # termination

        # base pose
        "base_init_pos": [  -0, -0.9,  -0.5,  -1.4,  -1.3,  -0.3, 0.04, 0.04, 0.04, 0.04, 0.04, 0.04],
        "base_init_quat": [0.70710678, -0.70710678, 0.0, 0.0],

        "episode_length_s": 2.0,
        "action_scale": 0.05,
        "action_scale_deg": 3.0,  # deg for each action
        "simulate_action_latency": True,
        "clip_actions": 1.0,
    }
    obs_cfg = {
#        "num_obs": 25, # 3 for EE postion, 4 for EE quaternion, 12 for joint positions, 6 for action
        "num_obs": 13,  # 3 for EE postion, 4 for EE quaternion,  6 for action

    }
    reward_cfg = {
        "target_pos" : [0.65, 0.0, 0.15],
        "target_quat" : [-0.5,  0.5, -0.5,  0.5],
        "pos_error_threshold": 0.1,
        "quat_angle_threshold_deg": 20,
        "quat_max_angle_deg": 120,
        "quat_bonus": 3.0,
        "pos_bonus": 5.0,
        "both_bonus": 30.0,
        "bad_pose_penalty": -100.0,
        "reward_scales": {
            "ee_pos" : 1.0,
            "ee_quat" : 1.0,
            "ee_both" : 1.0,
            "ee_x" : 1.0,
            "ee_quat_max" : 1.0,
            "collision" : 1.0,
        }
    }
    command_cfg = {
        "num_commands": 6, # no use
    }

    return env_cfg, obs_cfg, reward_cfg, command_cfg


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", "--exp_name", type=str, default="ur-pick")
    parser.add_argument("-B", "--num_envs", type=int, default=4096)
    parser.add_argument("--max_iterations", type=int, default=101)
    args = parser.parse_args()

    gs.init(logging_level="warning")

    log_dir = f"logs/{args.exp_name}"
    env_cfg, obs_cfg, reward_cfg, command_cfg = get_cfgs()
    train_cfg = get_train_cfg(args.exp_name, args.max_iterations)

    if os.path.exists(log_dir):
        shutil.rmtree(log_dir)
    os.makedirs(log_dir, exist_ok=True)

    pickle.dump(
        [env_cfg, obs_cfg, reward_cfg, command_cfg, train_cfg],
        open(f"{log_dir}/cfgs.pkl", "wb"),
    )

    env = UREnv(
        num_envs=args.num_envs, env_cfg=env_cfg, obs_cfg=obs_cfg, reward_cfg=reward_cfg, command_cfg=command_cfg,
        show_viewer=True
    )

    runner = OnPolicyRunner(env, train_cfg, log_dir, device=gs.device)

    runner.learn(num_learning_iterations=args.max_iterations, init_at_random_ep_len=True)


if __name__ == "__main__":
    main()

ur_env.py

import torch
import math
import socket
import json
import numpy as np
import genesis as gs
from genesis.utils.geom import quat_to_xyz, transform_by_quat, inv_quat, transform_quat_by_quat

from Genesis.genesis import xyz_to_quat


def gs_rand_float(lower, upper, shape, device):
    return (upper - lower) * torch.rand(size=shape, device=device) + lower

# WSL2 マシン(またはサーバー側)のIPアドレスとポートを合わせる
# >wsl -- ip addr で確認できる
HOST = '172.27.55.153'  # localhostで試す場合、WSL2/Windows間の通信は別IPになることも
PORT = 50009

def ompl_waypoints(start, goal, num_waypoint):
    start_list = [float(x) for x in start]
    goal_list = [float(x) for x in goal]

    # 送信したいデータ
    data_to_send = {
        "qpos_start": start_list,
        "qpos_goal":  goal_list,
        "num_waypoint": num_waypoint
    }

    # ソケットを作ってサーバーに接続
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        print(f"Connecting to server {HOST}:{PORT} ...")
        s.connect((HOST, PORT))

        # JSONでエンコードして送信
        message = json.dumps(data_to_send).encode()
        s.sendall(message)

        # レスポンス受信

        # JSONデコードして結果を表示
        response_data = recv_all(s)
        response = json.loads(response_data.decode())
        print("Received:", response)
        return response.get("waypoint")


def recv_all(sock):
    """ サーバーが送信を完了 or ソケット閉じるまで、繰り返し受信する """
    buffers = []
    while True:
        chunk = sock.recv(4096)
        if not chunk:
            # サーバー側が close した
            break
        buffers.append(chunk)
    return b"".join(buffers)


class UREnv:
    def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_viewer=False):
        self.num_envs = num_envs
        self.num_obs = obs_cfg["num_obs"]
        self.num_privileged_obs = None
        self.num_actions = env_cfg["num_actions"]
        self.num_commands = command_cfg["num_commands"]
        self.device = gs.device

        self.simulate_action_latency = True  # there is a 1 step latency on real robot
        self.dt = 0.02  # control frequency on real robot is 50hz
        self.max_episode_length = math.ceil(env_cfg["episode_length_s"] / self.dt)

        self.env_cfg = env_cfg
        self.obs_cfg = obs_cfg
        self.reward_cfg = reward_cfg
        self.command_cfg = command_cfg

        # self.obs_scales = obs_cfg["obs_scales"]
        self.reward_scales = reward_cfg["reward_scales"]
        self.reached_goal = torch.tensor([False] * self.num_envs)  # flag to indicate if the goal is reached in any environment


        # create scene
        self.scene = gs.Scene(
            sim_options=gs.options.SimOptions(dt=self.dt, substeps=2),
            viewer_options=gs.options.ViewerOptions(
                max_FPS=int(0.5 / self.dt),
                camera_pos=(3, -1, 1.5),
                camera_lookat=(0, 0, 0.5),
                camera_fov=30,

            ),
            vis_options=gs.options.VisOptions(rendered_envs_idx=list(range(1))),
            rigid_options=gs.options.RigidOptions(
                dt=self.dt,
                constraint_solver=gs.constraint_solver.Newton,
                enable_collision=True,
                enable_joint_limit=True,
            ),
            show_viewer=show_viewer,
        )
        self.plane = self.scene.add_entity(
            gs.morphs.Plane(),
        )
        self.cube = self.scene.add_entity(
            gs.morphs.Box(
                size=(0.04, 0.04, 0.04),
                pos=(0.65, 0.0, 0.02),

            )
        )

        # add robot
        self.robot = self.scene.add_entity(
            gs.morphs.URDF(
                file="D:/home/myproj/genesis/UR5/asset/ur5/ur5_robotiq85.urdf",
                fixed=True,
            ),
        )
        # build
        self.scene.build(n_envs=num_envs)

        # names to indices
        self.motors_dof_idx = list(np.arange(6))
        self.all_dof_idx = list(np.arange(12))

        # PD control parameters
        self.robot.set_dofs_kp(self.env_cfg["kp"], self.all_dof_idx)
        self.robot.set_dofs_kv(self.env_cfg["kd"], self.all_dof_idx)
        self.robot.set_dofs_force_range(
            self.env_cfg["force_limit_l"],
            self.env_cfg["force_limit_u"]
        )
        qpos = self.env_cfg["base_init_pos"]
        # qpos[0:12]を [num_envs, 12] の形にコピーしながら変形する。
        num_envs = self.num_envs
        qpos = torch.tensor(qpos, device=gs.device, dtype=gs.tc_float).repeat(num_envs, 1)
        self.robot.set_qpos(qpos, envs_idx=list(range(num_envs)))

        self.scene.step()

        # prepare reward functions and multiply reward scales by dt
        self.reward_functions, self.episode_sums = dict(), dict()
        for name in self.reward_scales.keys():
            self.reward_scales[name] *= self.dt
            self.reward_functions[name] = getattr(self, "_reward_" + name)
            self.episode_sums[name] = torch.zeros((self.num_envs,), device=gs.device, dtype=gs.tc_float)

        # initialize buffers
        self.obs_buf = torch.zeros((self.num_envs, self.num_obs), device=gs.device, dtype=gs.tc_float)
        self.rew_buf = torch.zeros((self.num_envs,), device=gs.device, dtype=gs.tc_float)
        self.reset_buf = torch.ones((self.num_envs,), device=gs.device, dtype=gs.tc_int)
        self.episode_length_buf = torch.zeros((self.num_envs,), device=gs.device, dtype=gs.tc_int)
        self.actions = torch.zeros((self.num_envs, self.num_actions), device=gs.device, dtype=gs.tc_float)
        self.last_actions = torch.zeros_like(self.actions)
        self.dof_pos = torch.zeros_like(self.actions)
        self.default_dof_pos = torch.tensor(
            [self.env_cfg["default_joint_angles"][name] for name in self.env_cfg["joint_names"]],
            device=gs.device,
            dtype=gs.tc_float,
        )
        self.extras = dict()  # extra information for logging
        self.extras["observations"] = dict()

    def _resample_commands(self, envs_idx):
        pass

    def step(self, actions):
        # print("actions", actions[0])
        self.actions = torch.clip(actions, -self.env_cfg["clip_actions"], self.env_cfg["clip_actions"])

        qpos_all = self.robot.get_dofs_position(self.all_dof_idx) # only use the first 6 dofs for control
        pos, quat = self.robot.forward_kinematics(qpos_all)
        eepos = pos[:, 6]  # shape: (num_envs, 3)
        eequat = quat[:, 6]  # shape: (num_envs, 4)

        # eequatをdeg(rx, ry, rz)に変換
        ee_deg = quat_to_xyz(eequat)

        delta_pos = self.actions * self.env_cfg["action_scale"]
        target_eepos = eepos + delta_pos[:, 0:3]
        target_eedeq = ee_deg + delta_pos[:, 3:6] *  self.env_cfg["action_scale_deg"]

        target_eequat = xyz_to_quat(target_eedeq)

        target_dof_pos = self.robot.inverse_kinematics(
            link=self.robot.get_link("wrist_3_link"),
            pos=target_eepos,
            quat=target_eequat,
            respect_joint_limit=True,
            dofs_idx_local=self.motors_dof_idx,
        )

        self.robot.set_qpos(target_dof_pos[:,0:6], self.motors_dof_idx)
        self.robot.zero_all_dofs_velocity()
        self.scene.step()

        # update buffers
        self.episode_length_buf += 1
        self.dof_pos[:] = self.robot.get_dofs_position(self.motors_dof_idx)
        # check termination and reset
        self.reset_buf = self.episode_length_buf > self.max_episode_length
        self.reset_buf |= self.reached_goal

        time_out_idx = (self.episode_length_buf > self.max_episode_length).nonzero(as_tuple=False).flatten()
        self.extras["time_outs"] = torch.zeros_like(self.reset_buf, device=gs.device, dtype=gs.tc_float)
        self.extras["time_outs"][time_out_idx] = 1.0

        self.reset_idx(self.reset_buf.nonzero(as_tuple=False).flatten())

        # compute reward
        self.rew_buf[:] = 0.0
        for name, reward_func in self.reward_functions.items():
            rew = reward_func() * self.reward_scales[name]
            self.rew_buf += rew
            self.episode_sums[name] += rew

        qpos = self.robot.get_dofs_position(self.all_dof_idx)
        pos, quat = self.robot.forward_kinematics(qpos)
        eepos = pos[:,6]
        eequat = quat[:,6]

        # compute observations
        self.obs_buf = torch.cat(
            [
                eepos,  # 3
                eequat,  # 4
#                qpos,  # 12
                self.actions,  # 6
            ],
            axis=-1,
        )

        self.last_actions[:] = self.actions[:]

        self.extras["observations"]["critic"] = self.obs_buf

        return self.obs_buf, self.rew_buf, self.reset_buf, self.extras

    def get_observations(self):
        self.extras["observations"]["critic"] = self.obs_buf
        return self.obs_buf, self.extras

    def get_privileged_observations(self):
        return None

    def reset_idx(self, envs_idx):
        if len(envs_idx) == 0:
            return

        # reset dofs
        self.dof_pos[envs_idx] = self.default_dof_pos[0:6]
        qpos = self.env_cfg["base_init_pos"]
        # qpos[0:12]を [num_envs, 12] の形にコピーしながら変形する。
        num_envs = self.num_envs
        qpos = torch.tensor(qpos, device=gs.device, dtype=gs.tc_float).repeat(len(envs_idx), 1)
        self.robot.set_qpos(qpos, envs_idx=envs_idx)

#        self.robot.set_qpos(self.env_cfg["base_init_pos"], envs_idx=envs_idx)
        # reset base
        self.robot.zero_all_dofs_velocity(envs_idx)


        # reset buffers
        self.last_actions[envs_idx] = 0.0
        # self.last_dof_vel[envs_idx] = 0.0
        self.episode_length_buf[envs_idx] = 0
        self.reset_buf[envs_idx] = False
        self.reached_goal[envs_idx] = False

        # fill extras
        self.extras["episode"] = {}
        for key in self.episode_sums.keys():
            self.extras["episode"]["rew_" + key] = (
                torch.mean(self.episode_sums[key][envs_idx]).item() / self.env_cfg["episode_length_s"]
            )
            self.episode_sums[key][envs_idx] = 0.0

        self._resample_commands(envs_idx)

    def reset(self):
        self.reset_buf[:] = True
        self.reset_idx(torch.arange(self.num_envs, device=gs.device))
        return self.obs_buf, None

    # ------------ reward functions----------------
    def _reward_ee_pos(self):
        qpos = self.robot.get_dofs_position(self.all_dof_idx)
        pos, quat = self.robot.forward_kinematics(qpos)
        eepos = pos[:,6].cpu().numpy()
        target_pos = np.array(self.reward_cfg["target_pos"])

        # tartget_pos をself.scene.envsの形に変形する
        target_pos = np.tile(target_pos, (self.num_envs, 1))

        # Calculate the distance between the end effector position and the target position
        pos_error = np.power(eepos - target_pos, 2).sum(axis=1)
        # Calculate the orientation error using quaternion distance

        reward = 5.0 / (1.0 + pos_error)

        pos_error_ok = pos_error < self.reward_cfg["pos_error_threshold"]
        reward[pos_error_ok] += self.reward_cfg["pos_bonus"]

        return torch.tensor(reward, device=gs.device, dtype=gs.tc_float)

    def _reward_ee_quat(self):
        qpos = self.robot.get_dofs_position(self.all_dof_idx)
        _, quat = self.robot.forward_kinematics(qpos)
        eequat = quat[:, 6]  # shape: (num_envs, 4)

        # 目標クオータニオンをテンソル化
        target_quat = torch.tensor(
            self.reward_cfg["target_quat"], device=gs.device, dtype=gs.tc_float
        ).repeat(self.num_envs, 1)  # shape: (num_envs, 4)

        # dot = cos(θ/2)
        dot = torch.sum(eequat * target_quat, dim=1).clamp(-1.0, 1.0)
        angle = 2.0 * torch.arccos(torch.abs(dot))  # ラジアン

        # 角度しきい値(ラジアン)を指定
        angle_threshold_deg = self.reward_cfg["quat_angle_threshold_deg"]
        angle_threshold_rad = torch.tensor(angle_threshold_deg * np.pi / 180.0, device=gs.device)

        # 報酬:誤差が小さいほど大きく、しきい値以下でボーナス
        # reward = -angle  # 基本は負の報酬(距離の近さに対応)

        reward = 1.0 / (1.0 + angle) # 角度が小さいほど報酬が大きくなる

        # しきい値以下なら追加報酬
        mask = angle < angle_threshold_rad
        reward[mask] += self.reward_cfg["quat_bonus"]

        return reward

    def _reward_ee_both(self):
        qpos = self.robot.get_dofs_position(self.all_dof_idx)
        pos, quat = self.robot.forward_kinematics(qpos)
        eepos = pos[:, 6]  # shape: (num_envs, 3)
        eequat = quat[:, 6]  # shape: (num_envs, 4)

        # 目標姿勢・位置をテンソル化
        target_pos = torch.tensor(
            self.reward_cfg["target_pos"], device=gs.device, dtype=gs.tc_float
        ).repeat(self.num_envs, 1)  # shape: (num_envs, 3)

        target_quat = torch.tensor(
            self.reward_cfg["target_quat"], device=gs.device, dtype=gs.tc_float
        ).repeat(self.num_envs, 1)  # shape: (num_envs, 4)

        # --- 位置誤差(L2ノルムの2乗) ---
        pos_error = torch.sum((eepos - target_pos) ** 2, dim=1)  # shape: (num_envs,)
        pos_threshold = self.reward_cfg["pos_error_threshold"]

        # --- クオータニオン角度誤差 ---
        dot = torch.sum(eequat * target_quat, dim=1).clamp(-1.0, 1.0)
        angle = 2.0 * torch.arccos(torch.abs(dot))  # ラジアン

        quat_threshold_deg = self.reward_cfg["quat_angle_threshold_deg"]
        quat_threshold_rad = quat_threshold_deg * np.pi / 180.0

        # --- 両方OKなら報酬を与える ---
        pos_ok = pos_error < pos_threshold
        quat_ok = angle < quat_threshold_rad
        both_ok = pos_ok & quat_ok

        reward = torch.zeros(self.num_envs, device=gs.device, dtype=gs.tc_float)
        reward[both_ok] += self.reward_cfg["both_bonus"]

        return reward

    def _reward_ee_x(self):
        # xが負の値の時はペナルティを与えエピソードを終了する。
        qpos = self.robot.get_dofs_position(self.all_dof_idx)
        pos, quat = self.robot.forward_kinematics(qpos)
        eepos = pos[:,6].cpu().numpy()

        pos_x_err = eepos[:, 0] < 0.0
        self.reached_goal[pos_x_err] = True
        self.rew_buf[pos_x_err] = self.reward_cfg["bad_pose_penalty"]
        reward = np.zeros(self.num_envs, dtype=np.float32)
        reward[pos_x_err] = self.reward_cfg["bad_pose_penalty"]

        return torch.tensor(reward, device=gs.device, dtype=gs.tc_float)

    def _reward_ee_quat_max(self):
        qpos = self.robot.get_dofs_position(self.all_dof_idx)
        _, quat = self.robot.forward_kinematics(qpos)
        eequat = quat[:, 6]  # shape: (num_envs, 4)

        # 目標クオータニオン
        target_quat = torch.tensor(
            self.reward_cfg["target_quat"], device=gs.device, dtype=gs.tc_float
        ).repeat(self.num_envs, 1)

        # 回転角度 θ = 2 * arccos(|dot|)
        dot = torch.sum(eequat * target_quat, dim=1).clamp(-1.0, 1.0)
        angle = 2.0 * torch.arccos(torch.abs(dot))  # ラジアン

        # 最大許容角度(度 → ラジアン)
        max_angle_deg = self.reward_cfg["quat_max_angle_deg"]
        max_angle_rad = torch.tensor(max_angle_deg * np.pi / 180.0, device=gs.device)

        # マスク:許容範囲を超えた環境
        quat_err = angle > max_angle_rad
        #print("angle", angle.cpu().numpy(), "max_angle_rad", max_angle_rad.cpu().numpy(), "quat_err", quat_err.cpu().numpy())

        # エピソード終了フラグと大きな罰
        self.reached_goal[quat_err] = True
        reward = torch.zeros(self.num_envs, device=gs.device, dtype=gs.tc_float)
        reward[quat_err] = self.reward_cfg["bad_pose_penalty"]

        return reward

    def _reward_collision(self):
        # 衝突があった場合、報酬を-100.0に設定し、エピソードを終了する。
        contacts = self.robot.get_contacts(with_entity=self.plane)

        try:
            collision_idx = contacts["geom_a"][:,0] == 0
        except IndexError:
            # 衝突がない場合、collision_idxは全てfalseになる
            collision_idx = torch.tensor([False] * self.num_envs)
        # print("collision_idx", collision_idx.cpu().numpy())

        self.reached_goal[collision_idx] = True
        reward = torch.zeros(self.num_envs, device=gs.device, dtype=gs.tc_float)
        reward[collision_idx] = -1.0

        return reward

Genesisで強化学習2

これの続き

natsutan.hatenablog.com

始めの状態

スタート地点はここ。とりあえず箱の近くにEEを持っていてほしい。アルゴリズムはとりあえずPPO、この程度なら十分だと思ったが上手く行かない。

なんか学習するんだけど、途中からおかしくなっておかしいままになる。 報酬も高くなった後どんどん下がっていくのがグラフで分かる。

いろいろやったこと

最初はアクションを各ジョイントの変位(Δθ)で出して、それを使ってPD制御をしてみた。実行してみると、上の通り変な勢いがついてアームをグルングルン回すようになる。勢いをつけて回した方が、高い報酬を受け取る領域を通過しやすかったんじゃないかと思う。当然、実際のURロボットはこんな動きをしないので、まずは動きを安定させるところから。

次はΔθを使ってPD制御ではなくset_qposを使って直接ロボの姿勢を変えるようにした。こうすると、実機ではあり得ない姿勢を取ったり、EEが地面にめり込んだりする。とはいえ、学習は進んでいる様子

地面との衝突判定(後述)を追加し、ペナルティーを与える。これで地面貫通は無くなったが、ペナルティが強すぎるとびびってEEを下げてくれない。この状態でも慣性がつきすぎてる感じがしてので、zero_all_dofs_velocity()でロボの姿勢を変えた後に全ての速度を0にした。

ジョイントの変位で制御するのが難しく感じたので、RT1とかと同じようにEEの変位(Δx、Δθ)をアクションにするようにした。これだとシンプルに近づければよいので、タスクの難易度が下がるはず。Δx、Δθを使って次のEEの姿勢を求め、IKを使って、set_qposを実行する。IKが安定しなくてロボが一瞬変な姿勢を取るが、上手く言ったらちゃんと軌道生成すれば良くなると思っていったん放置する。

そういうのとは別に、学習を1つのロボットでやると局所解に入ったときに抜け出せなくなっていたので、サンプルの通り4096並列で実行した。これは上手く効いて変な局所解に入らないようになった。こういうのを知るために、学習の様子は可視化して目で確認していった方が良い。いったん良くなって悪くなる場合と、全く良くならない場合は対処が変わってくる。可視化せずに学習して、最後のモデルの結果だけ見てもそれは分からない。

Genesisでの衝突判定

Genesisでも当然各物体の衝突判定はできるはずだが、ドキュメントが空っぽ。

genesis-world.readthedocs.io

なので、ソースからそれっぽく調べていく。

github.com

get_contacts()を使うと、接触情報が取れる。

並列環境じゃ無い環境だと比較的簡単に衝突判定ができる。

    constacts = ur.get_contacts(with_entity=plane)
    if constacts["geom_a"].shape[0] > 0:
        print("contact detected")

urがロボットで地面との衝突判定をする場合は、その対象を引数にget_contactsを呼ぶ。接触しているときは、位置や力等の情報が返ってくるが接触していないときは、それぞれが空のTensorになっているのでshapeで接触判定ができる。

並列環境だと難しいというか良く分からなくて、接触していようがいまいが並列環境分のTensorが返ってくる。torch.tensorなので、何かしらの長方形にしないといけないからだとおもう。実際に移動させてやってみると、geom_aの先頭をみると接触しているかどうかわかりそう。 接触していないときは、 tensor(14, 14, 14, 12, 12, 12, 0, 0, 0, 0, 0, device='cuda:0', dtype=torch.int32)、接触しているとtensor( 0, 0, 0, 12, 12, 12, 14, 14, 14, 0, 0, 0, 0, 0, device='cuda:0', dtype=torch.int32)の様なTensorが返ってくる。多分これが何かのindexの気がするんだが、ちょっとよくわかなくて、ドキュメントとサンプル待ち。

ただし、まれに並列環境でも空tensorが返ってくる事があり、これはたぶん並列分全てで接触が全く無かったときだと思う。

ドキュメントとサンプル欲しい。

現状

この辺をケアして動かしたのがこれ。

EEの向きがおかしいのは、EEの姿勢が最初クオータニオンで、Deg、クオータニオンの変換をしていて、ここの計算が怪しいとにらんでる。

難しい。