commit
91fa46f232
|
@ -0,0 +1,56 @@
|
||||||
|
# Environments
|
||||||
|
|
||||||
|
| Name | Linear | Nonlinear | State Size | Input size |
|
||||||
|
|:----------|:---------------:|:----------------:|:----------------:|:----------------:|
|
||||||
|
| First Order Lag System | ✓ | x | 4 | 2 |
|
||||||
|
| Two wheeled System (Constant Goal) | x | ✓ | 3 | 2 |
|
||||||
|
| Two wheeled System (Moving Goal) (Coming soon) | x | ✓ | 3 | 2 |
|
||||||
|
| Cartpole (Swing up) | x | ✓ | 4 | 1 |
|
||||||
|
|
||||||
|
## FirstOrderLagEnv
|
||||||
|
|
||||||
|
### System equation.
|
||||||
|
|
||||||
|
<img src="assets/firstorderlag.png" width="550">
|
||||||
|
|
||||||
|
You can set an arbitrary time constant, tau. The default is 0.63 s.
|
||||||
|
|
||||||
|
### Cost.
|
||||||
|
|
||||||
|
<img src="assets/quadratic_score.png" width="300">
|
||||||
|
|
||||||
|
Q = diag[1., 1., 1., 1.],
|
||||||
|
R = diag[1., 1.]
|
||||||
|
|
||||||
|
X_g denotes the goal state.
|
||||||
|
|
||||||
|
## TwoWheeledEnv
|
||||||
|
|
||||||
|
### System equation.
|
||||||
|
|
||||||
|
<img src="assets/twowheeled.png" width="300">
|
||||||
|
|
||||||
|
### Cost.
|
||||||
|
|
||||||
|
<img src="assets/quadratic_score.png" width="300">
|
||||||
|
|
||||||
|
Q = diag[5., 5., 1.],
|
||||||
|
R = diag[0.1, 0.1]
|
||||||
|
|
||||||
|
X_g denotes the goal state.
|
||||||
|
|
||||||
|
## CartPoleEnv (Swing up)
|
||||||
|
|
||||||
|
### System equation.
|
||||||
|
|
||||||
|
<img src="assets/cartpole.png" width="600">
|
||||||
|
|
||||||
|
You can set arbitrary parameters: mc, mp, l and g.
|
||||||
|
|
||||||
|
Default settings are as follows:
|
||||||
|
|
||||||
|
mc = 1, mp = 0.2, l = 0.5, g = 9.81
|
||||||
|
|
||||||
|
### Cost.
|
||||||
|
|
||||||
|
<img src="assets/cartpole_score.png" width="300">
|
|
@ -1,2 +1 @@
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,218 @@
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
class CartPoleConfigModule():
    """ Configuration of the CartPole (swing up) task

    Bundles the task constants, the cart-pole parameters, the optimizer
    settings of every controller and the cost function together with its
    first and second derivatives.

    The state cost (running and terminal share the same form) is

        l(x) = 6 * x[0]**2 + 12 * (cos(x[2]) + 1)**2
               + 0.1 * x[1]**2 + 0.1 * x[3]**2

    and the input cost is u**2 * diag(R).
    """
    # parameters
    ENV_NAME = "CartPole-v0"
    TYPE = "Nonlinear"
    TASK_HORIZON = 500
    PRED_LEN = 50
    STATE_SIZE = 4
    INPUT_SIZE = 1
    DT = 0.02
    # cost parameters
    R = np.diag([0.01])
    # bounds
    INPUT_LOWER_BOUND = np.array([-3.])
    INPUT_UPPER_BOUND = np.array([3.])
    # parameters
    MP = 0.2
    MC = 1.
    L = 0.5
    G = 9.81

    def __init__(self):
        """ set the optimizer settings of each controller
        """
        # opt configs
        self.opt_config = {
            "Random": {
                "popsize": 5000
            },
            "CEM": {
                "popsize": 500,
                "num_elites": 50,
                "max_iters": 15,
                "alpha": 0.3,
                "init_var": 9.,
                "threshold": 0.001
            },
            "MPPI": {
                "beta": 0.6,
                "popsize": 5000,
                "kappa": 0.9,
                "noise_sigma": 0.5,
            },
            "MPPIWilliams": {
                "popsize": 5000,
                "lambda": 1.,
                "noise_sigma": 0.9,
            },
            "iLQR": {
                "max_iter": 500,
                "init_mu": 1.,
                "mu_min": 1e-6,
                "mu_max": 1e10,
                "init_delta": 2.,
                "threshold": 1e-6,
            },
            "DDP": {
                "max_iter": 500,
                "init_mu": 1.,
                "mu_min": 1e-6,
                "mu_max": 1e10,
                "init_delta": 2.,
                "threshold": 1e-6,
            },
            "NMPC-CGMRES": {
            },
            "NMPC-Newton": {
            },
        }

    @staticmethod
    def _state_cost(x):
        """ state cost evaluated over the last axis of x

        Args:
            x (numpy.ndarray): state, shape(..., state_size)
        Returns:
            cost (numpy.ndarray): cost, shape(...) (last axis is reduced)
        """
        return 6. * (x[..., 0]**2) \
            + 12. * ((np.cos(x[..., 2]) + 1.)**2) \
            + 0.1 * (x[..., 1]**2) \
            + 0.1 * (x[..., 3]**2)

    @staticmethod
    def input_cost_fn(u):
        """ input cost functions

        Args:
            u (numpy.ndarray): input, shape(pred_len, input_size)
                or shape(pop_size, pred_len, input_size)
        Returns:
            cost (numpy.ndarray): cost of input, shape(pred_len, input_size) or
                shape(pop_size, pred_len, input_size)
        """
        return (u**2) * np.diag(CartPoleConfigModule.R)

    @staticmethod
    def state_cost_fn(x, g_x):
        """ state cost function

        The goal state is not used, the upright target is encoded in the
        (cos(x[2]) + 1)**2 term.

        Args:
            x (numpy.ndarray): state, shape(pred_len, state_size)
                or shape(pop_size, pred_len, state_size)
            g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
                or shape(pop_size, pred_len, state_size)
        Returns:
            cost (numpy.ndarray): cost of state, shape(pred_len, 1) or
                shape(pop_size, pred_len, 1), a scalar for a single
                state of shape(state_size, )
        """
        cost = CartPoleConfigModule._state_cost(x)

        if len(x.shape) > 1:
            # keep a trailing axis so that the caller can sum over it
            return cost[..., np.newaxis]

        return cost

    @staticmethod
    def terminal_state_cost_fn(terminal_x, terminal_g_x):
        """ terminal state cost function (same form as the state cost)

        Args:
            terminal_x (numpy.ndarray): terminal state,
                shape(state_size, ) or shape(pop_size, state_size)
            terminal_g_x (numpy.ndarray): terminal goal state,
                shape(state_size, ) or shape(pop_size, state_size)
        Returns:
            cost (numpy.ndarray): cost of state, a scalar or
                shape(pop_size, 1)
        """
        cost = CartPoleConfigModule._state_cost(terminal_x)

        if len(terminal_x.shape) > 1:
            return cost[..., np.newaxis]

        return cost

    @staticmethod
    def gradient_cost_fn_with_state(x, g_x, terminal=False):
        """ gradient of costs with respect to the state

        The running and the terminal cost have the same form, so the
        terminal flag only matters for the input shape: a single state of
        shape(state_size, ) is promoted to shape(1, state_size).

        Args:
            x (numpy.ndarray): state, shape(pred_len, state_size)
                or shape(state_size, ) when terminal
            g_x (numpy.ndarray): goal state, shape(pred_len, state_size)

        Returns:
            l_x (numpy.ndarray): gradient of cost, shape(pred_len, state_size)
                or shape(1, state_size)
        """
        x = np.atleast_2d(x)
        theta = x[:, 2]

        # d/dtheta of 12 (cos(theta) + 1)^2 = -24 (cos(theta) + 1) sin(theta)
        return np.stack((12. * x[:, 0],
                         0.2 * x[:, 1],
                         -24. * (np.cos(theta) + 1.) * np.sin(theta),
                         0.2 * x[:, 3]), axis=1)

    @staticmethod
    def gradient_cost_fn_with_input(x, u):
        """ gradient of costs with respect to the input

        Args:
            x (numpy.ndarray): state, shape(pred_len, state_size)
            u (numpy.ndarray): input, shape(pred_len, input_size)

        Returns:
            l_u (numpy.ndarray): gradient of cost, shape(pred_len, input_size)
        """
        return 2. * u * np.diag(CartPoleConfigModule.R)

    @staticmethod
    def hessian_cost_fn_with_state(x, g_x, terminal=False):
        """ hessian costs with respect to the state

        Args:
            x (numpy.ndarray): state, shape(pred_len, state_size)
                or shape(state_size, ) when terminal
            g_x (numpy.ndarray): goal state, shape(pred_len, state_size)

        Returns:
            l_xx (numpy.ndarray): hessian of cost,
                shape(pred_len, state_size, state_size) or
                shape(1, state_size, state_size)
        """
        x = np.atleast_2d(x)
        (pred_len, state_size) = x.shape
        sin_theta = np.sin(x[:, 2])
        cos_theta = np.cos(x[:, 2])

        # the cost is separable in the state, only the diagonal is non-zero
        l_xx = np.zeros((pred_len, state_size, state_size))
        l_xx[:, 0, 0] = 12.
        l_xx[:, 1, 1] = 0.2
        # d/dtheta of -24 (cos + 1) sin = 24 sin^2 - 24 cos (cos + 1)
        l_xx[:, 2, 2] = 24. * (sin_theta**2) \
            - 24. * cos_theta * (cos_theta + 1.)
        l_xx[:, 3, 3] = 0.2

        return l_xx

    @staticmethod
    def hessian_cost_fn_with_input(x, u):
        """ hessian costs with respect to the input

        Args:
            x (numpy.ndarray): state, shape(pred_len, state_size)
            u (numpy.ndarray): input, shape(pred_len, input_size)

        Returns:
            l_uu (numpy.ndarray): hessian of cost,
                shape(pred_len, input_size, input_size)
        """
        (pred_len, _) = u.shape

        return np.tile(2. * CartPoleConfigModule.R, (pred_len, 1, 1))

    @staticmethod
    def hessian_cost_fn_with_input_state(x, u):
        """ hessian costs with respect to the state and input

        The cost has no term coupling state and input, so this is zero.

        Args:
            x (numpy.ndarray): state, shape(pred_len, state_size)
            u (numpy.ndarray): input, shape(pred_len, input_size)

        Returns:
            l_ux (numpy.ndarray): hessian of cost,
                shape(pred_len, input_size, state_size)
        """
        (_, state_size) = x.shape
        (pred_len, input_size) = u.shape

        return np.zeros((pred_len, input_size, state_size))
