ADD: added iLQR and DDP

Shunichi09 2020-04-05 15:40:48 +09:00
parent abb4d75fc2
commit d574d82c79
15 changed files with 1127 additions and 41 deletions

.gitignore

@@ -109,7 +109,6 @@ celerybeat.pid
 # Environments
 .env
 .venv
-env/
 venv/
 ENV/
 env.bak/


@@ -43,22 +43,19 @@ class FirstOrderLagConfigModule():
 "kappa": 0.9,
 "noise_sigma": 0.5,
 },
-"iLQR":{
-},
-"cgmres-NMPC":{
-},
-"newton-NMPC":{
-},
+"MPC":{
+}
 }
 @staticmethod
 def input_cost_fn(u):
 """ input cost functions
 Args:
-u (numpy.ndarray): input, shape(input_size, )
-or shape(pop_size, input_size)
+u (numpy.ndarray): input, shape(pred_len, input_size)
+or shape(pop_size, pred_len, input_size)
 Returns:
-cost (numpy.ndarray): cost of input, none or shape(pop_size, )
+cost (numpy.ndarray): cost of input, shape(pred_len, input_size) or
+shape(pop_size, pred_len, input_size)
 """
 return (u**2) * np.diag(FirstOrderLagConfigModule.R)
@@ -67,11 +64,12 @@ class FirstOrderLagConfigModule():
 """ state cost function
 Args:
 x (numpy.ndarray): state, shape(pred_len, state_size)
 or shape(pop_size, pred_len, state_size)
-g_x (numpy.ndarray): goal state, shape(state_size, )
-or shape(pop_size, state_size)
+g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
+or shape(pop_size, pred_len, state_size)
 Returns:
-cost (numpy.ndarray): cost of state, none or shape(pop_size, )
+cost (numpy.ndarray): cost of state, shape(pred_len, state_size) or
+shape(pop_size, pred_len, state_size)
 """
 return ((x - g_x)**2) * np.diag(FirstOrderLagConfigModule.Q)
@@ -84,7 +82,8 @@ class FirstOrderLagConfigModule():
 terminal_g_x (numpy.ndarray): terminal goal state,
 shape(state_size, ) or shape(pop_size, state_size)
 Returns:
-cost (numpy.ndarray): cost of state, none or shape(pop_size, )
+cost (numpy.ndarray): cost of state, shape(pred_len, ) or
+shape(pop_size, pred_len)
 """
 return ((terminal_x - terminal_g_x)**2) \
 * np.diag(FirstOrderLagConfigModule.Sf)
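For reference, the three cost functions above return per-dimension quadratic terms; assuming the cost calculator sums them over dimensions and over the horizon, the costs they encode are (Q, R, Sf are the diagonal weights defined at the top of this config):

```latex
c(x_t, u_t) = \sum_i Q_{ii}\,(x_{t,i} - g_{t,i})^2 + \sum_j R_{jj}\,u_{t,j}^2,
\qquad
c_f(x_T) = \sum_i S_{f,ii}\,(x_{T,i} - g_{T,i})^2
```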


@@ -5,13 +5,13 @@ class TwoWheeledConfigModule():
 ENV_NAME = "TwoWheeled-v0"
 TYPE = "Nonlinear"
 TASK_HORIZON = 1000
-PRED_LEN = 10
+PRED_LEN = 20
 STATE_SIZE = 3
 INPUT_SIZE = 2
 DT = 0.01
 # cost parameters
-R = np.eye(INPUT_SIZE)
-Q = np.eye(STATE_SIZE)
+R = np.eye(INPUT_SIZE) * 0.1
+Q = np.eye(STATE_SIZE) * 0.5
 Sf = np.eye(STATE_SIZE)
 # bounds
 INPUT_LOWER_BOUND = np.array([-1.5, 3.14])
@@ -40,34 +40,50 @@ class TwoWheeledConfigModule():
 "noise_sigma": 0.5,
 },
 "iLQR":{
+"max_iter": 500,
+"mu": 1.,
+"mu_min": 1e-6,
+"mu_max": 1e10,
+"init_delta": 2.,
+"threshold": 1e-6,
+},
+"DDP":{
+"max_iter": 500,
+"mu": 1.,
+"mu_min": 1e-6,
+"mu_max": 1e10,
+"init_delta": 2.,
+"threshold": 1e-6,
 },
 "NMPC-CGMRES":{
 },
 "NMPC-Newton":{
 },
 }
 @staticmethod
 def input_cost_fn(u):
 """ input cost functions
 Args:
-u (numpy.ndarray): input, shape(input_size, )
-or shape(pop_size, input_size)
+u (numpy.ndarray): input, shape(pred_len, input_size)
+or shape(pop_size, pred_len, input_size)
 Returns:
-cost (numpy.ndarray): cost of input, none or shape(pop_size, )
+cost (numpy.ndarray): cost of input, shape(pred_len, input_size) or
+shape(pop_size, pred_len, input_size)
 """
-return (u**2) * np.diag(TwoWheeledConfigModule.R) * 0.1
+return (u**2) * np.diag(TwoWheeledConfigModule.R)
 @staticmethod
 def state_cost_fn(x, g_x):
 """ state cost function
 Args:
 x (numpy.ndarray): state, shape(pred_len, state_size)
 or shape(pop_size, pred_len, state_size)
-g_x (numpy.ndarray): goal state, shape(state_size, )
-or shape(pop_size, state_size)
+g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
+or shape(pop_size, pred_len, state_size)
 Returns:
-cost (numpy.ndarray): cost of state, none or shape(pop_size, )
+cost (numpy.ndarray): cost of state, shape(pred_len, state_size) or
+shape(pop_size, pred_len, state_size)
 """
 return ((x - g_x)**2) * np.diag(TwoWheeledConfigModule.Q)
@@ -80,7 +96,93 @@ class TwoWheeledConfigModule():
 terminal_g_x (numpy.ndarray): terminal goal state,
 shape(state_size, ) or shape(pop_size, state_size)
 Returns:
-cost (numpy.ndarray): cost of state, none or shape(pop_size, )
+cost (numpy.ndarray): cost of state, shape(pred_len, ) or
+shape(pop_size, pred_len)
 """
 return ((terminal_x - terminal_g_x)**2) \
 * np.diag(TwoWheeledConfigModule.Sf)
@staticmethod
def gradient_cost_fn_with_state(x, g_x, terminal=False):
""" gradient of costs with respect to the state
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
Returns:
l_x (numpy.ndarray): gradient of cost, shape(pred_len, state_size)
or shape(1, state_size)
"""
if not terminal:
return 2. * (x - g_x) * np.diag(TwoWheeledConfigModule.Q)
return (2. * (x - g_x) \
* np.diag(TwoWheeledConfigModule.Sf))[np.newaxis, :]
@staticmethod
def gradient_cost_fn_with_input(x, u):
""" gradient of costs with respect to the input
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
u (numpy.ndarray): input, shape(pred_len, input_size)
Returns:
l_u (numpy.ndarray): gradient of cost, shape(pred_len, input_size)
"""
return 2. * u * np.diag(TwoWheeledConfigModule.R)
@staticmethod
def hessian_cost_fn_with_state(x, g_x, terminal=False):
""" hessian costs with respect to the state
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
Returns:
l_xx (numpy.ndarray): hessian of cost,
shape(pred_len, state_size, state_size) or
shape(1, state_size, state_size)
"""
if not terminal:
(pred_len, _) = x.shape
# hessian of the quadratic stage cost is constant: 2 * Q
return np.tile(2.*TwoWheeledConfigModule.Q, (pred_len, 1, 1))
# terminal hessian: 2 * Sf, with a leading batch axis of 1
return np.tile(2.*TwoWheeledConfigModule.Sf, (1, 1, 1))
@staticmethod
def hessian_cost_fn_with_input(x, u):
""" hessian costs with respect to the input
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
u (numpy.ndarray): input, shape(pred_len, input_size)
Returns:
l_uu (numpy.ndarray): hessian of cost,
shape(pred_len, input_size, input_size)
"""
(pred_len, _) = u.shape
return np.tile(2.*TwoWheeledConfigModule.R, (pred_len, 1, 1))
@staticmethod
def hessian_cost_fn_with_input_state(x, u):
""" hessian costs with respect to the state and input
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
u (numpy.ndarray): input, shape(pred_len, input_size)
Returns:
l_ux (numpy.ndarray): hessian of cost,
shape(pred_len, input_size, state_size)
"""
(_, state_size) = x.shape
(pred_len, input_size) = u.shape
return np.zeros((pred_len, input_size, state_size))
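For the quadratic costs above, the gradient and Hessian helpers reduce to simple closed forms; a sketch consistent with the cost definitions (Q, R, Sf diagonal, with Sf replacing Q at the terminal step):

```latex
l_x = 2\,Q\,(x - g_x), \quad l_{xx} = 2\,Q, \quad
l_u = 2\,R\,u, \quad l_{uu} = 2\,R, \quad l_{ux} = 0
```

Because the costs are quadratic, the Hessians are constant and do not depend on the goal trajectory.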


@@ -24,7 +24,8 @@ class Controller():
 Returns:
 opt_input (numpy.ndarray): optimal input, shape(input_size, )
 """
-raise NotImplementedError("Implement gradient of hamitonian with respect to the state")
+raise NotImplementedError("Implement the algorithm to \
+get optimal input")
 def calc_cost(self, curr_x, samples, g_xs):
 """ calculate the cost of input samples


@@ -0,0 +1,403 @@
from logging import getLogger
import numpy as np
import scipy.stats as stats
from .controller import Controller
from ..envs.cost import calc_cost
logger = getLogger(__name__)
class DDP(Controller):
""" Differential Dynamic Programming
Ref:
Tassa, Y., Erez, T., & Todorov, E. (2012). Synthesis and stabilization of complex
behaviors through online trajectory optimization. In 2012 IEEE/RSJ International
Conference on Intelligent Robots and Systems (pp. 4906-4913). and Study Wolf,
https://github.com/studywolf/control
"""
def __init__(self, config, model):
"""
"""
super(DDP, self).__init__(config, model)
if config.TYPE != "Nonlinear":
raise ValueError("{} cannot be applied to \
this controller".format(model))
# model
self.model = model
# get cost func
self.state_cost_fn = config.state_cost_fn
self.terminal_state_cost_fn = config.terminal_state_cost_fn
self.input_cost_fn = config.input_cost_fn
self.gradient_cost_fn_with_state = config.gradient_cost_fn_with_state
self.gradient_cost_fn_with_input = config.gradient_cost_fn_with_input
self.hessian_cost_fn_with_state = config.hessian_cost_fn_with_state
self.hessian_cost_fn_with_input = config.hessian_cost_fn_with_input
self.hessian_cost_fn_with_input_state = \
config.hessian_cost_fn_with_input_state
# controller parameters
self.max_iter = config.opt_config["DDP"]["max_iter"]
self.mu = config.opt_config["DDP"]["mu"]
self.mu_min = config.opt_config["DDP"]["mu_min"]
self.mu_max = config.opt_config["DDP"]["mu_max"]
self.init_delta = config.opt_config["DDP"]["init_delta"]
self.delta = self.init_delta
self.threshold = config.opt_config["DDP"]["threshold"]
# general parameters
self.pred_len = config.PRED_LEN
self.input_size = config.INPUT_SIZE
self.dt = config.DT
# cost parameters
self.Q = config.Q
self.R = config.R
self.Sf = config.Sf
# initialize
self.prev_sol = np.zeros((self.pred_len, self.input_size))
def clear_sol(self):
""" clear prev sol
"""
logger.debug("Clear Sol")
self.prev_sol = np.zeros((self.pred_len, self.input_size))
def obtain_sol(self, curr_x, g_xs):
""" calculate the optimal inputs
Args:
curr_x (numpy.ndarray): current state, shape(state_size, )
g_xs (numpy.ndarray): goal trajectory, shape(plan_len, state_size)
Returns:
opt_input (numpy.ndarray): optimal input, shape(input_size, )
"""
# initialize
opt_count = 0
sol = self.prev_sol.copy()
converged_sol = False
update_sol = True
# line search param
alphas = 1.1**(-np.arange(10)**2)
while opt_count < self.max_iter:
accepted_sol = False
# forward
if update_sol == True:
pred_xs, cost, f_x, f_u, f_xx, f_ux, f_uu,\
l_x, l_xx, l_u, l_uu, l_ux = \
self.forward(curr_x, g_xs, sol)
update_sol = False
try:
# backward
k, K = self.backward(f_x, f_u, f_xx, f_ux, f_uu, \
l_x, l_xx, l_u, l_uu, l_ux)
# line search
for alpha in alphas:
new_pred_xs, new_sol = \
self.calc_input(k, K, pred_xs, sol, alpha)
new_cost = calc_cost(new_pred_xs[np.newaxis, :, :],
new_sol[np.newaxis, :, :],
g_xs[np.newaxis, :, :],
self.state_cost_fn,
self.input_cost_fn,
self.terminal_state_cost_fn)
if new_cost < cost:
if np.abs((cost - new_cost) / cost) < self.threshold:
converged_sol = True
cost = new_cost
pred_xs = new_pred_xs
sol = new_sol
update_sol = True
# decrease regularization term
self.delta = min(1.0, self.delta) / self.init_delta
self.mu *= self.delta
if self.mu <= self.mu_min:
self.mu = 0.0
# accept the solution
accepted_sol = True
break
except np.linalg.LinAlgError as e:
logger.debug("Non ans : {}".format(e))
if not accepted_sol:
# increase regularization term.
self.delta = max(1.0, self.delta) * self.init_delta
self.mu = max(self.mu_min, self.mu * self.delta)
logger.debug("Update regularization term to {}"\
.format(self.mu))
if self.mu >= self.mu_max:
logger.debug("Reach Max regularization term")
break
if converged_sol:
logger.debug("Get converged sol")
break
opt_count += 1
# update prev sol
self.prev_sol[:-1] = sol[1:]
self.prev_sol[-1] = sol[-1]  # reuse the last input for the new terminal step
return sol[0]
def calc_input(self, k, K, pred_xs, sol, alpha):
""" calc input trajectory by using k and K
Args:
k (numpy.ndarray): gain, shape(pred_len, input_size)
K (numpy.ndarray): gain, shape(pred_len, input_size, state_size)
pred_xs (numpy.ndarray): predicted state,
shape(pred_len+1, state_size)
sol (numpy.ndarray): input trajectory, previous solutions
shape(pred_len, input_size)
alpha (float): param of line search
Returns:
new_pred_xs (numpy.ndarray): update state trajectory,
shape(pred_len+1, state_size)
new_sol (numpy.ndarray): update input trajectory,
shape(pred_len, input_size)
"""
# get size
(pred_len, input_size, state_size) = K.shape
# initialize
new_pred_xs = np.zeros((pred_len+1, state_size))
new_pred_xs[0] = pred_xs[0].copy() # init state is same
new_sol = np.zeros((pred_len, input_size))
for t in range(pred_len):
new_sol[t] = sol[t] \
+ alpha * k[t] \
+ np.dot(K[t], (new_pred_xs[t] - pred_xs[t]))
new_pred_xs[t+1] = self.model.predict_next_state(new_pred_xs[t],
new_sol[t])
return new_pred_xs, new_sol
def forward(self, curr_x, g_xs, sol):
""" forward step of iLQR
Args:
curr_x (numpy.ndarray): current state, shape(state_size, )
g_xs (numpy.ndarray): goal trajectory, shape(plan_len, state_size)
sol (numpy.ndarray): solutions, shape(plan_len, input_size)
Returns:
f_x (numpy.ndarray): gradient of model with respect to state,
shape(pred_len, state_size, state_size)
f_u (numpy.ndarray): gradient of model with respect to input,
shape(pred_len, state_size, input_size)
f_xx (numpy.ndarray): hessian of model with respect to state,
shape(pred_len, state_size, state_size, state_size)
f_ux (numpy.ndarray): hessian of model with respect to input and state,
shape(pred_len, state_size, input_size, state_size)
f_uu (numpy.ndarray): hessian of model with respect to input,
shape(pred_len, state_size, input_size, input_size)
l_x (numpy.ndarray): gradient of cost with respect to state,
shape(pred_len+1, state_size)
l_u (numpy.ndarray): gradient of cost with respect to input,
shape(pred_len, input_size)
l_xx (numpy.ndarray): hessian of cost with respect to state,
shape(pred_len+1, state_size, state_size)
l_uu (numpy.ndarray): hessian of cost with respect to input,
shape(pred_len, input_size, input_size)
l_ux (numpy.ndarray): hessian of cost with respect
to state and input, shape(pred_len, input_size, state_size)
"""
# simulate forward using the current control trajectory
pred_xs = self.model.predict_traj(curr_x, sol)
# check costs
cost = self.calc_cost(curr_x,
sol[np.newaxis, :, :],
g_xs)
# calc gradient in batch
f_x = self.model.calc_f_x(pred_xs[:-1], sol, self.dt)
f_u = self.model.calc_f_u(pred_xs[:-1], sol, self.dt)
# calc hessian in batch
f_xx = self.model.calc_f_xx(pred_xs[:-1], sol, self.dt)
f_ux = self.model.calc_f_ux(pred_xs[:-1], sol, self.dt)
f_uu = self.model.calc_f_uu(pred_xs[:-1], sol, self.dt)
# gradient of costs
l_x, l_xx, l_u, l_uu, l_ux = \
self._calc_gradient_hessian_cost(pred_xs, g_xs, sol)
return pred_xs, cost, f_x, f_u, f_xx, f_ux, f_uu, \
l_x, l_xx, l_u, l_uu, l_ux
def _calc_gradient_hessian_cost(self, pred_xs, g_x, sol):
""" calculate gradient and hessian of model and cost fn
Args:
pred_xs (numpy.ndarray): predict traj,
shape(pred_len+1, state_size)
sol (numpy.ndarray): input traj,
shape(pred_len, input_size)
Returns
l_x (numpy.ndarray): gradient of cost,
shape(pred_len+1, state_size)
l_u (numpy.ndarray): gradient of cost,
shape(pred_len, input_size)
l_xx (numpy.ndarray): hessian of cost,
shape(pred_len+1, state_size, state_size)
l_uu (numpy.ndarray): hessian of cost,
shape(pred_len, input_size, input_size)
l_ux (numpy.ndarray): hessian of cost,
shape(pred_len, input_size, state_size)
"""
# l_x.shape = (pred_len+1, state_size)
l_x = self.gradient_cost_fn_with_state(pred_xs[:-1],
g_x[:-1], terminal=False)
terminal_l_x = \
self.gradient_cost_fn_with_state(pred_xs[-1],
g_x[-1], terminal=True)
l_x = np.concatenate((l_x, terminal_l_x), axis=0)
# l_u.shape = (pred_len, input_size)
l_u = self.gradient_cost_fn_with_input(pred_xs[:-1], sol)
# l_xx.shape = (pred_len+1, state_size, state_size)
l_xx = self.hessian_cost_fn_with_state(pred_xs[:-1],
g_x[:-1], terminal=False)
terminal_l_xx = \
self.hessian_cost_fn_with_state(pred_xs[-1],
g_x[-1], terminal=True)
l_xx = np.concatenate((l_xx, terminal_l_xx), axis=0)
# l_uu.shape = (pred_len, input_size, input_size)
l_uu = self.hessian_cost_fn_with_input(pred_xs[:-1], sol)
# l_ux.shape = (pred_len, input_size, state_size)
l_ux = self.hessian_cost_fn_with_input_state(pred_xs[:-1], sol)
return l_x, l_xx, l_u, l_uu, l_ux
def backward(self, f_x, f_u, f_xx, f_ux, f_uu, l_x, l_xx, l_u, l_uu, l_ux):
""" backward step of iLQR
Args:
f_x (numpy.ndarray): gradient of model with respect to state,
shape(pred_len, state_size, state_size)
f_u (numpy.ndarray): gradient of model with respect to input,
shape(pred_len, state_size, input_size)
f_xx (numpy.ndarray): hessian of model with respect to state,
shape(pred_len, state_size, state_size, state_size)
f_ux (numpy.ndarray): hessian of model with respect to input and state,
shape(pred_len, state_size, input_size, state_size)
f_uu (numpy.ndarray): hessian of model with respect to input,
shape(pred_len, state_size, input_size, input_size)
l_x (numpy.ndarray): gradient of cost with respect to state,
shape(pred_len+1, state_size)
l_u (numpy.ndarray): gradient of cost with respect to input,
shape(pred_len, input_size)
l_xx (numpy.ndarray): hessian of cost with respect to state,
shape(pred_len+1, state_size, state_size)
l_uu (numpy.ndarray): hessian of cost with respect to input,
shape(pred_len, input_size, input_size)
l_ux (numpy.ndarray): hessian of cost with respect
to state and input, shape(pred_len, input_size, state_size)
Returns:
k (numpy.ndarray): gain, shape(pred_len, input_size)
K (numpy.ndarray): gain, shape(pred_len, input_size, state_size)
"""
# get size
(_, state_size, _) = f_x.shape
# initialize
V_x = l_x[-1]
V_xx = l_xx[-1]
k = np.zeros((self.pred_len, self.input_size))
K = np.zeros((self.pred_len, self.input_size, state_size))
for t in range(self.pred_len-1, -1, -1):
# get Q val
Q_x, Q_u, Q_xx, Q_ux, Q_uu = self._Q(f_x[t], f_u[t],
f_xx[t], f_ux[t], f_uu[t],
l_x[t],
l_u[t], l_xx[t], l_ux[t],
l_uu[t], V_x, V_xx)
# calc gain
k[t] = - np.linalg.solve(Q_uu, Q_u)
K[t] = - np.linalg.solve(Q_uu, Q_ux)
# update V_x val
V_x = Q_x + np.dot(np.dot(K[t].T, Q_uu), k[t])
V_x += np.dot(K[t].T, Q_u) + np.dot(Q_ux.T, k[t])
# update V_xx val
V_xx = Q_xx + np.dot(np.dot(K[t].T, Q_uu), K[t])
V_xx += np.dot(K[t].T, Q_ux) + np.dot(Q_ux.T, K[t])
V_xx = 0.5 * (V_xx + V_xx.T) # to maintain symmetry.
return k, K
def _Q(self, f_x, f_u, f_xx, f_ux, f_uu,
l_x, l_u, l_xx, l_ux, l_uu, V_x, V_xx):
"""Computes second order expansion.
Args:
f_x (numpy.ndarray): gradient of model with respect to state,
shape(state_size, state_size)
f_u (numpy.ndarray): gradient of model with respect to input,
shape(state_size, input_size)
f_xx (numpy.ndarray): hessian of model with respect to state,
shape(state_size, state_size, state_size)
f_ux (numpy.ndarray): hessian of model with respect to input and state,
shape(state_size, input_size, state_size)
f_uu (numpy.ndarray): hessian of model with respect to input,
shape(state_size, input_size, input_size)
l_x (numpy.ndarray): gradient of cost with respect to state,
shape(state_size, )
l_u (numpy.ndarray): gradient of cost with respect to input,
shape(input_size, )
l_xx (numpy.ndarray): hessian of cost with respect to state,
shape(state_size, state_size)
l_uu (numpy.ndarray): hessian of cost with respect to input,
shape(input_size, input_size)
l_ux (numpy.ndarray): hessian of cost with respect
to state and input, shape(input_size, state_size)
V_x (numpy.ndarray): gradient of Value function,
shape(state_size, )
V_xx (numpy.ndarray): hessian of Value function,
shape(state_size, state_size)
Returns:
Q_x (numpy.ndarray): gradient of Q function, shape(state_size, )
Q_u (numpy.ndarray): gradient of Q function, shape(input_size, )
Q_xx (numpy.ndarray): hessian of Q function,
shape(state_size, state_size)
Q_ux (numpy.ndarray): hessian of Q function,
shape(input_size, state_size)
Q_uu (numpy.ndarray): hessian of Q function,
shape(input_size, input_size)
"""
# get size
state_size = len(l_x)
Q_x = l_x + np.dot(f_x.T, V_x)
Q_u = l_u + np.dot(f_u.T, V_x)
Q_xx = l_xx + np.dot(np.dot(f_x.T, V_xx), f_x)
reg = self.mu * np.eye(state_size)
Q_ux = l_ux + np.dot(np.dot(f_u.T, (V_xx + reg)), f_x)
Q_uu = l_uu + np.dot(np.dot(f_u.T, (V_xx + reg)), f_u)
# tensor contraction
Q_xx += np.tensordot(V_x, f_xx, axes=1)
Q_ux += np.tensordot(V_x, f_ux, axes=1)
Q_uu += np.tensordot(V_x, f_uu, axes=1)
return Q_x, Q_u, Q_xx, Q_ux, Q_uu
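For readers checking this against Tassa et al. (2012), the `_Q`, gain, and value-update steps above follow the regularized DDP backward recursion; a sketch of the math the code mirrors, with mu the regularization term, V'_x and V'_xx the next-step value derivatives, and the dot denoting the contraction done by np.tensordot:

```latex
\begin{aligned}
Q_x &= l_x + f_x^\top V'_x, \qquad Q_u = l_u + f_u^\top V'_x,\\
Q_{xx} &= l_{xx} + f_x^\top V'_{xx} f_x + V'_x \cdot f_{xx},\\
Q_{ux} &= l_{ux} + f_u^\top (V'_{xx} + \mu I) f_x + V'_x \cdot f_{ux},\\
Q_{uu} &= l_{uu} + f_u^\top (V'_{xx} + \mu I) f_u + V'_x \cdot f_{uu},\\
k &= -Q_{uu}^{-1} Q_u, \qquad K = -Q_{uu}^{-1} Q_{ux},\\
V_x &= Q_x + K^\top Q_{uu} k + K^\top Q_u + Q_{ux}^\top k,\\
V_{xx} &= Q_{xx} + K^\top Q_{uu} K + K^\top Q_{ux} + Q_{ux}^\top K.
\end{aligned}
```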


@@ -10,6 +10,11 @@ logger = getLogger(__name__)
 class iLQR(Controller):
 """ iterative Linear Quadratic Regulator
+Ref:
+Tassa, Y., Erez, T., & Todorov, E. (2012). Synthesis and stabilization of complex
+behaviors through online trajectory optimization. In 2012 IEEE/RSJ International
+Conference on Intelligent Robots and Systems (pp. 4906-4913). and Study Wolf,
+https://github.com/studywolf/control
 """
 def __init__(self, config, model):
 """
@@ -20,5 +25,346 @@ class iLQR(Controller):
 raise ValueError("{} cannot be applied to \
 this controller".format(model))
+# model
 self.model = model
# get cost func
self.state_cost_fn = config.state_cost_fn
self.terminal_state_cost_fn = config.terminal_state_cost_fn
self.input_cost_fn = config.input_cost_fn
self.gradient_cost_fn_with_state = config.gradient_cost_fn_with_state
self.gradient_cost_fn_with_input = config.gradient_cost_fn_with_input
self.hessian_cost_fn_with_state = config.hessian_cost_fn_with_state
self.hessian_cost_fn_with_input = config.hessian_cost_fn_with_input
self.hessian_cost_fn_with_input_state = \
config.hessian_cost_fn_with_input_state
# controller parameters
self.max_iter = config.opt_config["iLQR"]["max_iter"]
self.mu = config.opt_config["iLQR"]["mu"]
self.mu_min = config.opt_config["iLQR"]["mu_min"]
self.mu_max = config.opt_config["iLQR"]["mu_max"]
self.init_delta = config.opt_config["iLQR"]["init_delta"]
self.delta = self.init_delta
self.threshold = config.opt_config["iLQR"]["threshold"]
# general parameters
self.pred_len = config.PRED_LEN
self.input_size = config.INPUT_SIZE
self.dt = config.DT
# cost parameters
self.Q = config.Q
self.R = config.R
self.Sf = config.Sf
# initialize
self.prev_sol = np.zeros((self.pred_len, self.input_size))
def clear_sol(self):
""" clear prev sol
"""
logger.debug("Clear Sol")
self.prev_sol = np.zeros((self.pred_len, self.input_size))
def obtain_sol(self, curr_x, g_xs):
""" calculate the optimal inputs
Args:
curr_x (numpy.ndarray): current state, shape(state_size, )
g_xs (numpy.ndarray): goal trajectory, shape(plan_len, state_size)
Returns:
opt_input (numpy.ndarray): optimal input, shape(input_size, )
"""
# initialize
opt_count = 0
sol = self.prev_sol.copy()
converged_sol = False
update_sol = True
# line search param
alphas = 1.1**(-np.arange(10)**2)
while opt_count < self.max_iter:
accepted_sol = False
# forward
if update_sol == True:
pred_xs, cost, f_x, f_u, l_x, l_xx, l_u, l_uu, l_ux = \
self.forward(curr_x, g_xs, sol)
update_sol = False
try:
# backward
k, K = self.backward(f_x, f_u, l_x, l_xx, l_u, l_uu, l_ux)
# line search
for alpha in alphas:
new_pred_xs, new_sol = \
self.calc_input(k, K, pred_xs, sol, alpha)
new_cost = calc_cost(new_pred_xs[np.newaxis, :, :],
new_sol[np.newaxis, :, :],
g_xs[np.newaxis, :, :],
self.state_cost_fn,
self.input_cost_fn,
self.terminal_state_cost_fn)
if new_cost < cost:
if np.abs((cost - new_cost) / cost) < self.threshold:
converged_sol = True
cost = new_cost
pred_xs = new_pred_xs
sol = new_sol
update_sol = True
# decrease regularization term
self.delta = min(1.0, self.delta) / self.init_delta
self.mu *= self.delta
if self.mu <= self.mu_min:
self.mu = 0.0
# accept the solution
accepted_sol = True
break
except np.linalg.LinAlgError as e:
logger.debug("Non ans : {}".format(e))
if not accepted_sol:
# increase regularization term.
self.delta = max(1.0, self.delta) * self.init_delta
self.mu = max(self.mu_min, self.mu * self.delta)
logger.debug("Update regularization term to {}"\
.format(self.mu))
if self.mu >= self.mu_max:
logger.debug("Reach Max regularization term")
break
if converged_sol:
logger.debug("Get converged sol")
break
opt_count += 1
# update prev sol
self.prev_sol[:-1] = sol[1:]
self.prev_sol[-1] = sol[-1]  # reuse the last input for the new terminal step
return sol[0]
def calc_input(self, k, K, pred_xs, sol, alpha):
""" calc input trajectory by using k and K
Args:
k (numpy.ndarray): gain, shape(pred_len, input_size)
K (numpy.ndarray): gain, shape(pred_len, input_size, state_size)
pred_xs (numpy.ndarray): predicted state,
shape(pred_len+1, state_size)
sol (numpy.ndarray): input trajectory, previous solutions
shape(pred_len, input_size)
alpha (float): param of line search
Returns:
new_pred_xs (numpy.ndarray): update state trajectory,
shape(pred_len+1, state_size)
new_sol (numpy.ndarray): update input trajectory,
shape(pred_len, input_size)
"""
# get size
(pred_len, input_size, state_size) = K.shape
# initialize
new_pred_xs = np.zeros((pred_len+1, state_size))
new_pred_xs[0] = pred_xs[0].copy() # init state is same
new_sol = np.zeros((pred_len, input_size))
for t in range(pred_len):
new_sol[t] = sol[t] \
+ alpha * k[t] \
+ np.dot(K[t], (new_pred_xs[t] - pred_xs[t]))
new_pred_xs[t+1] = self.model.predict_next_state(new_pred_xs[t],
new_sol[t])
return new_pred_xs, new_sol
def forward(self, curr_x, g_xs, sol):
""" forward step of iLQR
Args:
curr_x (numpy.ndarray): current state, shape(state_size, )
g_xs (numpy.ndarray): goal trajectory, shape(plan_len, state_size)
sol (numpy.ndarray): solutions, shape(plan_len, input_size)
Returns:
f_x (numpy.ndarray): gradient of model with respect to state,
shape(pred_len, state_size, state_size)
f_u (numpy.ndarray): gradient of model with respect to input,
shape(pred_len, state_size, input_size)
l_x (numpy.ndarray): gradient of cost with respect to state,
shape(pred_len+1, state_size)
l_u (numpy.ndarray): gradient of cost with respect to input,
shape(pred_len, input_size)
l_xx (numpy.ndarray): hessian of cost with respect to state,
shape(pred_len+1, state_size, state_size)
l_uu (numpy.ndarray): hessian of cost with respect to input,
shape(pred_len, input_size, input_size)
l_ux (numpy.ndarray): hessian of cost with respect
to state and input, shape(pred_len, input_size, state_size)
"""
# simulate forward using the current control trajectory
pred_xs = self.model.predict_traj(curr_x, sol)
# check costs
cost = self.calc_cost(curr_x,
sol[np.newaxis, :, :],
g_xs)
# calc gradient in batch
f_x = self.model.calc_f_x(pred_xs[:-1], sol, self.dt)
f_u = self.model.calc_f_u(pred_xs[:-1], sol, self.dt)
# gradient of costs
l_x, l_xx, l_u, l_uu, l_ux = \
self._calc_gradient_hessian_cost(pred_xs, g_xs, sol)
return pred_xs, cost, f_x, f_u, l_x, l_xx, l_u, l_uu, l_ux
def _calc_gradient_hessian_cost(self, pred_xs, g_x, sol):
""" calculate gradient and hessian of model and cost fn
Args:
pred_xs (numpy.ndarray): predict traj,
shape(pred_len+1, state_size)
sol (numpy.ndarray): input traj,
shape(pred_len, input_size)
Returns
l_x (numpy.ndarray): gradient of cost,
shape(pred_len+1, state_size)
l_u (numpy.ndarray): gradient of cost,
shape(pred_len, input_size)
l_xx (numpy.ndarray): hessian of cost,
shape(pred_len+1, state_size, state_size)
l_uu (numpy.ndarray): hessian of cost,
shape(pred_len, input_size, input_size)
l_ux (numpy.ndarray): hessian of cost,
shape(pred_len, input_size, state_size)
"""
# l_x.shape = (pred_len+1, state_size)
l_x = self.gradient_cost_fn_with_state(pred_xs[:-1],
g_x[:-1], terminal=False)
terminal_l_x = \
self.gradient_cost_fn_with_state(pred_xs[-1],
g_x[-1], terminal=True)
l_x = np.concatenate((l_x, terminal_l_x), axis=0)
# l_u.shape = (pred_len, input_size)
l_u = self.gradient_cost_fn_with_input(pred_xs[:-1], sol)
# l_xx.shape = (pred_len+1, state_size, state_size)
l_xx = self.hessian_cost_fn_with_state(pred_xs[:-1],
g_x[:-1], terminal=False)
terminal_l_xx = \
self.hessian_cost_fn_with_state(pred_xs[-1],
g_x[-1], terminal=True)
l_xx = np.concatenate((l_xx, terminal_l_xx), axis=0)
# l_uu.shape = (pred_len, input_size, input_size)
l_uu = self.hessian_cost_fn_with_input(pred_xs[:-1], sol)
# l_ux.shape = (pred_len, input_size, state_size)
l_ux = self.hessian_cost_fn_with_input_state(pred_xs[:-1], sol)
return l_x, l_xx, l_u, l_uu, l_ux
def backward(self, f_x, f_u, l_x, l_xx, l_u, l_uu, l_ux):
""" backward step of iLQR
Args:
f_x (numpy.ndarray): gradient of model with respect to state,
shape(pred_len, state_size, state_size)
f_u (numpy.ndarray): gradient of model with respect to input,
shape(pred_len, state_size, input_size)
l_x (numpy.ndarray): gradient of cost with respect to state,
shape(pred_len+1, state_size)
l_u (numpy.ndarray): gradient of cost with respect to input,
shape(pred_len, input_size)
l_xx (numpy.ndarray): hessian of cost with respect to state,
shape(pred_len+1, state_size, state_size)
l_uu (numpy.ndarray): hessian of cost with respect to input,
shape(pred_len, input_size, input_size)
l_ux (numpy.ndarray): hessian of cost with respect
to state and input, shape(pred_len, input_size, state_size)
Returns:
k (numpy.ndarray): gain, shape(pred_len, input_size)
K (numpy.ndarray): gain, shape(pred_len, input_size, state_size)
"""
# get size
(_, state_size, _) = f_x.shape
# initialize
V_x = l_x[-1]
V_xx = l_xx[-1]
k = np.zeros((self.pred_len, self.input_size))
K = np.zeros((self.pred_len, self.input_size, state_size))
for t in range(self.pred_len-1, -1, -1):
# get Q val
Q_x, Q_u, Q_xx, Q_ux, Q_uu = self._Q(f_x[t], f_u[t], l_x[t],
l_u[t], l_xx[t], l_ux[t],
l_uu[t], V_x, V_xx)
# calc gain
k[t] = - np.linalg.solve(Q_uu, Q_u)
K[t] = - np.linalg.solve(Q_uu, Q_ux)
# update V_x val
V_x = Q_x + np.dot(np.dot(K[t].T, Q_uu), k[t])
V_x += np.dot(K[t].T, Q_u) + np.dot(Q_ux.T, k[t])
# update V_xx val
V_xx = Q_xx + np.dot(np.dot(K[t].T, Q_uu), K[t])
V_xx += np.dot(K[t].T, Q_ux) + np.dot(Q_ux.T, K[t])
V_xx = 0.5 * (V_xx + V_xx.T) # to maintain symmetry.
return k, K
def _Q(self, f_x, f_u, l_x, l_u, l_xx, l_ux, l_uu, V_x, V_xx):
"""Computes second order expansion.
Args:
f_x (numpy.ndarray): gradient of model with respect to state,
shape(state_size, state_size)
f_u (numpy.ndarray): gradient of model with respect to input,
shape(state_size, input_size)
l_x (numpy.ndarray): gradient of cost with respect to state,
shape(state_size, )
l_u (numpy.ndarray): gradient of cost with respect to input,
shape(input_size, )
l_xx (numpy.ndarray): hessian of cost with respect to state,
shape(state_size, state_size)
l_uu (numpy.ndarray): hessian of cost with respect to input,
shape(input_size, input_size)
l_ux (numpy.ndarray): hessian of cost with respect
to state and input, shape(input_size, state_size)
V_x (numpy.ndarray): gradient of Value function,
shape(state_size, )
V_xx (numpy.ndarray): hessian of Value function,
shape(state_size, state_size)
Returns:
Q_x (numpy.ndarray): gradient of Q function, shape(state_size, )
Q_u (numpy.ndarray): gradient of Q function, shape(input_size, )
Q_xx (numpy.ndarray): hessian of Q function,
shape(state_size, state_size)
Q_ux (numpy.ndarray): hessian of Q function,
shape(input_size, state_size)
Q_uu (numpy.ndarray): hessian of Q function,
shape(input_size, input_size)
"""
# get size
state_size = len(l_x)
Q_x = l_x + np.dot(f_x.T, V_x)
Q_u = l_u + np.dot(f_u.T, V_x)
Q_xx = l_xx + np.dot(np.dot(f_x.T, V_xx), f_x)
reg = self.mu * np.eye(state_size)
Q_ux = l_ux + np.dot(np.dot(f_u.T, (V_xx + reg)), f_x)
Q_uu = l_uu + np.dot(np.dot(f_u.T, (V_xx + reg)), f_u)
return Q_x, Q_u, Q_xx, Q_ux, Q_uu
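The backward pass here is the same recursion as in the DDP controller above, minus the second-order model terms (the contractions with f_xx, f_ux, f_uu). The forward line search in `calc_input` then applies, for each backtracking value of alpha:

```latex
\hat{u}_t = u_t + \alpha\,k_t + K_t\,(\hat{x}_t - x_t), \qquad
\hat{x}_{t+1} = f(\hat{x}_t, \hat{u}_t), \qquad \hat{x}_0 = x_0
```

A candidate trajectory is accepted when its cost decreases, convergence is declared once the relative cost decrease falls below `threshold`, and the regularization mu is shrunk on accepted steps and grown when the backward pass or line search fails.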


@@ -3,6 +3,7 @@ from .cem import CEM
 from .random import RandomShooting
 from .mppi import MPPI
 from .ilqr import iLQR
+from .ddp import DDP
 def make_controller(args, config, model):
@@ -15,4 +16,6 @@ def make_controller(args, config, model):
 elif args.controller_type == "MPPI":
 return MPPI(config, model)
 elif args.controller_type == "iLQR":
 return iLQR(config, model)
+elif args.controller_type == "DDP":
+return DDP(config, model)


@@ -91,7 +91,7 @@ class FirstOrderLagEnv(Env):
 # clip action
 u = np.clip(u,
 self.config["input_lower_bound"],
-self.config["input_lower_bound"])
+self.config["input_upper_bound"])
 next_x = np.matmul(self.A, self.curr_x[:, np.newaxis]) \
 + np.matmul(self.B, u[:, np.newaxis])


@@ -140,18 +140,41 @@ class Model():
 """
 raise NotImplementedError("Implement terminal adjoint state")
-def gradient_x(self, x, u):
-""" gradient of model with respect to the state
+@staticmethod
+def calc_f_x(xs, us, dt):
+""" gradient of model with respect to the state in batch form
 """
 raise NotImplementedError("Implement gradient of model \
 with respect to the state")
-def gradient_u(self, x, u):
-""" gradient of model with respect to the input
+@staticmethod
+def calc_f_u(xs, us, dt):
+""" gradient of model with respect to the input in batch form
 """
 raise NotImplementedError("Implement gradient of model \
 with respect to the input")
@staticmethod
def calc_f_xx(xs, us, dt):
""" hessian of model with respect to the state in batch form
"""
raise NotImplementedError("Implement hessian of model \
with respect to the state")
@staticmethod
def calc_f_ux(xs, us, dt):
""" hessian of model with respect to the input in batch form
"""
raise NotImplementedError("Implement hessian of model \
with respect to the input and state")
@staticmethod
def calc_f_uu(xs, us, dt):
""" hessian of model with respect to the state in batch form
"""
raise NotImplementedError("Implement hessian of model \
with respect to the input")
class LinearModel(Model):
""" discrete linear model, x[k+1] = Ax[k] + Bu[k]


@@ -50,4 +50,118 @@ class TwoWheeledModel(Model):
next_x = x_dot[:, :, 0] * self.dt + curr_x
return next_x
@staticmethod
def calc_f_x(xs, us, dt):
""" gradient of model with respect to the state in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_x (numpy.ndarray): gradient of model with respect to x,
shape(pred_len, state_size, state_size)
Notes:
This should be discrete form !!
"""
# get size
(_, state_size) = xs.shape
(pred_len, _) = us.shape
f_x = np.zeros((pred_len, state_size, state_size))
f_x[:, 0, 2] = -np.sin(xs[:, 2]) * us[:, 0]
f_x[:, 1, 2] = np.cos(xs[:, 2]) * us[:, 0]
return f_x * dt + np.eye(state_size) # to discrete form
@staticmethod
def calc_f_u(xs, us, dt):
""" gradient of model with respect to the input in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_u (numpy.ndarray): gradient of model with respect to u,
shape(pred_len, state_size, input_size)
Notes:
This should be discrete form !!
"""
# get size
(_, state_size) = xs.shape
(pred_len, input_size) = us.shape
f_u = np.zeros((pred_len, state_size, input_size))
f_u[:, 0, 0] = np.cos(xs[:, 2])
f_u[:, 1, 0] = np.sin(xs[:, 2])
f_u[:, 2, 1] = 1.
return f_u * dt # to discrete form
@staticmethod
def calc_f_xx(xs, us, dt):
""" hessian of model with respect to the state in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_xx (numpy.ndarray): hessian of model with respect to x,
shape(pred_len, state_size, state_size, state_size)
"""
# get size
(_, state_size) = xs.shape
(pred_len, _) = us.shape
f_xx = np.zeros((pred_len, state_size, state_size, state_size))
f_xx[:, 0, 2, 2] = -np.cos(xs[:, 2]) * us[:, 0]
f_xx[:, 1, 2, 2] = -np.sin(xs[:, 2]) * us[:, 0]
return f_xx
@staticmethod
def calc_f_ux(xs, us, dt):
""" hessian of model with respect to state and input in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_ux (numpy.ndarray): hessian of model with respect to u and x,
shape(pred_len, state_size, input_size, state_size)
"""
# get size
(_, state_size) = xs.shape
(pred_len, input_size) = us.shape
f_ux = np.zeros((pred_len, state_size, input_size, state_size))
f_ux[:, 0, 0, 2] = -np.sin(xs[:, 2])
f_ux[:, 1, 0, 2] = np.cos(xs[:, 2])
return f_ux
@staticmethod
def calc_f_uu(xs, us, dt):
""" hessian of model with respect to input in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_uu (numpy.ndarray): hessian of model with respect to u,
shape(pred_len, state_size, input_size, input_size)
"""
# get size
(_, state_size) = xs.shape
(pred_len, input_size) = us.shape
f_uu = np.zeros((pred_len, state_size, input_size, input_size))
return f_uu
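A quick way to sanity-check the analytic derivatives above is a finite-difference comparison. The sketch below is self-contained (plain numpy) and restates the unicycle dynamics from `predict_next_state` instead of importing the package, so the helper names here are illustrative only:

```python
import numpy as np

DT = 0.01  # same time step as the TwoWheeled config

def step(x, u, dt=DT):
    """Discrete two-wheeled model: x = [pos_x, pos_y, theta], u = [v, omega]."""
    return x + dt * np.array([np.cos(x[2]) * u[0],
                              np.sin(x[2]) * u[0],
                              u[1]])

def f_x_analytic(x, u, dt=DT):
    """Analytic df/dx in discrete form, matching calc_f_x above."""
    A = np.zeros((3, 3))
    A[0, 2] = -np.sin(x[2]) * u[0]
    A[1, 2] = np.cos(x[2]) * u[0]
    return np.eye(3) + dt * A

x = np.array([0.1, -0.2, 0.3])
u = np.array([1.0, 0.5])
eps = 1e-6
num = np.zeros((3, 3))
for i in range(3):
    dx = np.zeros(3)
    dx[i] = eps
    # central difference column of the Jacobian
    num[:, i] = (step(x + dx, u) - step(x - dx, u)) / (2. * eps)
assert np.allclose(num, f_x_analytic(x, u), atol=1e-6)
```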


@@ -39,8 +39,11 @@ Following algorithms are implemented in PythonLinearNonlinearControl
   - Ref: Chua, K., Calandra, R., McAllister, R., & Levine, S. (2018). Deep reinforcement learning in a handful of trials using probabilistic dynamics models. In Advances in Neural Information Processing Systems (pp. 4754-4765)
   - [script](PythonLinearNonlinearControl/controllers/random.py)
 - [Iterative LQR (iLQR)](https://ieeexplore.ieee.org/document/6386025)
-  - Ref: Tassa, Y., Erez, T., & Todorov, E. (2012, October). Synthesis and stabilization of complex behaviors through online trajectory optimization. In 2012 IEEE/RSJ International Conference on Intelligent Robots and Systems (pp. 4906-4913). IEEE. and [Study Wolf](https://github.com/studywolf/control)
-  - [script (Coming soon)]()
+  - Ref: Tassa, Y., Erez, T., & Todorov, E. (2012, October). Synthesis and stabilization of complex behaviors through online trajectory optimization. In 2012 IEEE/RSJ International Conference on Intelligent Robots and Systems (pp. 4906-4913). IEEE. and [Study Wolf](https://github.com/studywolf/control), https://github.com/anassinator/ilqr
+  - [script](PythonLinearNonlinearControl/controllers/ilqr.py)
+- [Differential Dynamic Programming (DDP)](https://ieeexplore.ieee.org/document/6386025)
+  - Ref: Tassa, Y., Erez, T., & Todorov, E. (2012, October). Synthesis and stabilization of complex behaviors through online trajectory optimization. In 2012 IEEE/RSJ International Conference on Intelligent Robots and Systems (pp. 4906-4913). IEEE. and [Study Wolf](https://github.com/studywolf/control), https://github.com/anassinator/ilqr
+  - [script](PythonLinearNonlinearControl/controllers/ddp.py)
 - [Unconstrained Nonlinear Model Predictive Control (NMPC)](https://www.sciencedirect.com/science/article/pii/S0005109897000058)
   - Ref: Ohtsuka, T., & Fujii, H. A. (1997). Real-time optimization algorithm for nonlinear receding-horizon control. Automatica, 33(6), 1147-1154.
   - [script (Coming soon)]()
@@ -93,7 +96,7 @@ pip install -e .
 You can run the experiments as follows:
 ```
-python scripts/simple_run.py --model first-order_lag --controller CEM
+python scripts/simple_run.py --env first-order_lag --controller CEM
 ```
 **figures and animations are saved in the ./result folder.**


@@ -40,7 +40,7 @@ def run(args):
 def main():
 parser = argparse.ArgumentParser()
-parser.add_argument("--controller_type", type=str, default="CEM")
+parser.add_argument("--controller_type", type=str, default="DDP")
 parser.add_argument("--planner_type", type=str, default="const")
 parser.add_argument("--env", type=str, default="TwoWheeledConst")
 parser.add_argument("--result_dir", type=str, default="./result")

tests/env/__init__.py (new, empty file)

tests/env/test_first_orger_lag.py (new file)

@@ -0,0 +1,43 @@
import pytest
import numpy as np
from PythonLinearNonlinearControl.envs.first_order_lag import FirstOrderLagEnv
class TestFirstOrderLagEnv():
def test_step(self):
env = FirstOrderLagEnv()
curr_x = np.ones(4)
env.reset(init_x=curr_x)
u = np.ones(2) * 0.1
next_x, _, _, _ = env.step(u)
dx = np.dot(env.A, curr_x[:, np.newaxis])
du = np.dot(env.B, u[:, np.newaxis])
expected = (dx + du).flatten()
assert next_x == pytest.approx(expected, abs=1e-5)
def test_bound_step(self):
env = FirstOrderLagEnv()
curr_x = np.ones(4)
env.reset(init_x=curr_x)
u = np.ones(2) * 1e5
next_x, _, _, _ = env.step(u)
dx = np.dot(env.A, curr_x[:, np.newaxis])
du = np.dot(env.B,
np.array(env.config["input_upper_bound"])[:, np.newaxis])
expected = (dx + du).flatten()
assert next_x == pytest.approx(expected, abs=1e-5)

tests/env/test_two_wheeled.py (new file)

@@ -0,0 +1,50 @@
import pytest
import numpy as np
from PythonLinearNonlinearControl.envs.two_wheeled import TwoWheeledConstEnv
class TestTwoWheeledEnv():
"""
"""
def test_step(self):
env = TwoWheeledConstEnv()
curr_x = np.ones(3)
curr_x[-1] = np.pi / 6.
env.reset(init_x=curr_x)
u = np.ones(2)
next_x, _, _, _ = env.step(u)
pos_x = np.cos(curr_x[-1]) * u[0] * env.config["dt"] + curr_x[0]
pos_y = np.sin(curr_x[-1]) * u[0] * env.config["dt"] + curr_x[1]
expected = np.array([pos_x, pos_y,\
curr_x[-1] + u[1] * env.config["dt"]])
assert next_x == pytest.approx(expected)
def test_bound_step(self):
env = TwoWheeledConstEnv()
curr_x = np.ones(3)
curr_x[-1] = np.pi / 6.
env.reset(init_x=curr_x)
u = np.ones(2) * 1e3
next_x, _, _, _ = env.step(u)
pos_x = np.cos(curr_x[-1]) * env.config["input_upper_bound"][0] \
* env.config["dt"] + curr_x[0]
pos_y = np.sin(curr_x[-1]) * env.config["input_upper_bound"][0] \
* env.config["dt"] + curr_x[1]
expected = np.array([pos_x, pos_y,\
curr_x[-1] + env.config["input_upper_bound"][1] \
* env.config["dt"]])
assert next_x == pytest.approx(expected)