|
|
@ -5,7 +5,7 @@ class FirstOrderLagConfigModule():
|
||||||
ENV_NAME = "FirstOrderLag-v0"
|
ENV_NAME = "FirstOrderLag-v0"
|
||||||
TYPE = "Linear"
|
TYPE = "Linear"
|
||||||
TASK_HORIZON = 1000
|
TASK_HORIZON = 1000
|
||||||
PRED_LEN = 10
|
PRED_LEN = 50
|
||||||
STATE_SIZE = 4
|
STATE_SIZE = 4
|
||||||
INPUT_SIZE = 2
|
INPUT_SIZE = 2
|
||||||
DT = 0.05
|
DT = 0.05
|
||||||
|
@ -43,8 +43,33 @@ class FirstOrderLagConfigModule():
|
||||||
"kappa": 0.9,
|
"kappa": 0.9,
|
||||||
"noise_sigma": 0.5,
|
"noise_sigma": 0.5,
|
||||||
},
|
},
|
||||||
|
"MPPIWilliams":{
|
||||||
|
"popsize": 5000,
|
||||||
|
"lambda": 1.,
|
||||||
|
"noise_sigma": 0.9,
|
||||||
|
},
|
||||||
"MPC":{
|
"MPC":{
|
||||||
}
|
},
|
||||||
|
"iLQR":{
|
||||||
|
"max_iter": 500,
|
||||||
|
"init_mu": 1.,
|
||||||
|
"mu_min": 1e-6,
|
||||||
|
"mu_max": 1e10,
|
||||||
|
"init_delta": 2.,
|
||||||
|
"threshold": 1e-6,
|
||||||
|
},
|
||||||
|
"DDP":{
|
||||||
|
"max_iter": 500,
|
||||||
|
"init_mu": 1.,
|
||||||
|
"mu_min": 1e-6,
|
||||||
|
"mu_max": 1e10,
|
||||||
|
"init_delta": 2.,
|
||||||
|
"threshold": 1e-6,
|
||||||
|
},
|
||||||
|
"NMPC-CGMRES":{
|
||||||
|
},
|
||||||
|
"NMPC-Newton":{
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -86,4 +111,89 @@ class FirstOrderLagConfigModule():
|
||||||
shape(pop_size, pred_len)
|
shape(pop_size, pred_len)
|
||||||
"""
|
"""
|
||||||
return ((terminal_x - terminal_g_x)**2) \
|
return ((terminal_x - terminal_g_x)**2) \
|
||||||
* np.diag(FirstOrderLagConfigModule.Sf)
|
* np.diag(FirstOrderLagConfigModule.Sf)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def gradient_cost_fn_with_state(x, g_x, terminal=False):
    """ gradient of the quadratic state cost with respect to the state

    Args:
        x (numpy.ndarray): state, shape(pred_len, state_size)
        g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
        terminal (bool): use the terminal weight Sf instead of Q and
            prepend a new leading axis to the result

    Returns:
        l_x (numpy.ndarray): gradient of cost, shape(pred_len, state_size)
            or shape(1, state_size)
    """
    doubled_error = 2. * (x - g_x)

    if terminal:
        terminal_weights = np.diag(FirstOrderLagConfigModule.Sf)
        return (doubled_error * terminal_weights)[np.newaxis, :]

    return doubled_error * np.diag(FirstOrderLagConfigModule.Q)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def gradient_cost_fn_with_input(x, u):
    """ gradient of the quadratic input cost with respect to the input

    Args:
        x (numpy.ndarray): state, shape(pred_len, state_size) (not used)
        u (numpy.ndarray): input, shape(pred_len, input_size)

    Returns:
        l_u (numpy.ndarray): gradient of cost, shape(pred_len, input_size)
    """
    input_weights = np.diag(FirstOrderLagConfigModule.R)

    return 2. * u * input_weights
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def hessian_cost_fn_with_state(x, g_x, terminal=False):
    """ hessian costs with respect to the state

    The state cost is (x - g_x)**2 weighted by diag(Q) (diag(Sf) for the
    terminal state), so its second derivative is constant: it depends
    neither on the state nor on the goal.

    Args:
        x (numpy.ndarray): state, shape(pred_len, state_size)
        g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
            (not used, the hessian of a quadratic cost is goal independent)
        terminal (bool): return the terminal hessian built from Sf

    Returns:
        l_xx (numpy.ndarray): hessian of cost,
            shape(pred_len, state_size, state_size) or
            shape(1, state_size, state_size)
    """
    # NOTE: 2 * Q equals the hessian as long as Q and Sf are diagonal,
    # which is what the cost functions (they only read np.diag) assume.
    if not terminal:
        (pred_len, _) = x.shape
        return np.tile(2. * FirstOrderLagConfigModule.Q, (pred_len, 1, 1))

    return np.tile(2. * FirstOrderLagConfigModule.Sf, (1, 1, 1))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def hessian_cost_fn_with_input(x, u):
    """ hessian of the quadratic input cost with respect to the input

    Args:
        x (numpy.ndarray): state, shape(pred_len, state_size) (not used)
        u (numpy.ndarray): input, shape(pred_len, input_size)

    Returns:
        l_uu (numpy.ndarray): hessian of cost,
            shape(pred_len, input_size, input_size)
    """
    horizon, _ = u.shape
    doubled_weight = 2. * FirstOrderLagConfigModule.R

    # the same constant matrix is repeated for every step of the horizon
    return np.tile(doubled_weight, (horizon, 1, 1))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def hessian_cost_fn_with_input_state(x, u):
    """ mixed second derivative of the cost with respect to input and state

    Always zero: an all-zero array of the documented shape is returned.

    Args:
        x (numpy.ndarray): state, shape(pred_len, state_size)
        u (numpy.ndarray): input, shape(pred_len, input_size)

    Returns:
        l_ux (numpy.ndarray): hessian of cost,
            shape(pred_len, input_size, state_size)
    """
    _, n_state = x.shape
    horizon, n_input = u.shape

    return np.zeros((horizon, n_input, n_state))
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from .first_order_lag import FirstOrderLagConfigModule
|
from .first_order_lag import FirstOrderLagConfigModule
|
||||||
from .two_wheeled import TwoWheeledConfigModule
|
from .two_wheeled import TwoWheeledConfigModule
|
||||||
|
from .cartpole import CartPoleConfigModule
|
||||||
|
|
||||||
def make_config(args):
|
def make_config(args):
|
||||||
"""
|
"""
|
||||||
|
@ -9,4 +10,6 @@ def make_config(args):
|
||||||
if args.env == "FirstOrderLag":
|
if args.env == "FirstOrderLag":
|
||||||
return FirstOrderLagConfigModule()
|
return FirstOrderLagConfigModule()
|
||||||
elif args.env == "TwoWheeledConst" or args.env == "TwoWheeled":
|
elif args.env == "TwoWheeledConst" or args.env == "TwoWheeled":
|
||||||
return TwoWheeledConfigModule()
|
return TwoWheeledConfigModule()
|
||||||
|
elif args.env == "CartPole":
|
||||||
|
return CartPoleConfigModule()
|
|
@ -39,6 +39,11 @@ class TwoWheeledConfigModule():
|
||||||
"kappa": 0.9,
|
"kappa": 0.9,
|
||||||
"noise_sigma": 0.5,
|
"noise_sigma": 0.5,
|
||||||
},
|
},
|
||||||
|
"MPPIWilliams":{
|
||||||
|
"popsize": 5000,
|
||||||
|
"lambda": 1,
|
||||||
|
"noise_sigma": 1.,
|
||||||
|
},
|
||||||
"iLQR":{
|
"iLQR":{
|
||||||
"max_iter": 500,
|
"max_iter": 500,
|
||||||
"init_mu": 1.,
|
"init_mu": 1.,
|
||||||
|
|
|
@ -23,10 +23,6 @@ class DDP(Controller):
|
||||||
"""
|
"""
|
||||||
super(DDP, self).__init__(config, model)
|
super(DDP, self).__init__(config, model)
|
||||||
|
|
||||||
if config.TYPE != "Nonlinear":
|
|
||||||
raise ValueError("{} could be not applied to \
|
|
||||||
this controller".format(model))
|
|
||||||
|
|
||||||
# model
|
# model
|
||||||
self.model = model
|
self.model = model
|
||||||
|
|
||||||
|
@ -296,6 +292,7 @@ class DDP(Controller):
|
||||||
|
|
||||||
def backward(self, f_x, f_u, f_xx, f_ux, f_uu, l_x, l_xx, l_u, l_uu, l_ux):
|
def backward(self, f_x, f_u, f_xx, f_ux, f_uu, l_x, l_xx, l_u, l_uu, l_ux):
|
||||||
""" backward step of iLQR
|
""" backward step of iLQR
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
f_x (numpy.ndarray): gradient of model with respecto to state,
|
f_x (numpy.ndarray): gradient of model with respecto to state,
|
||||||
shape(pred_len+1, state_size, state_size)
|
shape(pred_len+1, state_size, state_size)
|
||||||
|
@ -317,7 +314,6 @@ class DDP(Controller):
|
||||||
shape(pred_len, input_size, input_size)
|
shape(pred_len, input_size, input_size)
|
||||||
l_ux (numpy.ndarray): hessian of cost with respect
|
l_ux (numpy.ndarray): hessian of cost with respect
|
||||||
to state and input, shape(pred_len, input_size, state_size)
|
to state and input, shape(pred_len, input_size, state_size)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
k (numpy.ndarray): gain, shape(pred_len, input_size)
|
k (numpy.ndarray): gain, shape(pred_len, input_size)
|
||||||
K (numpy.ndarray): gain, shape(pred_len, input_size, state_size)
|
K (numpy.ndarray): gain, shape(pred_len, input_size, state_size)
|
||||||
|
@ -353,7 +349,8 @@ class DDP(Controller):
|
||||||
|
|
||||||
def _Q(self, f_x, f_u, f_xx, f_ux, f_uu,
|
def _Q(self, f_x, f_u, f_xx, f_ux, f_uu,
|
||||||
l_x, l_u, l_xx, l_ux, l_uu, V_x, V_xx):
|
l_x, l_u, l_xx, l_ux, l_uu, V_x, V_xx):
|
||||||
"""Computes second order expansion.
|
""" compute Q function valued
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
f_x (numpy.ndarray): gradient of model with respecto to state,
|
f_x (numpy.ndarray): gradient of model with respecto to state,
|
||||||
shape(state_size, state_size)
|
shape(state_size, state_size)
|
||||||
|
|
|
@ -21,10 +21,6 @@ class iLQR(Controller):
|
||||||
"""
|
"""
|
||||||
super(iLQR, self).__init__(config, model)
|
super(iLQR, self).__init__(config, model)
|
||||||
|
|
||||||
if config.TYPE != "Nonlinear":
|
|
||||||
raise ValueError("{} could be not applied to \
|
|
||||||
this controller".format(model))
|
|
||||||
|
|
||||||
# model
|
# model
|
||||||
self.model = model
|
self.model = model
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ from .mpc import LinearMPC
|
||||||
from .cem import CEM
|
from .cem import CEM
|
||||||
from .random import RandomShooting
|
from .random import RandomShooting
|
||||||
from .mppi import MPPI
|
from .mppi import MPPI
|
||||||
|
from .mppi_williams import MPPIWilliams
|
||||||
from .ilqr import iLQR
|
from .ilqr import iLQR
|
||||||
from .ddp import DDP
|
from .ddp import DDP
|
||||||
|
|
||||||
|
@ -15,6 +16,8 @@ def make_controller(args, config, model):
|
||||||
return RandomShooting(config, model)
|
return RandomShooting(config, model)
|
||||||
elif args.controller_type == "MPPI":
|
elif args.controller_type == "MPPI":
|
||||||
return MPPI(config, model)
|
return MPPI(config, model)
|
||||||
|
elif args.controller_type == "MPPIWilliams":
|
||||||
|
return MPPIWilliams(config, model)
|
||||||
elif args.controller_type == "iLQR":
|
elif args.controller_type == "iLQR":
|
||||||
return iLQR(config, model)
|
return iLQR(config, model)
|
||||||
elif args.controller_type == "DDP":
|
elif args.controller_type == "DDP":
|
||||||
|
|
|
@ -0,0 +1,143 @@
|
||||||
|
from logging import getLogger
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import scipy.stats as stats
|
||||||
|
|
||||||
|
from .controller import Controller
|
||||||
|
from ..envs.cost import calc_cost
|
||||||
|
|
||||||
|
logger = getLogger(__name__)
|
||||||
|
|
||||||
|
class MPPIWilliams(Controller):
    """ Model Predictive Path Integral for linear and nonlinear method

    Samples noisy input sequences around the previous solution, scores
    them with the model and updates the solution with the exponentially
    weighted average of the perturbations.

    Attributes:
        history_u (list[numpy.ndarray]): time history of optimal input
    Ref:
        G. Williams et al., "Information theoretic MPC
        for model-based reinforcement learning,"
        2017 IEEE International Conference on Robotics and Automation (ICRA),
        Singapore, 2017, pp. 1714-1721.
    """
    def __init__(self, config, model):
        """
        Args:
            config: configuration module, read for PRED_LEN, INPUT_SIZE,
                the input bounds, opt_config["MPPIWilliams"] and the
                cost functions
            model: model used to predict trajectories (predict_traj)
        """
        super(MPPIWilliams, self).__init__(config, model)

        # model
        self.model = model

        # general parameters
        self.pred_len = config.PRED_LEN
        self.input_size = config.INPUT_SIZE

        # mppi parameters
        self.pop_size = config.opt_config["MPPIWilliams"]["popsize"]
        self.lam = config.opt_config["MPPIWilliams"]["lambda"]
        self.noise_sigma = config.opt_config["MPPIWilliams"]["noise_sigma"]
        self.opt_dim = self.input_size * self.pred_len

        # get bound, one row per step of the horizon
        self.input_upper_bounds = np.tile(config.INPUT_UPPER_BOUND,
                                          (self.pred_len, 1))
        self.input_lower_bounds = np.tile(config.INPUT_LOWER_BOUND,
                                          (self.pred_len, 1))

        # get cost func
        self.state_cost_fn = config.state_cost_fn
        self.terminal_state_cost_fn = config.terminal_state_cost_fn
        self.input_cost_fn = config.input_cost_fn

        # init mean, the middle of the input range at every step
        self.prev_sol = np.tile((config.INPUT_UPPER_BOUND
                                 + config.INPUT_LOWER_BOUND) / 2.,
                                self.pred_len)
        self.prev_sol = self.prev_sol.reshape(self.pred_len, self.input_size)

        # save
        self.history_u = [np.zeros(self.input_size)]

    def clear_sol(self):
        """ clear prev sol
        """
        logger.debug("Clear Solution")
        self.prev_sol = \
            (self.input_upper_bounds + self.input_lower_bounds) / 2.
        self.prev_sol = self.prev_sol.reshape(self.pred_len, self.input_size)

    def calc_cost(self, curr_x, samples, g_xs):
        """ calculate the cost of input samples by using MPPI's eq

        Only the state and terminal state costs are evaluated here, the
        input cost function is deliberately passed as None; the control
        cost term of MPPI is added in obtain_sol.

        Args:
            curr_x (numpy.ndarray): shape(state_size),
                current robot position
            samples (numpy.ndarray): shape(pop_size, pred_len, input_size),
                input samples
            g_xs (numpy.ndarray): shape(pred_len, state_size),
                goal states
        Returns:
            costs (numpy.ndarray): shape(pop_size, )
        """
        # get size
        pop_size = samples.shape[0]
        g_xs = np.tile(g_xs, (pop_size, 1, 1))

        # calc cost, pred_xs.shape = (pop_size, pred_len+1, state_size)
        pred_xs = self.model.predict_traj(curr_x, samples)

        # get particle cost
        costs = calc_cost(pred_xs, samples, g_xs,
                          self.state_cost_fn, None,
                          self.terminal_state_cost_fn)

        return costs

    def obtain_sol(self, curr_x, g_xs):
        """ calculate the optimal inputs

        Args:
            curr_x (numpy.ndarray): current state, shape(state_size, )
            g_xs (numpy.ndarrya): goal trajectory, shape(plan_len, state_size)
        Returns:
            opt_input (numpy.ndarray): optimal input, shape(input_size, )
        """
        # get noised inputs
        noise = np.random.normal(
            loc=0, scale=1.0, size=(self.pop_size, self.pred_len,
                                    self.input_size)) * self.noise_sigma

        # clip actions
        noised_inputs = np.clip(
            self.prev_sol + noise,
            self.input_lower_bounds, self.input_upper_bounds)

        # The costs are evaluated on the clipped samples, so the update
        # must use the perturbation that was actually applied. Using the
        # raw noise would let the solution (and the warm start) leave the
        # input bounds.
        noise = noised_inputs - self.prev_sol

        # calc cost
        costs = self.calc_cost(curr_x, noised_inputs, g_xs)

        # control cost term
        # NOTE(review): the reference weights this term with the inverse
        # noise covariance; here it is divided by noise_sigma, which is
        # used as a standard deviation above - confirm the intended scale.
        costs += np.sum(np.sum(
            self.lam * self.prev_sol * noise / self.noise_sigma,
            axis=-1), axis=-1)

        # mppi update, the minimum cost is subtracted for numerical
        # stability of the exponential
        beta = np.min(costs)
        exp_costs = np.exp(- 1. / self.lam * (costs - beta))
        # eta is a scalar normalizer, 1e-10 guards against division by zero
        eta = np.sum(exp_costs, axis=0) + 1e-10

        # weight, weights.shape = (pop_size, )
        weights = exp_costs / eta

        # update inputs
        sol = self.prev_sol \
            + np.sum(weights[:, np.newaxis, np.newaxis] * noise, axis=0)

        # update, shift the solution by one step as the next warm start
        self.prev_sol[:-1] = sol[1:]
        self.prev_sol[-1] = sol[-1]  # last use the terminal input

        # log
        self.history_u.append(sol[0])

        return sol[0]

    def __str__(self):
        return "MPPIWilliams"
|
|
@ -0,0 +1,114 @@
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from .env import Env
|
||||||
|
|
||||||
|
class CartPoleEnv(Env):
    """ Cartpole Environment

    The state is ordered as [x, v_x, theta, v_theta] and the input has a
    single element. The dynamics are integrated with one explicit Euler
    step of length dt per call of step.

    Ref :
        https://ocw.mit.edu/courses/
        electrical-engineering-and-computer-science/
        6-832-underactuated-robotics-spring-2009/readings/
        MIT6_832s09_read_ch03.pdf
    """
    def __init__(self):
        """ set the fixed parameters of the environment
        """
        self.config = dict(state_size=4,
                           input_size=1,
                           dt=0.02,
                           max_step=500,
                           input_lower_bound=[-3.],
                           input_upper_bound=[3.],
                           mp=0.2,
                           mc=1.,
                           l=0.5,
                           g=9.81)

        super(CartPoleEnv, self).__init__(self.config)

    def reset(self, init_x=None):
        """ reset state

        Args:
            init_x (numpy.ndarray): initial state, shape(state_size, ),
                the zero state is used when it is None
        Returns:
            init_x (numpy.ndarray): initial state, shape(state_size, )
            info (dict): information
        """
        self.step_count = 0

        if init_x is None:
            self.curr_x = np.array([0., 0., 0., 0.])
        else:
            self.curr_x = init_x

        # goal
        self.g_x = np.array([0., 0., -np.pi, 0.])

        # clear memory
        self.history_x = []
        self.history_g_x = []

        return self.curr_x, {"goal_state": self.g_x}

    def step(self, u):
        """ step environments

        Args:
            u (numpy.ndarray) : input, shape(input_size, )
        Returns:
            next_x (numpy.ndarray): next state, shape(state_size, )
            cost (float): costs
            done (bool): end the simulation or not
            info (dict): information
        """
        params = self.config

        # clip action
        if params["input_lower_bound"] is not None:
            u = np.clip(u,
                        params["input_lower_bound"],
                        params["input_upper_bound"])

        mp = params["mp"]
        mc = params["mc"]
        l = params["l"]
        g = params["g"]

        state = self.curr_x
        omega = state[3]
        sin_theta = np.sin(state[2])
        cos_theta = np.cos(state[2])

        # step
        # x
        d_x0 = state[1]
        # v_x
        d_x1 = (u[0] + mp * sin_theta
                * (l * (omega**2) + g * cos_theta)) \
            / (mc + mp * (sin_theta**2))
        # theta
        d_x2 = omega
        # v_theta
        d_x3 = (-u[0] * cos_theta
                - mp * l * (omega**2) * cos_theta * sin_theta
                - (mc + mp) * g * sin_theta) \
            / (l * (mc + mp * (sin_theta**2)))

        next_x = state + np.array([d_x0, d_x1, d_x2, d_x3]) * params["dt"]

        # TODO: costs
        # the cost is evaluated on the state before this step is applied
        costs = 0.1 * np.sum(u**2)
        costs += 6. * state[0]**2 \
            + 12. * (cos_theta + 1.)**2 \
            + 0.1 * state[1]**2 \
            + 0.1 * omega**2

        # save history
        self.history_x.append(next_x.flatten())
        self.history_g_x.append(self.g_x.flatten())

        # update
        self.curr_x = next_x.flatten().copy()
        # update costs
        self.step_count += 1

        done = self.step_count > params["max_step"]

        return next_x.flatten(), costs, done, {"goal_state": self.g_x}
|
|
@ -22,16 +22,22 @@ def calc_cost(pred_xs, input_sample, g_xs,
|
||||||
cost (numpy.ndarray): cost of the input sample, shape(pop_size, )
|
cost (numpy.ndarray): cost of the input sample, shape(pop_size, )
|
||||||
"""
|
"""
|
||||||
# state cost
|
# state cost
|
||||||
state_pred_par_cost = state_cost_fn(pred_xs[:, 1:-1, :], g_xs[:, 1:-1, :])
|
state_cost = 0.
|
||||||
state_cost = np.sum(np.sum(state_pred_par_cost, axis=-1), axis=-1)
|
if state_cost_fn is not None:
|
||||||
|
state_pred_par_cost = state_cost_fn(pred_xs[:, 1:-1, :], g_xs[:, 1:-1, :])
|
||||||
|
state_cost = np.sum(np.sum(state_pred_par_cost, axis=-1), axis=-1)
|
||||||
|
|
||||||
# terminal cost
|
# terminal cost
|
||||||
terminal_state_par_cost = terminal_state_cost_fn(pred_xs[:, -1, :],
|
terminal_state_cost = 0.
|
||||||
g_xs[:, -1, :])
|
if terminal_state_cost_fn is not None:
|
||||||
terminal_state_cost = np.sum(terminal_state_par_cost, axis=-1)
|
terminal_state_par_cost = terminal_state_cost_fn(pred_xs[:, -1, :],
|
||||||
|
g_xs[:, -1, :])
|
||||||
|
terminal_state_cost = np.sum(terminal_state_par_cost, axis=-1)
|
||||||
|
|
||||||
# act cost
|
# act cost
|
||||||
act_pred_par_cost = input_cost_fn(input_sample)
|
act_cost = 0.
|
||||||
act_cost = np.sum(np.sum(act_pred_par_cost, axis=-1), axis=-1)
|
if input_cost_fn is not None:
|
||||||
|
act_pred_par_cost = input_cost_fn(input_sample)
|
||||||
|
act_cost = np.sum(np.sum(act_pred_par_cost, axis=-1), axis=-1)
|
||||||
|
|
||||||
return state_cost + terminal_state_cost + act_cost
|
return state_cost + terminal_state_cost + act_cost
|
|
@ -1,5 +1,6 @@
|
||||||
from .first_order_lag import FirstOrderLagEnv
|
from .first_order_lag import FirstOrderLagEnv
|
||||||
from .two_wheeled import TwoWheeledConstEnv
|
from .two_wheeled import TwoWheeledConstEnv
|
||||||
|
from .cartpole import CartPoleEnv
|
||||||
|
|
||||||
def make_env(args):
|
def make_env(args):
|
||||||
|
|
||||||
|
@ -7,5 +8,7 @@ def make_env(args):
|
||||||
return FirstOrderLagEnv()
|
return FirstOrderLagEnv()
|
||||||
elif args.env == "TwoWheeledConst":
|
elif args.env == "TwoWheeledConst":
|
||||||
return TwoWheeledConstEnv()
|
return TwoWheeledConstEnv()
|
||||||
|
elif args.env == "CartPole":
|
||||||
|
return CartPoleEnv()
|
||||||
|
|
||||||
raise NotImplementedError("There is not {} Env".format(args.env))
|
raise NotImplementedError("There is not {} Env".format(args.env))
|
|
@ -86,7 +86,7 @@ class TwoWheeledConstEnv(Env):
|
||||||
# TODO: costs
|
# TODO: costs
|
||||||
costs = 0.
|
costs = 0.
|
||||||
costs += 0.1 * np.sum(u**2)
|
costs += 0.1 * np.sum(u**2)
|
||||||
costs += np.sum((self.curr_x - self.g_x)**2)
|
costs += np.sum(((self.curr_x - self.g_x)**2) * np.array([5., 5., 1.]))
|
||||||
|
|
||||||
# save history
|
# save history
|
||||||
self.history_x.append(next_x.flatten())
|
self.history_x.append(next_x.flatten())
|
||||||
|
|
|
@ -0,0 +1,186 @@
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from .model import Model
|
||||||
|
|
||||||
|
class CartPoleModel(Model):
    """Cart-pole model discretized with an explicit Euler step.

    The state is ``[x, v_x, theta, v_theta]`` and only the first input
    component ``u[0]`` enters the dynamics.  All derivative methods
    differentiate exactly the continuous dynamics used by
    ``predict_next_state`` and then scale by ``dt``.
    """

    def __init__(self, config):
        """Copy the time step and physical parameters from ``config``.

        Args:
            config: object exposing ``DT``, ``MC``, ``MP``, ``L`` and ``G``.
        """
        super(CartPoleModel, self).__init__()
        self.dt = config.DT
        self.mc = config.MC
        self.mp = config.MP
        self.l = config.L
        self.g = config.G

    def predict_next_state(self, curr_x, u):
        """Predict the next state with one Euler step.

        Args:
            curr_x (numpy.ndarray): current state, shape(state_size, ) or
                shape(pop_size, state_size)
            u (numpy.ndarray): input, shape(input_size, ) or
                shape(pop_size, input_size)
        Returns:
            next_x (numpy.ndarray): next state, shape(state_size, ) or
                shape(pop_size, state_size)
        """
        # Ellipsis indexing serves the single-sample and the batched layout
        # with one code path.
        theta = curr_x[..., 2]
        omega = curr_x[..., 3]
        u0 = u[..., 0]
        sin_t = np.sin(theta)
        cos_t = np.cos(theta)
        denom = self.mc + self.mp * (sin_t**2)

        # x
        d_x0 = curr_x[..., 1]
        # v_x
        d_x1 = (u0 + self.mp * sin_t
                * (self.l * (omega**2) + self.g * cos_t)) / denom
        # theta
        d_x2 = omega
        # v_theta
        d_x3 = (-u0 * cos_t
                - self.mp * self.l * (omega**2) * cos_t * sin_t
                - (self.mc + self.mp) * self.g * sin_t) \
            / (self.l * denom)

        return curr_x + np.stack((d_x0, d_x1, d_x2, d_x3), axis=-1) * self.dt

    def _derivative_terms(self, xs, us):
        """Evaluate the building blocks shared by all derivative methods.

        With ``D = mc + mp sin^2(theta)`` the dynamics read
        ``d(v_x)/dt = N1 / D`` and ``d(v_theta)/dt = N3 / (l D)``.  The
        returned dict holds ``D``, ``N1``, ``N3`` and their first (``d``)
        and second (``dd``) derivatives with respect to theta.
        """
        pred_len = us.shape[0]
        # xs may carry one extra (terminal) row; only the rows that are
        # paired with an input are differentiated.
        theta = xs[:pred_len, 2]
        omega = xs[:pred_len, 3]
        u0 = us[:, 0]

        s, c = np.sin(theta), np.cos(theta)
        s2, c2 = np.sin(2. * theta), np.cos(2. * theta)
        ml = self.mp * self.l
        mg = self.mp * self.g
        total_g = (self.mc + self.mp) * self.g

        return {
            "omega": omega, "s": s, "c": c, "s2": s2, "c2": c2,
            "D": self.mc + self.mp * s**2,
            "dD": self.mp * s2,
            "ddD": 2. * self.mp * c2,
            "N1": u0 + ml * omega**2 * s + 0.5 * mg * s2,
            "dN1": ml * omega**2 * c + mg * c2,
            "ddN1": -ml * omega**2 * s - 2. * mg * s2,
            "N3": -u0 * c - 0.5 * ml * omega**2 * s2 - total_g * s,
            "dN3": u0 * s - ml * omega**2 * c2 - total_g * c,
            "ddN3": u0 * c + 2. * ml * omega**2 * s2 + total_g * s,
        }

    def calc_f_x(self, xs, us, dt):
        """Jacobian of the discrete model with respect to the state.

        Args:
            xs (numpy.ndarray): state, shape(pred_len+1, state_size)
            us (numpy.ndarray): input, shape(pred_len, input_size,)
            dt (float): time step used for the discretization
        Return:
            f_x (numpy.ndarray): gradient of model with respect to x,
                shape(pred_len, state_size, state_size)
        Notes:
            Discrete form: ``I + dt * (continuous Jacobian)``.
        """
        (_, state_size) = xs.shape
        (pred_len, _) = us.shape
        t = self._derivative_terms(xs, us)
        D = t["D"]

        f_x = np.zeros((pred_len, state_size, state_size))
        # d(x)/dt = v_x and d(theta)/dt = v_theta
        f_x[:, 0, 1] = 1.
        f_x[:, 2, 3] = 1.
        # d(v_x)/dt = N1 / D
        f_x[:, 1, 2] = (t["dN1"] * D - t["N1"] * t["dD"]) / D**2
        f_x[:, 1, 3] = 2. * self.mp * self.l * t["omega"] * t["s"] / D
        # d(v_theta)/dt = N3 / (l D)
        f_x[:, 3, 2] = (t["dN3"] * D - t["N3"] * t["dD"]) / (self.l * D**2)
        f_x[:, 3, 3] = -self.mp * t["omega"] * t["s2"] / D

        return f_x * dt + np.eye(state_size)  # to discrete form

    def calc_f_u(self, xs, us, dt):
        """Jacobian of the discrete model with respect to the input.

        Args:
            xs (numpy.ndarray): state, shape(pred_len+1, state_size)
            us (numpy.ndarray): input, shape(pred_len, input_size,)
            dt (float): time step used for the discretization
        Return:
            f_u (numpy.ndarray): gradient of model with respect to u,
                shape(pred_len, state_size, input_size)
        Notes:
            Discrete form: ``dt * (continuous Jacobian)``.
        """
        (_, state_size) = xs.shape
        (pred_len, input_size) = us.shape
        theta = xs[:pred_len, 2]
        denom = self.mc + self.mp * (np.sin(theta)**2)

        f_u = np.zeros((pred_len, state_size, input_size))
        f_u[:, 1, 0] = 1. / denom
        f_u[:, 3, 0] = -np.cos(theta) / (self.l * denom)

        return f_u * dt  # to discrete form

    def calc_f_xx(self, xs, us, dt):
        """Hessian of the discrete model with respect to the state.

        Args:
            xs (numpy.ndarray): state, shape(pred_len+1, state_size)
            us (numpy.ndarray): input, shape(pred_len, input_size,)
            dt (float): time step used for the discretization
        Return:
            f_xx (numpy.ndarray): hessian of model with respect to x,
                shape(pred_len, state_size, state_size, state_size)
        """
        (_, state_size) = xs.shape
        (pred_len, _) = us.shape
        t = self._derivative_terms(xs, us)
        D, dD, ddD = t["D"], t["dD"], t["ddD"]

        def second_theta(num, d_num, dd_num):
            # d^2/dtheta^2 of num / D (quotient rule applied twice)
            return (dd_num * D - num * ddD) / D**2 \
                - 2. * dD * (d_num * D - num * dD) / D**3

        f_xx = np.zeros((pred_len, state_size, state_size, state_size))

        # v_x row
        f1_theta_omega = 2. * self.mp * self.l * t["omega"] \
            * (t["c"] * D - t["s"] * dD) / D**2
        f_xx[:, 1, 2, 2] = second_theta(t["N1"], t["dN1"], t["ddN1"])
        f_xx[:, 1, 2, 3] = f1_theta_omega
        f_xx[:, 1, 3, 2] = f1_theta_omega
        f_xx[:, 1, 3, 3] = 2. * self.mp * self.l * t["s"] / D

        # v_theta row
        f3_theta_omega = -self.mp * t["omega"] \
            * (2. * t["c2"] * D - t["s2"] * dD) / D**2
        f_xx[:, 3, 2, 2] = \
            second_theta(t["N3"], t["dN3"], t["ddN3"]) / self.l
        f_xx[:, 3, 2, 3] = f3_theta_omega
        f_xx[:, 3, 3, 2] = f3_theta_omega
        f_xx[:, 3, 3, 3] = -self.mp * t["s2"] / D

        return f_xx * dt

    def calc_f_ux(self, xs, us, dt):
        """Mixed second derivative of the discrete model (input, state).

        Args:
            xs (numpy.ndarray): state, shape(pred_len+1, state_size)
            us (numpy.ndarray): input, shape(pred_len, input_size,)
            dt (float): time step used for the discretization
        Return:
            f_ux (numpy.ndarray): hessian of model with respect to u and x,
                shape(pred_len, state_size, input_size, state_size)
        """
        (_, state_size) = xs.shape
        (pred_len, input_size) = us.shape
        t = self._derivative_terms(xs, us)
        D, dD = t["D"], t["dD"]

        f_ux = np.zeros((pred_len, state_size, input_size, state_size))
        # theta-derivative of the entries produced by calc_f_u
        f_ux[:, 1, 0, 2] = -dD / D**2
        f_ux[:, 3, 0, 2] = (t["s"] * D + t["c"] * dD) / (self.l * D**2)

        return f_ux * dt

    def calc_f_uu(self, xs, us, dt):
        """Hessian of the discrete model with respect to the input.

        The dynamics are affine in the input, so the result is all zeros.

        Args:
            xs (numpy.ndarray): state, shape(pred_len+1, state_size)
            us (numpy.ndarray): input, shape(pred_len, input_size,)
            dt (float): time step used for the discretization
        Return:
            f_uu (numpy.ndarray): hessian of model with respect to u,
                shape(pred_len, state_size, input_size, input_size)
        """
        (_, state_size) = xs.shape
        (pred_len, input_size) = us.shape

        f_uu = np.zeros((pred_len, state_size, input_size, input_size))

        return f_uu * dt
|
|
@ -1,5 +1,6 @@
|
||||||
from .first_order_lag import FirstOrderLagModel
|
from .first_order_lag import FirstOrderLagModel
|
||||||
from .two_wheeled import TwoWheeledModel
|
from .two_wheeled import TwoWheeledModel
|
||||||
|
from .cartpole import CartPoleModel
|
||||||
|
|
||||||
def make_model(args, config):
|
def make_model(args, config):
|
||||||
|
|
||||||
|
@ -7,5 +8,7 @@ def make_model(args, config):
|
||||||
return FirstOrderLagModel(config)
|
return FirstOrderLagModel(config)
|
||||||
elif args.env == "TwoWheeledConst" or args.env == "TwoWheeled":
|
elif args.env == "TwoWheeledConst" or args.env == "TwoWheeled":
|
||||||
return TwoWheeledModel(config)
|
return TwoWheeledModel(config)
|
||||||
|
elif args.env == "CartPole":
|
||||||
|
return CartPoleModel(config)
|
||||||
|
|
||||||
raise NotImplementedError("There is not {} Model".format(args.env))
|
raise NotImplementedError("There is not {} Model".format(args.env))
|
|
@ -211,3 +211,94 @@ class LinearModel(Model):
|
||||||
next_x = np.matmul(curr_x, self.A.T) + np.matmul(u, self.B.T)
|
next_x = np.matmul(curr_x, self.A.T) + np.matmul(u, self.B.T)
|
||||||
|
|
||||||
return next_x
|
return next_x
|
||||||
|
|
||||||
|
def calc_f_x(self, xs, us, dt):
    """Return the state Jacobian of the linear model for every step.

    Args:
        xs (numpy.ndarray): state, shape(pred_len+1, state_size)
        us (numpy.ndarray): input, shape(pred_len, input_size,)
        dt: unused here; ``self.A`` is returned unchanged
    Return:
        f_x (numpy.ndarray): ``self.A`` repeated along a new leading axis,
            shape(pred_len, state_size, state_size)
    Notes:
        This should be discrete form !!
    """
    horizon, _ = us.shape
    # one copy of A per prediction step
    repeats = (horizon, 1, 1)
    return np.tile(self.A, repeats)
|
||||||
|
|
||||||
|
def calc_f_u(self, xs, us, dt):
    """Return the input Jacobian of the linear model for every step.

    Args:
        xs (numpy.ndarray): state, shape(pred_len+1, state_size)
        us (numpy.ndarray): input, shape(pred_len, input_size,)
        dt: unused here; ``self.B`` is returned unchanged
    Return:
        f_u (numpy.ndarray): ``self.B`` repeated along a new leading axis,
            shape(pred_len, state_size, input_size)
    Notes:
        This should be discrete form !!
    """
    horizon, _ = us.shape
    # one copy of B per prediction step
    repeats = (horizon, 1, 1)
    return np.tile(self.B, repeats)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def calc_f_xx(xs, us, dt):
|
||||||
|
""" hessian of model with respect to the state in batch form
|
||||||
|
|
||||||
|
Args:
|
||||||
|
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
|
||||||
|
us (numpy.ndarray): input, shape(pred_len, input_size,)
|
||||||
|
Return:
|
||||||
|
f_xx (numpy.ndarray): gradient of model with respect to x,
|
||||||
|
shape(pred_len, state_size, state_size, state_size)
|
||||||
|
"""
|
||||||
|
# get size
|
||||||
|
(_, state_size) = xs.shape
|
||||||
|
(pred_len, _) = us.shape
|
||||||
|
|
||||||
|
f_xx = np.zeros((pred_len, state_size, state_size, state_size))
|
||||||
|
|
||||||
|
return f_xx
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def calc_f_ux(xs, us, dt):
|
||||||
|
""" hessian of model with respect to state and input in batch form
|
||||||
|
|
||||||
|
Args:
|
||||||
|
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
|
||||||
|
us (numpy.ndarray): input, shape(pred_len, input_size,)
|
||||||
|
Return:
|
||||||
|
f_ux (numpy.ndarray): gradient of model with respect to x,
|
||||||
|
shape(pred_len, state_size, input_size, state_size)
|
||||||
|
"""
|
||||||
|
# get size
|
||||||
|
(_, state_size) = xs.shape
|
||||||
|
(pred_len, input_size) = us.shape
|
||||||
|
|
||||||
|
f_ux = np.zeros((pred_len, state_size, input_size, state_size))
|
||||||
|
|
||||||
|
return f_ux
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def calc_f_uu(xs, us, dt):
|
||||||
|
""" hessian of model with respect to input in batch form
|
||||||
|
|
||||||
|
Args:
|
||||||
|
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
|
||||||
|
us (numpy.ndarray): input, shape(pred_len, input_size,)
|
||||||
|
Return:
|
||||||
|
f_uu (numpy.ndarray): gradient of model with respect to x,
|
||||||
|
shape(pred_len, state_size, input_size, input_size)
|
||||||
|
"""
|
||||||
|
# get size
|
||||||
|
(_, state_size) = xs.shape
|
||||||
|
(pred_len, input_size) = us.shape
|
||||||
|
|
||||||
|
f_uu = np.zeros((pred_len, state_size, input_size, input_size))
|
||||||
|
|
||||||
|
return f_uu
|
||||||
|
|
|
@ -3,6 +3,8 @@ import os
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
from ..helper import save_pickle, load_pickle
|
||||||
|
|
||||||
def plot_result(history, history_g=None, ylabel="x",
|
def plot_result(history, history_g=None, ylabel="x",
|
||||||
save_dir="./result", name="state_history"):
|
save_dir="./result", name="state_history"):
|
||||||
"""
|
"""
|
||||||
|
@ -47,14 +49,108 @@ def plot_result(history, history_g=None, ylabel="x",
|
||||||
|
|
||||||
def plot_results(args, history_x, history_u, history_g=None):
|
def plot_results(args, history_x, history_u, history_g=None):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
history_x (numpy.ndarray): history of state, shape(iters, state_size)
|
history_x (numpy.ndarray): history of state, shape(iters, state_size)
|
||||||
history_u (numpy.ndarray): history of state, shape(iters, input_size)
|
history_u (numpy.ndarray): history of state, shape(iters, input_size)
|
||||||
Returns:
|
Returns:
|
||||||
|
None
|
||||||
"""
|
"""
|
||||||
plot_result(history_x, history_g=history_g, ylabel="x",
|
plot_result(history_x, history_g=history_g, ylabel="x",
|
||||||
name="state_history",
|
name= args.env + "-state_history",
|
||||||
save_dir="./result/" + args.controller_type)
|
save_dir="./result/" + args.controller_type)
|
||||||
plot_result(history_u, history_g=np.zeros_like(history_u), ylabel="u",
|
plot_result(history_u, history_g=np.zeros_like(history_u), ylabel="u",
|
||||||
name="input_history",
|
name= args.env + "-input_history",
|
||||||
save_dir="./result/" + args.controller_type)
|
save_dir="./result/" + args.controller_type)
|
||||||
|
|
||||||
|
def save_plot_data(args, history_x, history_u, history_g=None):
    """Pickle the state, input and goal histories of one run.

    The files are written to ``./result/<controller_type>/`` and are named
    ``<env>-history_{x,u,g}.pkl``.

    Args:
        args: object with ``controller_type`` and ``env`` string attributes
        history_x (numpy.ndarray): history of state, shape(iters, state_size)
        history_u (numpy.ndarray): history of input, shape(iters, input_size)
        history_g: history of goal; pickled as given (``None`` included)
    Returns:
        None
    """
    out_dir = "./result/" + args.controller_type
    targets = (("-history_x.pkl", history_x),
               ("-history_u.pkl", history_u),
               ("-history_g.pkl", history_g))

    for suffix, data in targets:
        save_pickle(os.path.join(out_dir, args.env + suffix), data)
|
||||||
|
|
||||||
|
def load_plot_data(env, controller_type, result_dir="./result"):
    """Load the pickled histories written by ``save_plot_data``.

    Args:
        env (str): environments name
        controller_type (str): controller type
        result_dir (str): result directory; the files are read from
            ``<result_dir>/<controller_type>/<env>-history_{x,u,g}.pkl``
    Returns:
        history_x (numpy.ndarray): history of state, shape(iters, state_size)
        history_u (numpy.ndarray): history of input, shape(iters, input_size)
        history_g: history of goal, as it was pickled
    """
    # Honour result_dir (it used to be ignored in favour of a hard-coded
    # "./result/"); the default resolves to the same paths as before.
    base_dir = os.path.join(result_dir, controller_type)

    history_x = load_pickle(os.path.join(base_dir, env + "-history_x.pkl"))
    history_u = load_pickle(os.path.join(base_dir, env + "-history_u.pkl"))
    history_g = load_pickle(os.path.join(base_dir, env + "-history_g.pkl"))

    return history_x, history_u, history_g
|
||||||
|
|
||||||
|
def plot_multi_result(histories, histories_g=None, labels=None, ylabel="x",
                      save_dir="./result", name="state_history"):
    """Plot several histories against each other, two components per figure.

    Args:
        histories (numpy.ndarray): histories, shape(n_histories, iters, size)
        histories_g (numpy.ndarray): goal histories with the same layout as
            ``histories``; no goal line is drawn when None
        labels (list of str): one legend label per history; the legend is
            omitted when None
        ylabel (str): prefix of the y-axis labels
        save_dir (str): output directory; when None the figure is saved to
            ``name`` directly
        name (str): base name of the saved figures
    """
    (_, iters, size) = histories.shape

    # The defaults used to crash in zip()/labels[j]; substitute neutral
    # placeholders so both arguments are really optional.
    if histories_g is None:
        histories_g = [None] * len(histories)
    if labels is None:
        curve_labels = [""] * len(histories)
    else:
        curve_labels = labels

    steps = range(iters)

    def plot(axis, history, history_g=None, label=""):
        axis.plot(steps, history,
                  linewidth=3, label=label, alpha=0.7, linestyle="dashed")
        if history_g is not None:
            axis.plot(steps, history_g, c="b", linewidth=3)

    def plot_component(axis, index):
        # draw component `index` of every history on one axis
        for label, history, history_g in \
                zip(curve_labels, histories, histories_g):
            goal = None if history_g is None else history_g[:, index]
            plot(axis, history[:, index], history_g=goal, label=label)

    for i in range(0, size, 2):
        figure = plt.figure()
        axis1 = figure.add_subplot(211)
        axis2 = figure.add_subplot(212)

        axis1.set_ylabel(ylabel + "_{}".format(i))
        axis2.set_ylabel(ylabel + "_{}".format(i+1))
        axis2.set_xlabel("time steps")

        plot_component(axis1, i)
        # the lower axis stays empty when size is odd
        if i+1 < size:
            plot_component(axis2, i+1)

        # save
        if save_dir is not None:
            path = os.path.join(save_dir, name + "-{}".format(i))
        else:
            # every figure is written to the same path in this case
            path = name

        if labels is not None:
            axis1.legend(ncol=3, bbox_to_anchor=(0., 1.02, 1., 0.102), loc=3)
        figure.savefig(path, bbox_inches="tight", pad_inches=0.05)
        # release the figure so repeated calls do not accumulate open figures
        plt.close(figure)
|
||||||
|
|
13
README.md
13
README.md
|
@ -14,7 +14,8 @@ PythonLinearNonLinearControl is a library implementing the linear and nonlinear
|
||||||
|:----------|:---------------: |:----------------:|:----------------:|:----------------:|:----------------:|
|
|:----------|:---------------: |:----------------:|:----------------:|:----------------:|:----------------:|
|
||||||
| Linear Model Predictive Control (MPC) | ✓ | x | x | x | x |
|
| Linear Model Predictive Control (MPC) | ✓ | x | x | x | x |
|
||||||
| Cross Entropy Method (CEM) | ✓ | ✓ | x | x | x |
|
| Cross Entropy Method (CEM) | ✓ | ✓ | x | x | x |
|
||||||
| Model Preidictive Path Integral Control (MPPI) | ✓ | ✓ | x | x | x |
|
| Model Predictive Path Integral Control of Nagabandi, A. (MPPI) | ✓ | ✓ | x | x | x |
|
||||||
|
| Model Predictive Path Integral Control of Williams, G. (MPPIWilliams) | ✓ | ✓ | x | x | x |
|
||||||
| Random Shooting Method (Random) | ✓ | ✓ | x | x | x |
|
| Random Shooting Method (Random) | ✓ | ✓ | x | x | x |
|
||||||
| Iterative LQR (iLQR) | x | ✓ | x | ✓ | x |
|
| Iterative LQR (iLQR) | x | ✓ | x | ✓ | x |
|
||||||
| Differential Dynamic Programming (DDP) | x | ✓ | x | ✓ | ✓ |
|
| Differential Dynamic Programming (DDP) | x | ✓ | x | ✓ | ✓ |
|
||||||
|
@ -33,9 +34,12 @@ Following algorithms are implemented in PythonLinearNonlinearControl
|
||||||
- [Cross Entropy Method (CEM)](https://arxiv.org/abs/1805.12114)
|
- [Cross Entropy Method (CEM)](https://arxiv.org/abs/1805.12114)
|
||||||
- Ref: Chua, K., Calandra, R., McAllister, R., & Levine, S. (2018). Deep reinforcement learning in a handful of trials using probabilistic dynamics models. In Advances in Neural Information Processing Systems (pp. 4754-4765)
|
- Ref: Chua, K., Calandra, R., McAllister, R., & Levine, S. (2018). Deep reinforcement learning in a handful of trials using probabilistic dynamics models. In Advances in Neural Information Processing Systems (pp. 4754-4765)
|
||||||
- [script](PythonLinearNonlinearControl/controllers/cem.py)
|
- [script](PythonLinearNonlinearControl/controllers/cem.py)
|
||||||
- [Model Preidictive Path Integral Control (MPPI)](https://arxiv.org/abs/1909.11652)
|
- [Model Predictive Path Integral Control of Nagabandi, A. (MPPI)](https://arxiv.org/abs/1909.11652)
|
||||||
- Ref: Nagabandi, A., Konolige, K., Levine, S., & Kumar, V. (2019). Deep Dynamics Models for Learning Dexterous Manipulation. arXiv preprint arXiv:1909.11652.
|
- Ref: Nagabandi, A., Konolige, K., Levine, S., & Kumar, V. (2019). Deep Dynamics Models for Learning Dexterous Manipulation. arXiv preprint arXiv:1909.11652.
|
||||||
- [script](PythonLinearNonlinearControl/controllers/mppi.py)
|
- [script](PythonLinearNonlinearControl/controllers/mppi.py)
|
||||||
|
- [Model Predictive Path Integral Control of Williams, G. (MPPIWilliams)](https://ieeexplore.ieee.org/abstract/document/7989202)
|
||||||
|
- Ref: Williams, G., Wagener, N., Goldfain, B., Drews, P., Rehg, J. M., Boots, B., & Theodorou, E. A. (2017, May). Information theoretic MPC for model-based reinforcement learning. In 2017 IEEE International Conference on Robotics and Automation (ICRA) (pp. 1714-1721). IEEE.
|
||||||
|
- [script](PythonLinearNonlinearControl/controllers/mppi_williams.py)
|
||||||
- [Random Shooting Method (Random)](https://arxiv.org/abs/1805.12114)
|
- [Random Shooting Method (Random)](https://arxiv.org/abs/1805.12114)
|
||||||
- Ref: Chua, K., Calandra, R., McAllister, R., & Levine, S. (2018). Deep reinforcement learning in a handful of trials using probabilistic dynamics models. In Advances in Neural Information Processing Systems (pp. 4754-4765)
|
- Ref: Chua, K., Calandra, R., McAllister, R., & Levine, S. (2018). Deep reinforcement learning in a handful of trials using probabilistic dynamics models. In Advances in Neural Information Processing Systems (pp. 4754-4765)
|
||||||
- [script](PythonLinearNonlinearControl/controllers/random.py)
|
- [script](PythonLinearNonlinearControl/controllers/random.py)
|
||||||
|
@ -62,10 +66,13 @@ Following algorithms are implemented in PythonLinearNonlinearControl
|
||||||
| First Order Lag System | ✓ | x | 4 | 2 |
|
| First Order Lag System | ✓ | x | 4 | 2 |
|
||||||
| Two wheeled System (Constant Goal) | x | ✓ | 3 | 2 |
|
| Two wheeled System (Constant Goal) | x | ✓ | 3 | 2 |
|
||||||
| Two wheeled System (Moving Goal) (Coming soon) | x | ✓ | 3 | 2 |
|
| Two wheeled System (Moving Goal) (Coming soon) | x | ✓ | 3 | 2 |
|
||||||
|
| Cartpole (Swing up) | x | ✓ | 4 | 1 |
|
||||||
|
|
||||||
All environments are continuous.
|
All states and inputs of environments are continuous.
|
||||||
**It should be noted that the algorithms for linear models can be applied to nonlinear environments if you have linearized the model of the nonlinear environment.**
|
**It should be noted that the algorithms for linear models can be applied to nonlinear environments if you have linearized the model of the nonlinear environment.**
|
||||||
|
|
||||||
|
You can learn more about our environments in [Environments.md](Environments.md)
|
||||||
|
|
||||||
# Usage
|
# Usage
|
||||||
|
|
||||||
## To install this package
|
## To install this package
|
||||||
|
|
Binary file not shown.
After Width: | Height: | Size: 60 KiB |
Binary file not shown.
After Width: | Height: | Size: 23 KiB |
Binary file not shown.
After Width: | Height: | Size: 37 KiB |
Binary file not shown.
After Width: | Height: | Size: 22 KiB |
Binary file not shown.
After Width: | Height: | Size: 27 KiB |
|
@ -0,0 +1,55 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import pickle
|
||||||
|
import numpy as np
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
from PythonLinearNonlinearControl.plotters.plot_func import load_plot_data, \
|
||||||
|
plot_multi_result
|
||||||
|
|
||||||
|
def run(args):
    """Load the saved histories of several controllers and plot them together.

    Args:
        args: parsed arguments with ``env`` and ``result_dir`` attributes
    """
    controllers = ["iLQR", "DDP", "CEM", "MPPI"]

    # load data: one (history_x, history_u, history_g) triple per controller
    loaded = [load_plot_data(args.env, controller,
                             result_dir=args.result_dir)
              for controller in controllers]

    # stack along a new leading "controller" axis
    history_xs, history_us, history_gs = \
        [np.stack(series, axis=0) for series in zip(*loaded)]

    plot_multi_result(history_xs, histories_g=history_gs, labels=controllers,
                      ylabel="x")

    plot_multi_result(history_us, histories_g=np.zeros_like(history_us),
                      labels=controllers, ylabel="u", name="input_history")
|
||||||
|
|
||||||
|
def main():
    """Parse the command line options and run the comparison plot."""
    parser = argparse.ArgumentParser()
    options = (("--env", "FirstOrderLag"),
               ("--result_dir", "./result"))
    for flag, default in options:
        parser.add_argument(flag, type=str, default=default)

    run(parser.parse_args())
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
|
@ -7,7 +7,8 @@ from PythonLinearNonlinearControl.configs.make_configs import make_config
|
||||||
from PythonLinearNonlinearControl.models.make_models import make_model
|
from PythonLinearNonlinearControl.models.make_models import make_model
|
||||||
from PythonLinearNonlinearControl.envs.make_envs import make_env
|
from PythonLinearNonlinearControl.envs.make_envs import make_env
|
||||||
from PythonLinearNonlinearControl.runners.make_runners import make_runner
|
from PythonLinearNonlinearControl.runners.make_runners import make_runner
|
||||||
from PythonLinearNonlinearControl.plotters.plot_func import plot_results
|
from PythonLinearNonlinearControl.plotters.plot_func import plot_results, \
|
||||||
|
save_plot_data
|
||||||
|
|
||||||
def run(args):
|
def run(args):
|
||||||
# logger
|
# logger
|
||||||
|
@ -36,11 +37,12 @@ def run(args):
|
||||||
|
|
||||||
# plot results
|
# plot results
|
||||||
plot_results(args, history_x, history_u, history_g=history_g)
|
plot_results(args, history_x, history_u, history_g=history_g)
|
||||||
|
save_plot_data(args, history_x, history_u, history_g=history_g)
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
|
|
||||||
parser.add_argument("--controller_type", type=str, default="DDP")
|
parser.add_argument("--controller_type", type=str, default="CEM")
|
||||||
parser.add_argument("--planner_type", type=str, default="const")
|
parser.add_argument("--planner_type", type=str, default="const")
|
||||||
parser.add_argument("--env", type=str, default="TwoWheeledConst")
|
parser.add_argument("--env", type=str, default="TwoWheeledConst")
|
||||||
parser.add_argument("--result_dir", type=str, default="./result")
|
parser.add_argument("--result_dir", type=str, default="./result")
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
import pytest
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from PythonLinearNonlinearControl.configs.cartpole \
|
||||||
|
import CartPoleConfigModule
|
||||||
|
|
||||||
|
class TestCalcCost():
    """Shape checks for the cost functions of the cart-pole config."""

    def test_calc_costs(self):
        # make config
        config = CartPoleConfigModule()
        # set
        pop_size, pred_len = 2, 5
        state_size, input_size = 4, 1
        pred_xs = np.ones((pop_size, pred_len, state_size))
        g_xs = np.ones((pop_size, pred_len, state_size)) * 0.5
        input_samples = np.ones((pop_size, pred_len, input_size)) * 0.5

        input_costs = config.input_cost_fn(input_samples)
        assert input_costs.shape == (pop_size, pred_len, input_size)

        state_costs = config.state_cost_fn(pred_xs, g_xs)
        assert state_costs.shape == (pop_size, pred_len, 1)

        terminal_costs = config.terminal_state_cost_fn(pred_xs[:, -1, :],
                                                       g_xs[:, -1, :])
        assert terminal_costs.shape == (pop_size, 1)
|
|
@ -0,0 +1,34 @@
|
||||||
|
import pytest
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from PythonLinearNonlinearControl.configs.two_wheeled \
|
||||||
|
import TwoWheeledConfigModule
|
||||||
|
|
||||||
|
class TestCalcCost():
    """Value checks for the cost functions of the two-wheeled config."""

    def test_calc_costs(self):
        # make config
        config = TwoWheeledConfigModule()
        # set
        pop_size, pred_len = 2, 5
        state_size, input_size = 3, 2
        pred_xs = np.ones((pop_size, pred_len, state_size))
        g_xs = np.ones((pop_size, pred_len, state_size)) * 0.5
        input_samples = np.ones((pop_size, pred_len, input_size)) * 0.5

        # every input and every state error equals 0.5, so each cost entry
        # is 0.5**2 weighted by the matching diagonal element
        input_costs = config.input_cost_fn(input_samples)
        input_diff = np.ones((pop_size, pred_len, input_size))*0.5
        assert input_costs == pytest.approx(input_diff**2 * np.diag(config.R))

        state_costs = config.state_cost_fn(pred_xs, g_xs)
        state_diff = np.ones((pop_size, pred_len, state_size))*0.5
        assert state_costs == pytest.approx(state_diff**2 * np.diag(config.Q))

        terminal_costs = config.terminal_state_cost_fn(pred_xs[:, -1, :],
                                                       g_xs[:, -1, :])
        terminal_diff = np.ones((pop_size, state_size))*0.5
        assert terminal_costs == pytest.approx(
            terminal_diff**2 * np.diag(config.Sf))
|
|
@ -0,0 +1,73 @@
|
||||||
|
import pytest
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from PythonLinearNonlinearControl.envs.cartpole import CartPoleEnv
|
||||||
|
|
||||||
|
class TestCartPoleEnv():
    """Checks ``CartPoleEnv.step`` against a hand-written Euler step."""

    @staticmethod
    def _expected_next_x(env, curr_x, u):
        """Euler step for theta = pi/6 and v_theta = 1 under input ``u``."""
        cfg = env.config
        sin_t = np.sin(np.pi / 6.)
        cos_t = np.cos(np.pi / 6.)

        d_x0 = curr_x[1]
        d_x1 = (u + cfg["mp"] * sin_t
                * (cfg["l"] * (1.**2) + cfg["g"] * cos_t)) \
            / (cfg["mc"] + cfg["mp"] * sin_t**2)
        d_x2 = curr_x[3]
        d_x3 = (-u * cos_t
                - cfg["mp"] * cfg["l"] * (1.**2) * cos_t * sin_t
                - (cfg["mp"] + cfg["mc"]) * cfg["g"] * sin_t) \
            / (cfg["l"] * (cfg["mc"] + cfg["mp"] * sin_t**2))

        return np.array([d_x0, d_x1, d_x2, d_x3]) * cfg["dt"] + curr_x

    def test_step(self):
        env = CartPoleEnv()

        curr_x = np.ones(4)
        curr_x[2] = np.pi / 6.
        env.reset(init_x=curr_x)

        next_x, _, _, _ = env.step(np.ones(1))

        expected = self._expected_next_x(env, curr_x, 1.)
        assert next_x == pytest.approx(expected, abs=1e-5)

    def test_bound_step(self):
        env = CartPoleEnv()

        curr_x = np.ones(4)
        curr_x[2] = np.pi / 6.
        env.reset(init_x=curr_x)

        # far beyond the input limit: the expected state is computed with
        # the upper bound instead of the requested value
        next_x, _, _, _ = env.step(np.ones(1) * 1e3)

        bounded_u = env.config["input_upper_bound"][0]
        expected = self._expected_next_x(env, curr_x, bounded_u)
        assert next_x == pytest.approx(expected, abs=1e-5)
|
|
@ -0,0 +1,57 @@
|
||||||
|
import pytest
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from PythonLinearNonlinearControl.models.cartpole import CartPoleModel
|
||||||
|
from PythonLinearNonlinearControl.configs.cartpole \
|
||||||
|
import CartPoleConfigModule
|
||||||
|
|
||||||
|
class TestCartPoleModel():
    """Checks ``CartPoleModel`` trajectory prediction."""

    def test_step(self):
        config = CartPoleConfigModule()
        cartpole_model = CartPoleModel(config)

        curr_x = np.ones(4)
        curr_x[2] = np.pi / 6.
        us = np.ones((1, 1))

        next_x = cartpole_model.predict_traj(curr_x, us)

        # hand-written Euler step for theta = pi/6, v_theta = 1, u = 1
        sin_t = np.sin(np.pi / 6.)
        cos_t = np.cos(np.pi / 6.)
        d_x0 = curr_x[1]
        d_x1 = (1. + config.MP * sin_t
                * (config.L * (1.**2) + config.G * cos_t)) \
            / (config.MC + config.MP * sin_t**2)
        d_x2 = curr_x[3]
        d_x3 = (-1. * cos_t
                - config.MP * config.L * (1.**2) * cos_t * sin_t
                - (config.MP + config.MC) * config.G * sin_t) \
            / (config.L * (config.MC + config.MP * sin_t**2))

        stepped = np.array([d_x0, d_x1, d_x2, d_x3]) * config.DT + curr_x
        # the asserted trajectory is the initial state followed by the
        # stepped state
        expected = np.stack((curr_x, stepped), axis=0)

        assert next_x == pytest.approx(expected, abs=1e-5)

    def test_predict_traj(self):
        config = CartPoleConfigModule()
        cartpole_model = CartPoleModel(config)

        curr_x = np.ones(config.STATE_SIZE)
        curr_x[-1] = np.pi / 6.
        u = np.ones((1, config.INPUT_SIZE))

        pred_xs = cartpole_model.predict_traj(curr_x, u)

        # the batched call must reproduce the single-trajectory result
        batched_u = np.tile(u, (2, 1, 1))
        pred_xs_alltogether = cartpole_model.predict_traj(curr_x,
                                                          batched_u)[0]

        assert pred_xs_alltogether == pytest.approx(pred_xs)
|
|
@ -0,0 +1,43 @@
|
||||||
|
import pytest
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from PythonLinearNonlinearControl.models.model \
|
||||||
|
import LinearModel
|
||||||
|
from PythonLinearNonlinearControl.models.first_order_lag \
|
||||||
|
import FirstOrderLagModel
|
||||||
|
from PythonLinearNonlinearControl.configs.first_order_lag \
|
||||||
|
import FirstOrderLagConfigModule
|
||||||
|
|
||||||
|
from unittest.mock import patch
|
||||||
|
from unittest.mock import Mock
|
||||||
|
|
||||||
|
class TestFirstOrderLagModel():
    """Tests for FirstOrderLagModel.predict_traj."""

    def test_step(self):
        """predict_traj reaches LinearModel.predict_traj with the same args."""
        config = FirstOrderLagConfigModule()
        model = FirstOrderLagModel(config)

        state = np.ones(config.STATE_SIZE)
        inputs = np.ones((1, config.INPUT_SIZE))

        # Replace the LinearModel implementation so only the call is observed.
        with patch.object(LinearModel, "predict_traj") as spy:
            model.predict_traj(state, inputs)

        spy.assert_called_once_with(state, inputs)

    def test_predict_traj(self):
        """First entry of a tiled-input prediction matches the plain one."""
        config = FirstOrderLagConfigModule()
        model = FirstOrderLagModel(config)

        state = np.ones(config.STATE_SIZE)
        state[-1] = np.pi / 6.
        inputs = np.ones((1, config.INPUT_SIZE))

        pred_xs = model.predict_traj(state, inputs)

        # A (1, 1, 1) tiling only adds a leading axis of length one.
        tiled_inputs = np.tile(inputs, (1, 1, 1))
        pred_xs_alltogether = model.predict_traj(state, tiled_inputs)[0]

        assert pred_xs_alltogether == pytest.approx(pred_xs)
|
Loading…
Reference in New Issue