Merge branch 'develop' of github.com:Shunichi-03/PythonLinearNonlinearControl into develop

This commit is contained in:
Shunichi09 2021-05-28 14:29:40 +09:00
commit a3c3269c0f
51 changed files with 1993 additions and 757 deletions

View File

@ -6,6 +6,8 @@
| Two wheeled System (Constant Goal) | x | ✓ | 3 | 2 | | Two wheeled System (Constant Goal) | x | ✓ | 3 | 2 |
| Two wheeled System (Moving Goal) (Coming soon) | x | ✓ | 3 | 2 | | Two wheeled System (Moving Goal) (Coming soon) | x | ✓ | 3 | 2 |
| Cartpole (Swing up) | x | ✓ | 4 | 1 | | Cartpole (Swing up) | x | ✓ | 4 | 1 |
| Nonlinear Sample System Env | x | ✓ | 2 | 1 |
## [FistOrderLagEnv](PythonLinearNonlinearControl/envs/first_order_lag.py) ## [FistOrderLagEnv](PythonLinearNonlinearControl/envs/first_order_lag.py)
@ -54,3 +56,13 @@ mc = 1, mp = 0.2, l = 0.5, g = 9.81
### Cost. ### Cost.
<img src="assets/cartpole_score.png" width="300"> <img src="assets/cartpole_score.png" width="300">
## [Nonlinear Sample System Env](PythonLinearNonlinearControl/envs/nonlinear_sample_system.py)
## System equation.
<img src="assets/nonlinear_sample_system.png" width="400">
### Cost.
<img src="assets/nonlinear_sample_system_score.png" width="400">

View File

@ -1,5 +1,6 @@
import numpy as np import numpy as np
def rotate_pos(pos, angle): def rotate_pos(pos, angle):
""" Transformation the coordinate in the angle """ Transformation the coordinate in the angle
@ -14,6 +15,7 @@ def rotate_pos(pos, angle):
return np.dot(pos, rot_mat.T) return np.dot(pos, rot_mat.T)
def fit_angle_in_range(angles, min_angle=-np.pi, max_angle=np.pi): def fit_angle_in_range(angles, min_angle=-np.pi, max_angle=np.pi):
""" Check angle range and correct the range """ Check angle range and correct the range
@ -43,15 +45,17 @@ def fit_angle_in_range(angles, min_angle=-np.pi, max_angle=np.pi):
output = np.minimum(max_angle, np.maximum(min_angle, output)) output = np.minimum(max_angle, np.maximum(min_angle, output))
return output.reshape(output_shape) return output.reshape(output_shape)
def update_state_with_Runge_Kutta(state, u, functions, dt=0.01):
def update_state_with_Runge_Kutta(state, u, functions, dt=0.01, batch=True):
""" update state in Runge Kutta methods """ update state in Runge Kutta methods
Args: Args:
state (array-like): state of system state (array-like): state of system
u (array-like): input of system u (array-like): input of system
functions (list): update function of each state, functions (list): update function of each state,
each function will be called like func(*state, *u) each function will be called like func(state, u)
We expect that this function returns differential of each state We expect that this function returns differential of each state
dt (float): float in seconds dt (float): float in seconds
batch (bool): state and u is given by batch or not
Returns: Returns:
next_state (np.array): next state of system next_state (np.array): next state of system
@ -65,6 +69,7 @@ def update_state_with_Runge_Kutta(state, u, functions, dt=0.01):
Note that the function return x_dot. Note that the function return x_dot.
""" """
if not batch:
state_size = len(state) state_size = len(state)
assert state_size == len(functions), \ assert state_size == len(functions), \
"Invalid functions length, You need to give the state size functions" "Invalid functions length, You need to give the state size functions"
@ -74,27 +79,69 @@ def update_state_with_Runge_Kutta(state, u, functions, dt=0.01):
k2 = np.zeros(state_size) k2 = np.zeros(state_size)
k3 = np.zeros(state_size) k3 = np.zeros(state_size)
inputs = np.concatenate([state, u]) for i, func in enumerate(functions):
k0[i] = dt * func(state, u)
for i, func in enumerate(functions): for i, func in enumerate(functions):
k0[i] = dt * func(*inputs) k1[i] = dt * func(state + k0 / 2., u)
add_state = state + k0 / 2.
inputs = np.concatenate([add_state, u])
for i, func in enumerate(functions): for i, func in enumerate(functions):
k1[i] = dt * func(*inputs) k2[i] = dt * func(state + k1 / 2., u)
add_state = state + k1 / 2.
inputs = np.concatenate([add_state, u])
for i, func in enumerate(functions): for i, func in enumerate(functions):
k2[i] = dt * func(*inputs) k3[i] = dt * func(state + k2, u)
add_state = state + k2 return state + (k0 + 2. * k1 + 2. * k2 + k3) / 6.
inputs = np.concatenate([add_state, u])
else:
batch_size, state_size = state.shape
assert state_size == len(functions), \
"Invalid functions length, You need to give the state size functions"
k0 = np.zeros((batch_size, state_size))
k1 = np.zeros((batch_size, state_size))
k2 = np.zeros((batch_size, state_size))
k3 = np.zeros((batch_size, state_size))
for i, func in enumerate(functions): for i, func in enumerate(functions):
k3[i] = dt * func(*inputs) k0[:, i] = dt * func(state, u)
return (k0 + 2. * k1 + 2. * k2 + k3) / 6. for i, func in enumerate(functions):
k1[:, i] = dt * func(state + k0 / 2., u)
for i, func in enumerate(functions):
k2[:, i] = dt * func(state + k1 / 2., u)
for i, func in enumerate(functions):
k3[:, i] = dt * func(state + k2, u)
return state + (k0 + 2. * k1 + 2. * k2 + k3) / 6.
def line_search(grad, sol, compute_eval_val,
init_alpha=0.001, max_iter=100, update_ratio=1.):
""" line search
Args:
grad (numpy.ndarray): gradient
sol (numpy.ndarray): sol
compute_eval_val (numpy.ndarray): function to compute evaluation value
Returns:
alpha (float): result of line search
"""
assert grad.shape == sol.shape
base_val = np.inf
alpha = init_alpha
original_sol = sol.copy()
for _ in range(max_iter):
updated_sol = original_sol - alpha * grad
eval_val = compute_eval_val(updated_sol)
if eval_val < base_val:
alpha += init_alpha * update_ratio
base_val = eval_val
else:
break
return alpha

View File

@ -1,5 +1,6 @@
import numpy as np import numpy as np
class CartPoleConfigModule(): class CartPoleConfigModule():
# parameters # parameters
ENV_NAME = "CartPole-v0" ENV_NAME = "CartPole-v0"
@ -103,15 +104,15 @@ class CartPoleConfigModule():
""" """
if len(x.shape) > 2: if len(x.shape) > 2:
return (6. * (x[:, :, 0]**2) \ return (6. * (x[:, :, 0]**2)
+ 12. * ((np.cos(x[:, :, 2]) + 1.)**2) \ + 12. * ((np.cos(x[:, :, 2]) + 1.)**2)
+ 0.1 * (x[:, :, 1]**2) \ + 0.1 * (x[:, :, 1]**2)
+ 0.1 * (x[:, :, 3]**2))[:, :, np.newaxis] + 0.1 * (x[:, :, 3]**2))[:, :, np.newaxis]
elif len(x.shape) > 1: elif len(x.shape) > 1:
return (6. * (x[:, 0]**2) \ return (6. * (x[:, 0]**2)
+ 12. * ((np.cos(x[:, 2]) + 1.)**2) \ + 12. * ((np.cos(x[:, 2]) + 1.)**2)
+ 0.1 * (x[:, 1]**2) \ + 0.1 * (x[:, 1]**2)
+ 0.1 * (x[:, 3]**2))[:, np.newaxis] + 0.1 * (x[:, 3]**2))[:, np.newaxis]
return 6. * (x[0]**2) \ return 6. * (x[0]**2) \
@ -134,20 +135,20 @@ class CartPoleConfigModule():
""" """
if len(terminal_x.shape) > 1: if len(terminal_x.shape) > 1:
return (6. * (terminal_x[:, 0]**2) \ return (6. * (terminal_x[:, 0]**2)
+ 12. * ((np.cos(terminal_x[:, 2]) + 1.)**2) \ + 12. * ((np.cos(terminal_x[:, 2]) + 1.)**2)
+ 0.1 * (terminal_x[:, 1]**2) \ + 0.1 * (terminal_x[:, 1]**2)
+ 0.1 * (terminal_x[:, 3]**2))[:, np.newaxis] \ + 0.1 * (terminal_x[:, 3]**2))[:, np.newaxis] \
* CartPoleConfigModule.TERMINAL_WEIGHT * CartPoleConfigModule.TERMINAL_WEIGHT
return (6. * (terminal_x[0]**2) \ return (6. * (terminal_x[0]**2)
+ 12. * ((np.cos(terminal_x[2]) + 1.)**2) \ + 12. * ((np.cos(terminal_x[2]) + 1.)**2)
+ 0.1 * (terminal_x[1]**2) \ + 0.1 * (terminal_x[1]**2)
+ 0.1 * (terminal_x[3]**2)) \ + 0.1 * (terminal_x[3]**2)) \
* CartPoleConfigModule.TERMINAL_WEIGHT * CartPoleConfigModule.TERMINAL_WEIGHT
@staticmethod @staticmethod
def gradient_cost_fn_with_state(x, g_x, terminal=False): def gradient_cost_fn_state(x, g_x, terminal=False):
""" gradient of costs with respect to the state """ gradient of costs with respect to the state
Args: Args:
@ -163,7 +164,7 @@ class CartPoleConfigModule():
cost_dx1 = 0.2 * x[:, 1] cost_dx1 = 0.2 * x[:, 1]
cost_dx2 = 24. * (1 + np.cos(x[:, 2])) * -np.sin(x[:, 2]) cost_dx2 = 24. * (1 + np.cos(x[:, 2])) * -np.sin(x[:, 2])
cost_dx3 = 0.2 * x[:, 3] cost_dx3 = 0.2 * x[:, 3]
cost_dx = np.stack((cost_dx0, cost_dx1,\ cost_dx = np.stack((cost_dx0, cost_dx1,
cost_dx2, cost_dx3), axis=1) cost_dx2, cost_dx3), axis=1)
return cost_dx return cost_dx
@ -176,7 +177,7 @@ class CartPoleConfigModule():
return cost_dx * CartPoleConfigModule.TERMINAL_WEIGHT return cost_dx * CartPoleConfigModule.TERMINAL_WEIGHT
@staticmethod @staticmethod
def gradient_cost_fn_with_input(x, u): def gradient_cost_fn_input(x, u):
""" gradient of costs with respect to the input """ gradient of costs with respect to the input
Args: Args:
@ -188,7 +189,7 @@ class CartPoleConfigModule():
return 2. * u * np.diag(CartPoleConfigModule.R) return 2. * u * np.diag(CartPoleConfigModule.R)
@staticmethod @staticmethod
def hessian_cost_fn_with_state(x, g_x, terminal=False): def hessian_cost_fn_state(x, g_x, terminal=False):
""" hessian costs with respect to the state """ hessian costs with respect to the state
Args: Args:
@ -226,7 +227,7 @@ class CartPoleConfigModule():
return hessian[np.newaxis, :, :] * CartPoleConfigModule.TERMINAL_WEIGHT return hessian[np.newaxis, :, :] * CartPoleConfigModule.TERMINAL_WEIGHT
@staticmethod @staticmethod
def hessian_cost_fn_with_input(x, u): def hessian_cost_fn_input(x, u):
""" hessian costs with respect to the input """ hessian costs with respect to the input
Args: Args:
@ -241,7 +242,7 @@ class CartPoleConfigModule():
return np.tile(2.*CartPoleConfigModule.R, (pred_len, 1, 1)) return np.tile(2.*CartPoleConfigModule.R, (pred_len, 1, 1))
@staticmethod @staticmethod
def hessian_cost_fn_with_input_state(x, u): def hessian_cost_fn_input_state(x, u):
""" hessian costs with respect to the state and input """ hessian costs with respect to the state and input
Args: Args:

View File

@ -1,5 +1,6 @@
import numpy as np import numpy as np
class FirstOrderLagConfigModule(): class FirstOrderLagConfigModule():
# parameters # parameters
ENV_NAME = "FirstOrderLag-v0" ENV_NAME = "FirstOrderLag-v0"
@ -51,7 +52,7 @@ class FirstOrderLagConfigModule():
"MPC": { "MPC": {
}, },
"iLQR": { "iLQR": {
"max_iter": 500, "max_iters": 500,
"init_mu": 1., "init_mu": 1.,
"mu_min": 1e-6, "mu_min": 1e-6,
"mu_max": 1e10, "mu_max": 1e10,
@ -59,7 +60,7 @@ class FirstOrderLagConfigModule():
"threshold": 1e-6, "threshold": 1e-6,
}, },
"DDP": { "DDP": {
"max_iter": 500, "max_iters": 500,
"init_mu": 1., "init_mu": 1.,
"mu_min": 1e-6, "mu_min": 1e-6,
"mu_max": 1e10, "mu_max": 1e10,
@ -114,7 +115,7 @@ class FirstOrderLagConfigModule():
* np.diag(FirstOrderLagConfigModule.Sf) * np.diag(FirstOrderLagConfigModule.Sf)
@staticmethod @staticmethod
def gradient_cost_fn_with_state(x, g_x, terminal=False): def gradient_cost_fn_state(x, g_x, terminal=False):
""" gradient of costs with respect to the state """ gradient of costs with respect to the state
Args: Args:
@ -128,11 +129,11 @@ class FirstOrderLagConfigModule():
if not terminal: if not terminal:
return 2. * (x - g_x) * np.diag(FirstOrderLagConfigModule.Q) return 2. * (x - g_x) * np.diag(FirstOrderLagConfigModule.Q)
return (2. * (x - g_x) \ return (2. * (x - g_x)
* np.diag(FirstOrderLagConfigModule.Sf))[np.newaxis, :] * np.diag(FirstOrderLagConfigModule.Sf))[np.newaxis, :]
@staticmethod @staticmethod
def gradient_cost_fn_with_input(x, u): def gradient_cost_fn_input(x, u):
""" gradient of costs with respect to the input """ gradient of costs with respect to the input
Args: Args:
@ -145,7 +146,7 @@ class FirstOrderLagConfigModule():
return 2. * u * np.diag(FirstOrderLagConfigModule.R) return 2. * u * np.diag(FirstOrderLagConfigModule.R)
@staticmethod @staticmethod
def hessian_cost_fn_with_state(x, g_x, terminal=False): def hessian_cost_fn_state(x, g_x, terminal=False):
""" hessian costs with respect to the state """ hessian costs with respect to the state
Args: Args:
@ -164,7 +165,7 @@ class FirstOrderLagConfigModule():
return np.tile(2.*FirstOrderLagConfigModule.Sf, (1, 1, 1)) return np.tile(2.*FirstOrderLagConfigModule.Sf, (1, 1, 1))
@staticmethod @staticmethod
def hessian_cost_fn_with_input(x, u): def hessian_cost_fn_input(x, u):
""" hessian costs with respect to the input """ hessian costs with respect to the input
Args: Args:
@ -180,7 +181,7 @@ class FirstOrderLagConfigModule():
return np.tile(2.*FirstOrderLagConfigModule.R, (pred_len, 1, 1)) return np.tile(2.*FirstOrderLagConfigModule.R, (pred_len, 1, 1))
@staticmethod @staticmethod
def hessian_cost_fn_with_input_state(x, u): def hessian_cost_fn_input_state(x, u):
""" hessian costs with respect to the state and input """ hessian costs with respect to the state and input
Args: Args:

View File

@ -1,6 +1,8 @@
from .first_order_lag import FirstOrderLagConfigModule from .first_order_lag import FirstOrderLagConfigModule
from .two_wheeled import TwoWheeledConfigModule from .two_wheeled import TwoWheeledConfigModule, TwoWheeledExtendConfigModule
from .cartpole import CartPoleConfigModule from .cartpole import CartPoleConfigModule
from .nonlinear_sample_system import NonlinearSampleSystemConfigModule, NonlinearSampleSystemExtendConfigModule
def make_config(args): def make_config(args):
""" """
@ -10,6 +12,12 @@ def make_config(args):
if args.env == "FirstOrderLag": if args.env == "FirstOrderLag":
return FirstOrderLagConfigModule() return FirstOrderLagConfigModule()
elif args.env == "TwoWheeledConst" or args.env == "TwoWheeledTrack": elif args.env == "TwoWheeledConst" or args.env == "TwoWheeledTrack":
if args.controller_type == "NMPCCGMRES":
return TwoWheeledExtendConfigModule()
return TwoWheeledConfigModule() return TwoWheeledConfigModule()
elif args.env == "CartPole": elif args.env == "CartPole":
return CartPoleConfigModule() return CartPoleConfigModule()
elif args.env == "NonlinearSample":
if args.controller_type == "NMPCCGMRES":
return NonlinearSampleSystemExtendConfigModule()
return NonlinearSampleSystemConfigModule()

View File

@ -0,0 +1,353 @@
import numpy as np
class NonlinearSampleSystemConfigModule():
# parameters
ENV_NAME = "NonlinearSampleSystem-v0"
PLANNER_TYPE = "Const"
TYPE = "Nonlinear"
TASK_HORIZON = 2000
PRED_LEN = 10
STATE_SIZE = 2
INPUT_SIZE = 1
DT = 0.01
R = np.diag([1.])
Q = None
Sf = None
# bounds
INPUT_LOWER_BOUND = np.array([-0.5])
INPUT_UPPER_BOUND = np.array([0.5])
def __init__(self):
"""
"""
# opt configs
self.opt_config = {
"Random": {
"popsize": 5000
},
"CEM": {
"popsize": 500,
"num_elites": 50,
"max_iters": 15,
"alpha": 0.3,
"init_var": 9.,
"threshold": 0.001
},
"MPPI": {
"beta": 0.6,
"popsize": 5000,
"kappa": 0.9,
"noise_sigma": 0.5,
},
"MPPIWilliams": {
"popsize": 5000,
"lambda": 1.,
"noise_sigma": 0.9,
},
"iLQR": {
"max_iters": 500,
"init_mu": 1.,
"mu_min": 1e-6,
"mu_max": 1e10,
"init_delta": 2.,
"threshold": 1e-6,
},
"DDP": {
"max_iters": 500,
"init_mu": 1.,
"mu_min": 1e-6,
"mu_max": 1e10,
"init_delta": 2.,
"threshold": 1e-6,
},
"NMPC": {
"threshold": 0.01,
"max_iters": 5000,
"learning_rate": 0.01,
"optimizer_mode": "conjugate"
}
}
@staticmethod
def input_cost_fn(u):
""" input cost functions
Args:
u (numpy.ndarray): input, shape(pred_len, input_size)
or shape(pop_size, pred_len, input_size)
Returns:
cost (numpy.ndarray): cost of input, shape(pred_len, input_size) or
shape(pop_size, pred_len, input_size)
"""
return (u**2) * np.diag(NonlinearSampleSystemConfigModule.R)
@staticmethod
def state_cost_fn(x, g_x):
""" state cost function
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
or shape(pop_size, pred_len, state_size)
g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
or shape(pop_size, pred_len, state_size)
Returns:
cost (numpy.ndarray): cost of state, shape(pred_len, 1) or
shape(pop_size, pred_len, 1)
"""
if len(x.shape) > 2:
return (0.5 * (x[:, :, 0]**2) +
0.5 * (x[:, :, 1]**2))[:, :, np.newaxis]
elif len(x.shape) > 1:
return (0.5 * (x[:, 0]**2) + 0.5 * (x[:, 1]**2))[:, np.newaxis]
return 0.5 * (x[0]**2) + 0.5 * (x[1]**2)
@staticmethod
def terminal_state_cost_fn(terminal_x, terminal_g_x):
"""
Args:
terminal_x (numpy.ndarray): terminal state,
shape(state_size, ) or shape(pop_size, state_size)
terminal_g_x (numpy.ndarray): terminal goal state,
shape(state_size, ) or shape(pop_size, state_size)
Returns:
cost (numpy.ndarray): cost of state, shape(pred_len, ) or
shape(pop_size, pred_len)
"""
if len(terminal_x.shape) > 1:
return (0.5 * (terminal_x[:, 0]**2) +
0.5 * (terminal_x[:, 1]**2))[:, np.newaxis]
return 0.5 * (terminal_x[0]**2) + 0.5 * (terminal_x[1]**2)
@staticmethod
def gradient_cost_fn_state(x, g_x, terminal=False):
""" gradient of costs with respect to the state
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
Returns:
l_x (numpy.ndarray): gradient of cost, shape(pred_len, state_size)
or shape(1, state_size)
"""
if not terminal:
cost_dx0 = x[:, 0]
cost_dx1 = x[:, 1]
cost_dx = np.stack((cost_dx0, cost_dx1), axis=1)
return cost_dx
cost_dx0 = x[0]
cost_dx1 = x[1]
cost_dx = np.array([[cost_dx0, cost_dx1]])
return cost_dx
@staticmethod
def gradient_cost_fn_input(x, u):
""" gradient of costs with respect to the input
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
u (numpy.ndarray): goal state, shape(pred_len, input_size)
Returns:
l_u (numpy.ndarray): gradient of cost, shape(pred_len, input_size)
"""
return 2. * u * np.diag(NonlinearSampleSystemConfigModule.R)
@staticmethod
def hessian_cost_fn_state(x, g_x, terminal=False):
""" hessian costs with respect to the state
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
Returns:
l_xx (numpy.ndarray): gradient of cost,
shape(pred_len, state_size, state_size) or
shape(1, state_size, state_size) or
"""
if not terminal:
(pred_len, state_size) = x.shape
hessian = np.eye(state_size)
hessian = np.tile(hessian, (pred_len, 1, 1))
hessian[:, 0, 0] = 1.
hessian[:, 1, 1] = 1.
return hessian
state_size = len(x)
hessian = np.eye(state_size)
hessian[0, 0] = 1.
hessian[1, 1] = 1.
return hessian[np.newaxis, :, :]
@staticmethod
def hessian_cost_fn_input(x, u):
""" hessian costs with respect to the input
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
u (numpy.ndarray): goal state, shape(pred_len, input_size)
Returns:
l_uu (numpy.ndarray): gradient of cost,
shape(pred_len, input_size, input_size)
"""
(pred_len, _) = u.shape
return np.tile(NonlinearSampleSystemConfigModule.R, (pred_len, 1, 1))
@staticmethod
def hessian_cost_fn_input_state(x, u):
""" hessian costs with respect to the state and input
Args:
x (numpy.ndarray): state, shape(pred_len, state_size)
u (numpy.ndarray): goal state, shape(pred_len, input_size)
Returns:
l_ux (numpy.ndarray): gradient of cost ,
shape(pred_len, input_size, state_size)
"""
(_, state_size) = x.shape
(pred_len, input_size) = u.shape
return np.zeros((pred_len, input_size, state_size))
@staticmethod
def gradient_hamiltonian_input(x, lam, u, g_x):
"""
Args:
x (numpy.ndarray): shape(pred_len+1, state_size)
lam (numpy.ndarray): shape(pred_len, state_size)
u (numpy.ndarray): shape(pred_len, input_size)
g_xs (numpy.ndarray): shape(pred_len, state_size)
Returns:
F (numpy.ndarray), shape(pred_len, input_size)
"""
if len(x.shape) == 1:
input_size = u.shape[0]
F = np.zeros(input_size)
F[0] = u[0] + lam[1]
return F
elif len(x.shape) == 2:
pred_len, input_size = u.shape
F = np.zeros((pred_len, input_size))
for i in range(pred_len):
F[i, 0] = u[i, 0] + lam[i, 1]
return F
else:
raise NotImplementedError
@staticmethod
def gradient_hamiltonian_state(x, lam, u, g_x):
"""
Args:
x (numpy.ndarray): shape(pred_len+1, state_size)
lam (numpy.ndarray): shape(pred_len, state_size)
u (numpy.ndarray): shape(pred_len, input_size)
g_xs (numpy.ndarray): shape(pred_len, state_size)
Returns:
lam_dot (numpy.ndarray), shape(state_size, )
"""
if len(lam.shape) == 1:
state_size = lam.shape[0]
lam_dot = np.zeros(state_size)
lam_dot[0] = x[0] - (2. * x[0] * x[1] + 1.) * lam[1]
lam_dot[1] = x[1] + lam[0] + \
(-3. * (x[1]**2) - x[0]**2 + 1.) * lam[1]
return lam_dot
elif len(lam.shape) == 2:
pred_len, state_size = lam.shape
lam_dot = np.zeros((pred_len, state_size))
for i in range(pred_len):
lam_dot[i, 0] = x[i, 0] - \
(2. * x[i, 0] * x[i, 1] + 1.) * lam[i, 1]
lam_dot[i, 1] = x[i, 1] + lam[i, 0] + \
(-3. * (x[i, 1]**2) - x[i, 0]**2 + 1.) * lam[i, 1]
return lam_dot
else:
raise NotImplementedError
class NonlinearSampleSystemExtendConfigModule(NonlinearSampleSystemConfigModule):
def __init__(self):
super().__init__()
self.opt_config = {
"NMPCCGMRES": {
"threshold": 1e-3,
"zeta": 100.,
"delta": 0.01,
"alpha": 0.5,
"tf": 1.,
"constraint": True
},
"NMPCNewton": {
"threshold": 1e-3,
"max_iteration": 500,
"learning_rate": 1e-3
}
}
@staticmethod
def gradient_hamiltonian_input_with_constraint(x, lam, u, g_x, dummy_u, raw):
"""
Args:
x (numpy.ndarray): shape(pred_len+1, state_size)
lam (numpy.ndarray): shape(pred_len, state_size)
u (numpy.ndarray): shape(pred_len, input_size)
g_xs (numpy.ndarray): shape(pred_len, state_size)
dummy_u (numpy.ndarray): shape(pred_len, input_size)
raw (numpy.ndarray): shape(pred_len, input_size), Lagrangian for constraints
Returns:
F (numpy.ndarray), shape(pred_len, 3)
"""
if len(x.shape) == 1:
vanilla_F = np.zeros(1)
extend_F = np.zeros(1) # 1 is the same as input size
extend_C = np.zeros(1)
vanilla_F[0] = u[0] + lam[1] + 2. * raw[0] * u[0]
extend_F[0] = -0.01 + 2. * raw[0] * dummy_u[0]
extend_C[0] = u[0]**2 + dummy_u[0]**2 - \
NonlinearSampleSystemConfigModule.INPUT_LOWER_BOUND**2
F = np.concatenate([vanilla_F, extend_F, extend_C])
elif len(x.shape) == 2:
pred_len, _ = u.shape
vanilla_F = np.zeros((pred_len, 1))
extend_F = np.zeros((pred_len, 1)) # 1 is the same as input size
extend_C = np.zeros((pred_len, 1))
for i in range(pred_len):
vanilla_F[i, 0] = \
u[i, 0] + lam[i, 1] + 2. * raw[i, 0] * u[i, 0]
extend_F[i, 0] = -0.01 + 2. * raw[i, 0] * dummy_u[i, 0]
extend_C[i, 0] = u[i, 0]**2 + dummy_u[i, 0]**2 - \
NonlinearSampleSystemConfigModule.INPUT_LOWER_BOUND**2
F = np.concatenate([vanilla_F, extend_F, extend_C], axis=1)
return F

View File

@ -4,6 +4,7 @@ from matplotlib.axes import Axes
from ..plotters.plot_objs import square_with_angle, square from ..plotters.plot_objs import square_with_angle, square
from ..common.utils import fit_angle_in_range from ..common.utils import fit_angle_in_range
class TwoWheeledConfigModule(): class TwoWheeledConfigModule():
# parameters # parameters
ENV_NAME = "TwoWheeled-v0" ENV_NAME = "TwoWheeled-v0"
@ -22,10 +23,15 @@ class TwoWheeledConfigModule():
Sf = np.diag([5., 5., 1.]) Sf = np.diag([5., 5., 1.])
""" """
# for track goal # for track goal
"""
R = np.diag([0.01, 0.01]) R = np.diag([0.01, 0.01])
Q = np.diag([2.5, 2.5, 0.01]) Q = np.diag([2.5, 2.5, 0.01])
Sf = np.diag([2.5, 2.5, 0.01]) Sf = np.diag([2.5, 2.5, 0.01])
"""
# for track goal to NMPC
R = np.diag([1., 1.])
Q = np.diag([0.001, 0.001, 0.001])
Sf = np.diag([1., 1., 0.001])
# bounds # bounds
INPUT_LOWER_BOUND = np.array([-1.5, -3.14]) INPUT_LOWER_BOUND = np.array([-1.5, -3.14])
INPUT_UPPER_BOUND = np.array([1.5, 3.14]) INPUT_UPPER_BOUND = np.array([1.5, 3.14])
@ -61,7 +67,7 @@ class TwoWheeledConfigModule():
"noise_sigma": 1., "noise_sigma": 1.,
}, },
"iLQR": { "iLQR": {
"max_iter": 500, "max_iters": 500,
"init_mu": 1., "init_mu": 1.,
"mu_min": 1e-6, "mu_min": 1e-6,
"mu_max": 1e10, "mu_max": 1e10,
@ -69,13 +75,19 @@ class TwoWheeledConfigModule():
"threshold": 1e-6, "threshold": 1e-6,
}, },
"DDP": { "DDP": {
"max_iter": 500, "max_iters": 500,
"init_mu": 1., "init_mu": 1.,
"mu_min": 1e-6, "mu_min": 1e-6,
"mu_max": 1e10, "mu_max": 1e10,
"init_delta": 2., "init_delta": 2.,
"threshold": 1e-6, "threshold": 1e-6,
}, },
"NMPC": {
"threshold": 0.01,
"max_iters": 5000,
"learning_rate": 0.01,
"optimizer_mode": "conjugate"
},
"NMPC-CGMRES": { "NMPC-CGMRES": {
}, },
"NMPC-Newton": { "NMPC-Newton": {
@ -142,13 +154,13 @@ class TwoWheeledConfigModule():
cost (numpy.ndarray): cost of state, shape(pred_len, ) or cost (numpy.ndarray): cost of state, shape(pred_len, ) or
shape(pop_size, pred_len) shape(pop_size, pred_len)
""" """
terminal_diff = TwoWheeledConfigModule.fit_diff_in_range(terminal_x \ terminal_diff = TwoWheeledConfigModule.fit_diff_in_range(terminal_x
- terminal_g_x) - terminal_g_x)
return ((terminal_diff)**2) * np.diag(TwoWheeledConfigModule.Sf) return ((terminal_diff)**2) * np.diag(TwoWheeledConfigModule.Sf)
@staticmethod @staticmethod
def gradient_cost_fn_with_state(x, g_x, terminal=False): def gradient_cost_fn_state(x, g_x, terminal=False):
""" gradient of costs with respect to the state """ gradient of costs with respect to the state
Args: Args:
@ -164,11 +176,11 @@ class TwoWheeledConfigModule():
if not terminal: if not terminal:
return 2. * (diff) * np.diag(TwoWheeledConfigModule.Q) return 2. * (diff) * np.diag(TwoWheeledConfigModule.Q)
return (2. * (diff) \ return (2. * (diff)
* np.diag(TwoWheeledConfigModule.Sf))[np.newaxis, :] * np.diag(TwoWheeledConfigModule.Sf))[np.newaxis, :]
@staticmethod @staticmethod
def gradient_cost_fn_with_input(x, u): def gradient_cost_fn_input(x, u):
""" gradient of costs with respect to the input """ gradient of costs with respect to the input
Args: Args:
@ -181,7 +193,7 @@ class TwoWheeledConfigModule():
return 2. * u * np.diag(TwoWheeledConfigModule.R) return 2. * u * np.diag(TwoWheeledConfigModule.R)
@staticmethod @staticmethod
def hessian_cost_fn_with_state(x, g_x, terminal=False): def hessian_cost_fn_state(x, g_x, terminal=False):
""" hessian costs with respect to the state """ hessian costs with respect to the state
Args: Args:
@ -200,7 +212,7 @@ class TwoWheeledConfigModule():
return np.tile(2.*TwoWheeledConfigModule.Sf, (1, 1, 1)) return np.tile(2.*TwoWheeledConfigModule.Sf, (1, 1, 1))
@staticmethod @staticmethod
def hessian_cost_fn_with_input(x, u): def hessian_cost_fn_input(x, u):
""" hessian costs with respect to the input """ hessian costs with respect to the input
Args: Args:
@ -216,7 +228,7 @@ class TwoWheeledConfigModule():
return np.tile(2.*TwoWheeledConfigModule.R, (pred_len, 1, 1)) return np.tile(2.*TwoWheeledConfigModule.R, (pred_len, 1, 1))
@staticmethod @staticmethod
def hessian_cost_fn_with_input_state(x, u): def hessian_cost_fn_input_state(x, u):
""" hessian costs with respect to the state and input """ hessian costs with respect to the state and input
Args: Args:
@ -231,3 +243,166 @@ class TwoWheeledConfigModule():
(pred_len, input_size) = u.shape (pred_len, input_size) = u.shape
return np.zeros((pred_len, input_size, state_size)) return np.zeros((pred_len, input_size, state_size))
@staticmethod
def gradient_hamiltonian_input(x, lam, u, g_x):
"""
Args:
x (numpy.ndarray): shape(pred_len+1, state_size)
lam (numpy.ndarray): shape(pred_len, state_size)
u (numpy.ndarray): shape(pred_len, input_size)
g_xs (numpy.ndarray): shape(pred_len, state_size)
Returns:
F (numpy.ndarray), shape(pred_len, input_size)
"""
if len(x.shape) == 1:
input_size = u.shape[0]
F = np.zeros(input_size)
F[0] = u[0] * TwoWheeledConfigModule.R[0, 0] + \
lam[0] * np.cos(x[2]) + lam[1] * np.sin(x[2])
F[1] = u[1] * TwoWheeledConfigModule.R[1, 1] + lam[2]
return F
elif len(x.shape) == 2:
pred_len, input_size = u.shape
F = np.zeros((pred_len, input_size))
for i in range(pred_len):
F[i, 0] = u[i, 0] * TwoWheeledConfigModule.R[0, 0] + \
lam[i, 0] * np.cos(x[i, 2]) + lam[i, 1] * np.sin(x[i, 2])
F[i, 1] = u[i, 1] * TwoWheeledConfigModule.R[1, 1] + lam[i, 2]
return F
else:
raise NotImplementedError
@staticmethod
def gradient_hamiltonian_state(x, lam, u, g_x):
"""
Args:
x (numpy.ndarray): shape(pred_len+1, state_size)
lam (numpy.ndarray): shape(pred_len, state_size)
u (numpy.ndarray): shape(pred_len, input_size)
g_xs (numpy.ndarray): shape(pred_len, state_size)
Returns:
lam_dot (numpy.ndarray), shape(state_size, )
"""
if len(lam.shape) == 1:
state_size = lam.shape[0]
lam_dot = np.zeros(state_size)
lam_dot[0] = \
(x[0] - g_x[0]) * TwoWheeledConfigModule.Q[0, 0]
lam_dot[1] = \
(x[1] - g_x[1]) * TwoWheeledConfigModule.Q[1, 1]
relative_angle = fit_angle_in_range(x[2] - g_x[2])
lam_dot[2] = \
relative_angle * TwoWheeledConfigModule.Q[2, 2] \
- lam[0] * u[0] * np.sin(x[2]) \
+ lam[1] * u[0] * np.cos(x[2])
return lam_dot
elif len(lam.shape) == 2:
pred_len, state_size = lam.shape
lam_dot = np.zeros((pred_len, state_size))
for i in range(pred_len):
lam_dot[i, 0] = \
(x[i, 0] - g_x[i, 0]) * TwoWheeledConfigModule.Q[0, 0]
lam_dot[i, 1] = \
(x[i, 1] - g_x[i, 1]) * TwoWheeledConfigModule.Q[1, 1]
relative_angle = fit_angle_in_range(x[i, 2] - g_x[i, 2])
lam_dot[i, 2] = \
relative_angle * TwoWheeledConfigModule.Q[2, 2] \
- lam[i, 0] * u[i, 0] * np.sin(x[i, 2]) \
+ lam[i, 1] * u[i, 0] * np.cos(x[i, 2])
return lam_dot
else:
raise NotImplementedError
class TwoWheeledExtendConfigModule(TwoWheeledConfigModule):
PRED_LEN = 20
def __init__(self):
super().__init__()
self.opt_config = {
"NMPCCGMRES": {
"threshold": 1e-3,
"zeta": 5.,
"delta": 0.01,
"alpha": 0.5,
"tf": 1.,
"constraint": True
},
"NMPCNewton": {
"threshold": 1e-3,
"max_iteration": 500,
"learning_rate": 1e-3
}
}
@staticmethod
def gradient_hamiltonian_input_with_constraint(x, lam, u, g_x, dummy_u, raw):
"""
Args:
x (numpy.ndarray): shape(pred_len+1, state_size)
lam (numpy.ndarray): shape(pred_len, state_size)
u (numpy.ndarray): shape(pred_len, input_size)
g_xs (numpy.ndarray): shape(pred_len, state_size)
dummy_u (numpy.ndarray): shape(pred_len, input_size)
raw (numpy.ndarray): shape(pred_len, input_size), Lagrangian for constraints
Returns:
F (numpy.ndarray), shape(pred_len, 3)
"""
if len(x.shape) == 1:
vanilla_F = np.zeros(2)
extend_F = np.zeros(2) # 1 is the same as input size
extend_C = np.zeros(2)
vanilla_F[0] = u[0] + lam[0] * \
np.cos(x[2]) + lam[1] * np.sin(x[2]) + 2. * raw[0] * u[0]
vanilla_F[1] = u[1] + lam[2] + 2 * raw[1] * u[1]
extend_F[0] = -0.01 + 2. * raw[0] * dummy_u[0]
extend_F[1] = -0.01 + 2. * raw[1] * dummy_u[1]
extend_C[0] = u[0]**2 + dummy_u[0]**2 - \
TwoWheeledConfigModule.INPUT_LOWER_BOUND[0]**2
extend_C[1] = u[1]**2 + dummy_u[1]**2 - \
TwoWheeledConfigModule.INPUT_LOWER_BOUND[1]**2
F = np.concatenate([vanilla_F, extend_F, extend_C])
elif len(x.shape) == 2:
pred_len, _ = u.shape
vanilla_F = np.zeros((pred_len, 2))
extend_F = np.zeros((pred_len, 2)) # 1 is the same as input size
extend_C = np.zeros((pred_len, 2))
for i in range(pred_len):
vanilla_F[i, 0] = u[i, 0] + lam[i, 0] * \
np.cos(x[i, 2]) + lam[i, 1] * \
np.sin(x[i, 2]) + 2. * raw[i, 0] * u[i, 0]
vanilla_F[i, 1] = u[i, 1] + lam[i, 2] + 2 * raw[i, 1] * u[i, 1]
extend_F[i, 0] = -0.01 + 2. * raw[i, 0] * dummy_u[i, 0]
extend_F[i, 1] = -0.01 + 2. * raw[i, 1] * dummy_u[i, 1]
extend_C[i, 0] = u[i, 0]**2 + dummy_u[i, 0]**2 - \
TwoWheeledConfigModule.INPUT_LOWER_BOUND[0]**2
extend_C[i, 1] = u[i, 1]**2 + dummy_u[i, 1]**2 - \
TwoWheeledConfigModule.INPUT_LOWER_BOUND[1]**2
F = np.concatenate([vanilla_F, extend_F, extend_C], axis=1)
return F

View File

@ -8,6 +8,7 @@ from ..envs.cost import calc_cost
logger = getLogger(__name__) logger = getLogger(__name__)
class CEM(Controller): class CEM(Controller):
""" Cross Entropy Method for linear and nonlinear method """ Cross Entropy Method for linear and nonlinear method
@ -19,6 +20,7 @@ class CEM(Controller):
using probabilistic dynamics models. using probabilistic dynamics models.
In Advances in Neural Information Processing Systems (pp. 4754-4765). In Advances in Neural Information Processing Systems (pp. 4754-4765).
""" """
def __init__(self, config, model): def __init__(self, config, model):
super(CEM, self).__init__(config, model) super(CEM, self).__init__(config, model)
@ -50,7 +52,7 @@ class CEM(Controller):
self.input_cost_fn = config.input_cost_fn self.input_cost_fn = config.input_cost_fn
# init mean # init mean
self.init_mean = np.tile((config.INPUT_UPPER_BOUND \ self.init_mean = np.tile((config.INPUT_UPPER_BOUND
+ config.INPUT_LOWER_BOUND) / 2., + config.INPUT_LOWER_BOUND) / 2.,
self.pred_len) self.pred_len)
self.prev_sol = self.init_mean.copy() self.prev_sol = self.init_mean.copy()
@ -86,7 +88,7 @@ class CEM(Controller):
# make distribution # make distribution
X = stats.truncnorm(-1, 1, X = stats.truncnorm(-1, 1,
loc=np.zeros_like(mean),\ loc=np.zeros_like(mean),
scale=np.ones_like(mean)) scale=np.ones_like(mean))
while (opt_count < self.max_iters) and np.max(var) > self.epsilon: while (opt_count < self.max_iters) and np.max(var) > self.epsilon:

View File

@ -2,9 +2,11 @@ import numpy as np
from ..envs.cost import calc_cost from ..envs.cost import calc_cost
class Controller(): class Controller():
""" Controller class """ Controller class
""" """
def __init__(self, config, model): def __init__(self, config, model):
""" """
""" """
@ -49,19 +51,7 @@ class Controller():
# get particle cost # get particle cost
costs = calc_cost(pred_xs, samples, g_xs, costs = calc_cost(pred_xs, samples, g_xs,
self.state_cost_fn, self.input_cost_fn, \ self.state_cost_fn, self.input_cost_fn,
self.terminal_state_cost_fn) self.terminal_state_cost_fn)
return costs return costs
@staticmethod
def gradient_hamiltonian_x(x, u, lam):
""" gradient of hamitonian with respect to the state,
"""
raise NotImplementedError("Implement gradient of hamitonian with respect to the state")
@staticmethod
def gradient_hamiltonian_u(x, u, lam):
""" gradient of hamitonian with respect to the input
"""
raise NotImplementedError("Implement gradient of hamitonian with respect to the input")

View File

@ -8,6 +8,7 @@ from ..envs.cost import calc_cost
logger = getLogger(__name__) logger = getLogger(__name__)
class DDP(Controller): class DDP(Controller):
""" Differential Dynamic Programming """ Differential Dynamic Programming
@ -18,6 +19,7 @@ class DDP(Controller):
https://github.com/studywolf/control, and https://github.com/studywolf/control, and
https://github.com/anassinator/ilqr https://github.com/anassinator/ilqr
""" """
def __init__(self, config, model): def __init__(self, config, model):
""" """
""" """
@ -30,15 +32,15 @@ class DDP(Controller):
self.state_cost_fn = config.state_cost_fn self.state_cost_fn = config.state_cost_fn
self.terminal_state_cost_fn = config.terminal_state_cost_fn self.terminal_state_cost_fn = config.terminal_state_cost_fn
self.input_cost_fn = config.input_cost_fn self.input_cost_fn = config.input_cost_fn
self.gradient_cost_fn_with_state = config.gradient_cost_fn_with_state self.gradient_cost_fn_state = config.gradient_cost_fn_state
self.gradient_cost_fn_with_input = config.gradient_cost_fn_with_input self.gradient_cost_fn_input = config.gradient_cost_fn_input
self.hessian_cost_fn_with_state = config.hessian_cost_fn_with_state self.hessian_cost_fn_state = config.hessian_cost_fn_state
self.hessian_cost_fn_with_input = config.hessian_cost_fn_with_input self.hessian_cost_fn_input = config.hessian_cost_fn_input
self.hessian_cost_fn_with_input_state = \ self.hessian_cost_fn_input_state = \
config.hessian_cost_fn_with_input_state config.hessian_cost_fn_input_state
# controller parameters # controller parameters
self.max_iter = config.opt_config["DDP"]["max_iter"] self.max_iters = config.opt_config["DDP"]["max_iters"]
self.init_mu = config.opt_config["DDP"]["init_mu"] self.init_mu = config.opt_config["DDP"]["init_mu"]
self.mu = self.init_mu self.mu = self.init_mu
self.mu_min = config.opt_config["DDP"]["mu_min"] self.mu_min = config.opt_config["DDP"]["mu_min"]
@ -86,7 +88,7 @@ class DDP(Controller):
# line search param # line search param
alphas = 1.1**(-np.arange(10)**2) alphas = 1.1**(-np.arange(10)**2)
while opt_count < self.max_iter: while opt_count < self.max_iters:
accepted_sol = False accepted_sol = False
# forward # forward
@ -98,7 +100,7 @@ class DDP(Controller):
try: try:
# backward # backward
k, K = self.backward(f_x, f_u, f_xx, f_ux, f_uu, \ k, K = self.backward(f_x, f_u, f_xx, f_ux, f_uu,
l_x, l_xx, l_u, l_uu, l_ux) l_x, l_xx, l_u, l_uu, l_ux)
# line search # line search
@ -139,7 +141,7 @@ class DDP(Controller):
# increase regularization term. # increase regularization term.
self.delta = max(1.0, self.delta) * self.init_delta self.delta = max(1.0, self.delta) * self.init_delta
self.mu = max(self.mu_min, self.mu * self.delta) self.mu = max(self.mu_min, self.mu * self.delta)
logger.debug("Update regularization term to {}"\ logger.debug("Update regularization term to {}"
.format(self.mu)) .format(self.mu))
if self.mu >= self.mu_max: if self.mu >= self.mu_max:
logger.debug("Reach Max regularization term") logger.debug("Reach Max regularization term")
@ -262,31 +264,31 @@ class DDP(Controller):
shape(pred_len, input_size, state_size) shape(pred_len, input_size, state_size)
""" """
# l_x.shape = (pred_len+1, state_size) # l_x.shape = (pred_len+1, state_size)
l_x = self.gradient_cost_fn_with_state(pred_xs[:-1], l_x = self.gradient_cost_fn_state(pred_xs[:-1],
g_x[:-1], terminal=False) g_x[:-1], terminal=False)
terminal_l_x = \ terminal_l_x = \
self.gradient_cost_fn_with_state(pred_xs[-1], self.gradient_cost_fn_state(pred_xs[-1],
g_x[-1], terminal=True) g_x[-1], terminal=True)
l_x = np.concatenate((l_x, terminal_l_x), axis=0) l_x = np.concatenate((l_x, terminal_l_x), axis=0)
# l_u.shape = (pred_len, input_size) # l_u.shape = (pred_len, input_size)
l_u = self.gradient_cost_fn_with_input(pred_xs[:-1], sol) l_u = self.gradient_cost_fn_input(pred_xs[:-1], sol)
# l_xx.shape = (pred_len+1, state_size, state_size) # l_xx.shape = (pred_len+1, state_size, state_size)
l_xx = self.hessian_cost_fn_with_state(pred_xs[:-1], l_xx = self.hessian_cost_fn_state(pred_xs[:-1],
g_x[:-1], terminal=False) g_x[:-1], terminal=False)
terminal_l_xx = \ terminal_l_xx = \
self.hessian_cost_fn_with_state(pred_xs[-1], self.hessian_cost_fn_state(pred_xs[-1],
g_x[-1], terminal=True) g_x[-1], terminal=True)
l_xx = np.concatenate((l_xx, terminal_l_xx), axis=0) l_xx = np.concatenate((l_xx, terminal_l_xx), axis=0)
# l_uu.shape = (pred_len, input_size, input_size) # l_uu.shape = (pred_len, input_size, input_size)
l_uu = self.hessian_cost_fn_with_input(pred_xs[:-1], sol) l_uu = self.hessian_cost_fn_input(pred_xs[:-1], sol)
# l_ux.shape = (pred_len, input_size, state_size) # l_ux.shape = (pred_len, input_size, state_size)
l_ux = self.hessian_cost_fn_with_input_state(pred_xs[:-1], sol) l_ux = self.hessian_cost_fn_input_state(pred_xs[:-1], sol)
return l_x, l_xx, l_u, l_uu, l_ux return l_x, l_xx, l_u, l_uu, l_ux

View File

@ -8,6 +8,7 @@ from ..envs.cost import calc_cost
logger = getLogger(__name__) logger = getLogger(__name__)
class iLQR(Controller): class iLQR(Controller):
""" iterative Liner Quadratique Regulator """ iterative Liner Quadratique Regulator
@ -16,6 +17,7 @@ class iLQR(Controller):
Intelligent Robots and Systems (pp. 4906-4913). and Study Wolf, Intelligent Robots and Systems (pp. 4906-4913). and Study Wolf,
https://github.com/studywolf/control https://github.com/studywolf/control
""" """
def __init__(self, config, model): def __init__(self, config, model):
""" """
""" """
@ -28,15 +30,15 @@ class iLQR(Controller):
self.state_cost_fn = config.state_cost_fn self.state_cost_fn = config.state_cost_fn
self.terminal_state_cost_fn = config.terminal_state_cost_fn self.terminal_state_cost_fn = config.terminal_state_cost_fn
self.input_cost_fn = config.input_cost_fn self.input_cost_fn = config.input_cost_fn
self.gradient_cost_fn_with_state = config.gradient_cost_fn_with_state self.gradient_cost_fn_state = config.gradient_cost_fn_state
self.gradient_cost_fn_with_input = config.gradient_cost_fn_with_input self.gradient_cost_fn_input = config.gradient_cost_fn_input
self.hessian_cost_fn_with_state = config.hessian_cost_fn_with_state self.hessian_cost_fn_state = config.hessian_cost_fn_state
self.hessian_cost_fn_with_input = config.hessian_cost_fn_with_input self.hessian_cost_fn_input = config.hessian_cost_fn_input
self.hessian_cost_fn_with_input_state = \ self.hessian_cost_fn_input_state = \
config.hessian_cost_fn_with_input_state config.hessian_cost_fn_input_state
# controller parameters # controller parameters
self.max_iter = config.opt_config["iLQR"]["max_iter"] self.max_iters = config.opt_config["iLQR"]["max_iters"]
self.init_mu = config.opt_config["iLQR"]["init_mu"] self.init_mu = config.opt_config["iLQR"]["init_mu"]
self.mu = self.init_mu self.mu = self.init_mu
self.mu_min = config.opt_config["iLQR"]["mu_min"] self.mu_min = config.opt_config["iLQR"]["mu_min"]
@ -79,7 +81,7 @@ class iLQR(Controller):
# line search param # line search param
alphas = 1.1**(-np.arange(10)**2) alphas = 1.1**(-np.arange(10)**2)
while opt_count < self.max_iter: while opt_count < self.max_iters:
accepted_sol = False accepted_sol = False
# forward # forward
@ -130,7 +132,7 @@ class iLQR(Controller):
# increase regularization term. # increase regularization term.
self.delta = max(1.0, self.delta) * self.init_delta self.delta = max(1.0, self.delta) * self.init_delta
self.mu = max(self.mu_min, self.mu * self.delta) self.mu = max(self.mu_min, self.mu * self.delta)
logger.debug("Update regularization term to {}"\ logger.debug("Update regularization term to {}"
.format(self.mu)) .format(self.mu))
if self.mu >= self.mu_max: if self.mu >= self.mu_max:
logger.debug("Reach Max regularization term") logger.debug("Reach Max regularization term")
@ -242,31 +244,31 @@ class iLQR(Controller):
shape(pred_len, input_size, state_size) shape(pred_len, input_size, state_size)
""" """
# l_x.shape = (pred_len+1, state_size) # l_x.shape = (pred_len+1, state_size)
l_x = self.gradient_cost_fn_with_state(pred_xs[:-1], l_x = self.gradient_cost_fn_state(pred_xs[:-1],
g_x[:-1], terminal=False) g_x[:-1], terminal=False)
terminal_l_x = \ terminal_l_x = \
self.gradient_cost_fn_with_state(pred_xs[-1], self.gradient_cost_fn_state(pred_xs[-1],
g_x[-1], terminal=True) g_x[-1], terminal=True)
l_x = np.concatenate((l_x, terminal_l_x), axis=0) l_x = np.concatenate((l_x, terminal_l_x), axis=0)
# l_u.shape = (pred_len, input_size) # l_u.shape = (pred_len, input_size)
l_u = self.gradient_cost_fn_with_input(pred_xs[:-1], sol) l_u = self.gradient_cost_fn_input(pred_xs[:-1], sol)
# l_xx.shape = (pred_len+1, state_size, state_size) # l_xx.shape = (pred_len+1, state_size, state_size)
l_xx = self.hessian_cost_fn_with_state(pred_xs[:-1], l_xx = self.hessian_cost_fn_state(pred_xs[:-1],
g_x[:-1], terminal=False) g_x[:-1], terminal=False)
terminal_l_xx = \ terminal_l_xx = \
self.hessian_cost_fn_with_state(pred_xs[-1], self.hessian_cost_fn_state(pred_xs[-1],
g_x[-1], terminal=True) g_x[-1], terminal=True)
l_xx = np.concatenate((l_xx, terminal_l_xx), axis=0) l_xx = np.concatenate((l_xx, terminal_l_xx), axis=0)
# l_uu.shape = (pred_len, input_size, input_size) # l_uu.shape = (pred_len, input_size, input_size)
l_uu = self.hessian_cost_fn_with_input(pred_xs[:-1], sol) l_uu = self.hessian_cost_fn_input(pred_xs[:-1], sol)
# l_ux.shape = (pred_len, input_size, state_size) # l_ux.shape = (pred_len, input_size, state_size)
l_ux = self.hessian_cost_fn_with_input_state(pred_xs[:-1], sol) l_ux = self.hessian_cost_fn_input_state(pred_xs[:-1], sol)
return l_x, l_xx, l_u, l_uu, l_ux return l_x, l_xx, l_u, l_uu, l_ux

View File

@ -5,6 +5,9 @@ from .mppi import MPPI
from .mppi_williams import MPPIWilliams from .mppi_williams import MPPIWilliams
from .ilqr import iLQR from .ilqr import iLQR
from .ddp import DDP from .ddp import DDP
from .nmpc import NMPC
from .nmpc_cgmres import NMPCCGMRES
def make_controller(args, config, model): def make_controller(args, config, model):
@ -22,5 +25,9 @@ def make_controller(args, config, model):
return iLQR(config, model) return iLQR(config, model)
elif args.controller_type == "DDP": elif args.controller_type == "DDP":
return DDP(config, model) return DDP(config, model)
elif args.controller_type == "NMPC":
return NMPC(config, model)
elif args.controller_type == "NMPCCGMRES":
return NMPCCGMRES(config, model)
raise ValueError("No controller: {}".format(args.controller_type)) raise ValueError("No controller: {}".format(args.controller_type))

View File

@ -9,6 +9,7 @@ from ..envs.cost import calc_cost
logger = getLogger(__name__) logger = getLogger(__name__)
class LinearMPC(Controller): class LinearMPC(Controller):
""" Model Predictive Controller for linear model """ Model Predictive Controller for linear model
@ -21,6 +22,7 @@ class LinearMPC(Controller):
Ref: Ref:
Maciejowski, J. M. (2002). Predictive control: with constraints. Maciejowski, J. M. (2002). Predictive control: with constraints.
""" """
def __init__(self, config, model): def __init__(self, config, model):
""" """
Args: Args:
@ -114,7 +116,7 @@ class LinearMPC(Controller):
for i in range(self.pred_len - 1): for i in range(self.pred_len - 1):
for j in range(self.input_size): for j in range(self.input_size):
temp_F[j * 2: (j + 1) * 2,\ temp_F[j * 2: (j + 1) * 2,
((i+1) * self.input_size) + j] = np.array([1., -1.]) ((i+1) * self.input_size) + j] = np.array([1., -1.])
self.F = np.vstack((self.F, temp_F)) self.F = np.vstack((self.F, temp_F))
@ -187,14 +189,14 @@ class LinearMPC(Controller):
# using cvxopt # using cvxopt
def optimized_func(dt_us): def optimized_func(dt_us):
return (np.dot(dt_us, np.dot(H, dt_us.reshape(-1, 1))) \ return (np.dot(dt_us, np.dot(H, dt_us.reshape(-1, 1)))
- np.dot(G.T, dt_us.reshape(-1, 1)))[0] - np.dot(G.T, dt_us.reshape(-1, 1)))[0]
# constraint # constraint
lb = np.array([-np.inf for _ in range(len(ub))]) # one side cons lb = np.array([-np.inf for _ in range(len(ub))]) # one side cons
cons = LinearConstraint(A, lb, ub) cons = LinearConstraint(A, lb, ub)
# solve # solve
opt_sol = minimize(optimized_func, self.prev_sol.flatten(),\ opt_sol = minimize(optimized_func, self.prev_sol.flatten(),
constraints=[cons]) constraints=[cons])
opt_dt_us = opt_sol.x opt_dt_us = opt_sol.x
@ -213,7 +215,7 @@ class LinearMPC(Controller):
""" """
# to dt form # to dt form
opt_dt_u_seq = np.cumsum(opt_dt_us.reshape(self.pred_len,\ opt_dt_u_seq = np.cumsum(opt_dt_us.reshape(self.pred_len,
self.input_size), self.input_size),
axis=0) axis=0)
self.prev_sol = opt_dt_u_seq.copy() self.prev_sol = opt_dt_u_seq.copy()

View File

@ -8,6 +8,7 @@ from ..envs.cost import calc_cost
logger = getLogger(__name__) logger = getLogger(__name__)
class MPPI(Controller): class MPPI(Controller):
""" Model Predictive Path Integral for linear and nonlinear method """ Model Predictive Path Integral for linear and nonlinear method
@ -18,6 +19,7 @@ class MPPI(Controller):
Deep Dynamics Models for Learning Dexterous Manipulation. Deep Dynamics Models for Learning Dexterous Manipulation.
arXiv preprint arXiv:1909.11652. arXiv preprint arXiv:1909.11652.
""" """
def __init__(self, config, model): def __init__(self, config, model):
super(MPPI, self).__init__(config, model) super(MPPI, self).__init__(config, model)
@ -47,7 +49,7 @@ class MPPI(Controller):
self.input_cost_fn = config.input_cost_fn self.input_cost_fn = config.input_cost_fn
# init mean # init mean
self.prev_sol = np.tile((config.INPUT_UPPER_BOUND \ self.prev_sol = np.tile((config.INPUT_UPPER_BOUND
+ config.INPUT_LOWER_BOUND) / 2., + config.INPUT_LOWER_BOUND) / 2.,
self.pred_len) self.pred_len)
self.prev_sol = self.prev_sol.reshape(self.pred_len, self.input_size) self.prev_sol = self.prev_sol.reshape(self.pred_len, self.input_size)
@ -81,13 +83,13 @@ class MPPI(Controller):
for t in range(self.pred_len): for t in range(self.pred_len):
if t > 0: if t > 0:
noised_inputs[:, t, :] = self.beta \ noised_inputs[:, t, :] = self.beta \
* (self.prev_sol[t, :] \ * (self.prev_sol[t, :]
+ noise[:, t, :]) \ + noise[:, t, :]) \
+ (1 - self.beta) \ + (1 - self.beta) \
* noised_inputs[:, t-1, :] * noised_inputs[:, t-1, :]
else: else:
noised_inputs[:, t, :] = self.beta \ noised_inputs[:, t, :] = self.beta \
* (self.prev_sol[t, :] \ * (self.prev_sol[t, :]
+ noise[:, t, :]) \ + noise[:, t, :]) \
+ (1 - self.beta) \ + (1 - self.beta) \
* self.history_u[-1] * self.history_u[-1]

View File

@ -8,6 +8,7 @@ from ..envs.cost import calc_cost
logger = getLogger(__name__) logger = getLogger(__name__)
class MPPIWilliams(Controller): class MPPIWilliams(Controller):
""" Model Predictive Path Integral for linear and nonlinear method """ Model Predictive Path Integral for linear and nonlinear method
@ -19,6 +20,7 @@ class MPPIWilliams(Controller):
2017 IEEE International Conference on Robotics and Automation (ICRA), 2017 IEEE International Conference on Robotics and Automation (ICRA),
Singapore, 2017, pp. 1714-1721. Singapore, 2017, pp. 1714-1721.
""" """
def __init__(self, config, model): def __init__(self, config, model):
super(MPPIWilliams, self).__init__(config, model) super(MPPIWilliams, self).__init__(config, model)
@ -47,7 +49,7 @@ class MPPIWilliams(Controller):
self.input_cost_fn = config.input_cost_fn self.input_cost_fn = config.input_cost_fn
# init mean # init mean
self.prev_sol = np.tile((config.INPUT_UPPER_BOUND \ self.prev_sol = np.tile((config.INPUT_UPPER_BOUND
+ config.INPUT_LOWER_BOUND) / 2., + config.INPUT_LOWER_BOUND) / 2.,
self.pred_len) self.pred_len)
self.prev_sol = self.prev_sol.reshape(self.pred_len, self.input_size) self.prev_sol = self.prev_sol.reshape(self.pred_len, self.input_size)
@ -85,7 +87,7 @@ class MPPIWilliams(Controller):
# get particle cost # get particle cost
costs = calc_cost(pred_xs, samples, g_xs, costs = calc_cost(pred_xs, samples, g_xs,
self.state_cost_fn, None, \ self.state_cost_fn, None,
self.terminal_state_cost_fn) self.terminal_state_cost_fn)
return costs return costs

View File

@ -0,0 +1,109 @@
from logging import getLogger
import numpy as np
import scipy.stats as stats
from .controller import Controller
from ..envs.cost import calc_cost
from ..common.utils import line_search
logger = getLogger(__name__)
class NMPC(Controller):
def __init__(self, config, model):
""" Nonlinear Model Predictive Control using pure gradient algorithm
"""
super(NMPC, self).__init__(config, model)
# model
self.model = model
# get cost func
self.state_cost_fn = config.state_cost_fn
self.terminal_state_cost_fn = config.terminal_state_cost_fn
self.input_cost_fn = config.input_cost_fn
# controller parameters
self.threshold = config.opt_config["NMPC"]["threshold"]
self.max_iters = config.opt_config["NMPC"]["max_iters"]
self.learning_rate = config.opt_config["NMPC"]["learning_rate"]
self.optimizer_mode = config.opt_config["NMPC"]["optimizer_mode"]
# general parameters
self.pred_len = config.PRED_LEN
self.input_size = config.INPUT_SIZE
self.dt = config.DT
# initialize
self.prev_sol = np.zeros((self.pred_len, self.input_size))
def obtain_sol(self, curr_x, g_xs):
""" calculate the optimal inputs
Args:
curr_x (numpy.ndarray): current state, shape(state_size, )
g_xs (numpy.ndarrya): goal trajectory, shape(plan_len, state_size)
Returns:
opt_input (numpy.ndarray): optimal input, shape(input_size, )
"""
sol = self.prev_sol.copy()
count = 0
# use for Conjugate method
conjugate_d = None
conjugate_prev_d = None
conjugate_s = None
conjugate_beta = None
while True:
# shape(pred_len+1, state_size)
pred_xs = self.model.predict_traj(curr_x, sol)
# shape(pred_len, state_size)
pred_lams = self.model.predict_adjoint_traj(pred_xs, sol, g_xs)
F_hat = self.config.gradient_hamiltonian_input(
pred_xs, pred_lams, sol, g_xs)
if np.linalg.norm(F_hat) < self.threshold:
break
if count > self.max_iters:
logger.debug(" break max iteartion at F : `{}".format(
np.linalg.norm(F_hat)))
break
if self.optimizer_mode == "conjugate":
conjugate_d = F_hat.flatten()
if conjugate_prev_d is None: # initial
conjugate_s = conjugate_d
conjugate_prev_d = conjugate_d
F_hat = conjugate_s.reshape(F_hat.shape)
else:
prev_d = np.dot(conjugate_prev_d, conjugate_prev_d)
d = np.dot(conjugate_d, conjugate_d - conjugate_prev_d)
conjugate_beta = (d + 1e-6) / (prev_d + 1e-6)
conjugate_s = conjugate_d + conjugate_beta * conjugate_s
conjugate_prev_d = conjugate_d
F_hat = conjugate_s.reshape(F_hat.shape)
def compute_eval_val(u):
pred_xs = self.model.predict_traj(curr_x, u)
state_cost = np.sum(self.config.state_cost_fn(
pred_xs[1:-1], g_xs[1:-1]))
input_cost = np.sum(self.config.input_cost_fn(u))
terminal_cost = np.sum(
self.config.terminal_state_cost_fn(pred_xs[-1], g_xs[-1]))
return state_cost + input_cost + terminal_cost
alpha = line_search(F_hat, sol,
compute_eval_val, init_alpha=self.learning_rate)
sol -= alpha * F_hat
count += 1
# update us for next optimization
self.prev_sol = np.concatenate(
(sol[1:], np.zeros((1, self.input_size))), axis=0)
return sol[0]

View File

@ -0,0 +1,167 @@
from logging import getLogger
import numpy as np
import scipy.stats as stats
from .controller import Controller
from ..envs.cost import calc_cost
from ..common.utils import line_search
logger = getLogger(__name__)
class NMPCCGMRES(Controller):
def __init__(self, config, model):
""" Nonlinear Model Predictive Control using cgmres
"""
super(NMPCCGMRES, self).__init__(config, model)
# model
self.model = model
# get cost func
self.state_cost_fn = config.state_cost_fn
self.terminal_state_cost_fn = config.terminal_state_cost_fn
self.input_cost_fn = config.input_cost_fn
# general parameters
self.pred_len = config.PRED_LEN
self.input_size = config.INPUT_SIZE
self.dt = config.DT
# controller parameters
self.threshold = config.opt_config["NMPCCGMRES"]["threshold"]
self.zeta = config.opt_config["NMPCCGMRES"]["zeta"]
self.delta = config.opt_config["NMPCCGMRES"]["delta"]
self.alpha = config.opt_config["NMPCCGMRES"]["alpha"]
self.tf = config.opt_config["NMPCCGMRES"]["tf"]
self.divide_num = config.PRED_LEN
self.with_constraint = config.opt_config["NMPCCGMRES"]["constraint"]
if not self.with_constraint:
raise NotImplementedError
# 3 means u, dummy_u, raw
self.max_iters = 3 * self.input_size * self.divide_num
# initialize
self.prev_sol = np.zeros((self.pred_len, self.input_size))
self.opt_count = 1
# add smaller than constraints value
input_constraint = np.abs([config.INPUT_LOWER_BOUND])
self.prev_dummy_sol = np.ones(
(self.pred_len, self.input_size)) * input_constraint - 1e-3
# add bigger than 0.01 to avoid computational error
self.prev_raw = np.zeros(
(self.pred_len, self.input_size)) + 0.01 + 1e-3
def _compute_f(self, curr_x, sol, g_xs, dummy_sol=None, raw=None):
# shape(pred_len+1, state_size)
pred_xs = self.model.predict_traj(curr_x, sol)
# shape(pred_len, state_size)
pred_lams = self.model.predict_adjoint_traj(pred_xs, sol, g_xs)
if self.with_constraint:
F = self.config.gradient_hamiltonian_input_with_constraint(
pred_xs, pred_lams, sol, g_xs, dummy_sol, raw)
return F
else:
raise NotImplementedError
def obtain_sol(self, curr_x, g_xs):
""" calculate the optimal inputs
Args:
curr_x (numpy.ndarray): current state, shape(state_size, )
g_xs (numpy.ndarrya): goal trajectory, shape(plan_len, state_size)
Returns:
opt_input (numpy.ndarray): optimal input, shape(input_size, )
"""
sol = self.prev_sol.copy()
dummy_sol = self.prev_dummy_sol.copy()
raw = self.prev_raw.copy()
# compute delta t
time = self.dt * self.opt_count
dt = self.tf * (1. - np.exp(-self.alpha * time)) / \
float(self.divide_num)
self.model.dt = dt
# compute Fxt
x_dot = self.model.x_dot(curr_x, sol[0])
dx = x_dot * self.delta
Fxt = self._compute_f(curr_x+dx, sol, g_xs, dummy_sol, raw).flatten()
# compute F
F = self._compute_f(curr_x, sol, g_xs, dummy_sol, raw).flatten()
right = - self.zeta * F - ((Fxt - F) / self.delta)
# compute Fuxt
du = sol * self.delta
ddummy_u = dummy_sol * self.delta
draw = raw * self.delta
Fuxt = self._compute_f(curr_x+dx, sol+du, g_xs,
dummy_sol+ddummy_u, raw+draw).flatten()
left = ((Fuxt - Fxt) / self.delta)
r0 = right - left
r0_norm = np.linalg.norm(r0)
vs = np.zeros((self.max_iters, self.max_iters + 1))
vs[:, 0] = r0 / r0_norm
hs = np.zeros((self.max_iters + 1, self.max_iters + 1))
e = np.zeros((self.max_iters + 1, 1))
e[0] = 1.
for i in range(self.max_iters):
reshaped_vs = vs.reshape(
(self.divide_num, 3, self.input_size, self.max_iters+1))
du = reshaped_vs[:, 0, :, i] * self.delta
ddummy_u = reshaped_vs[:, 1, :, i] * self.delta
draw = reshaped_vs[:, 2, :, i] * self.delta
Fuxt = self._compute_f(
curr_x+dx, sol+du, g_xs, dummy_sol+ddummy_u, raw+draw).flatten()
Av = ((Fuxt - Fxt) / self.delta)
sum_Av = np.zeros(self.max_iters)
for j in range(i + 1):
hs[j, i] = np.dot(Av, vs[:, j])
sum_Av = sum_Av + hs[j, i] * vs[:, j]
v_est = Av - sum_Av
hs[i+1, i] = np.linalg.norm(v_est)
vs[:, i+1] = v_est / hs[i+1, i]
inv_hs = np.linalg.pinv(hs[:i+1, :i])
ys = np.dot(inv_hs, r0_norm * e[:i+1])
judge_value = r0_norm * e[:i+1] - np.dot(hs[:i+1, :i], ys[:i])
if np.linalg.norm(judge_value) < self.threshold or i == self.max_iters-1:
update_value = np.dot(vs[:, :i-1], ys_pre[:i-1]).flatten()
update_value = update_value.reshape(
(self.divide_num, 3, self.input_size))
du_new = du + update_value[:, 0, :]
ddummy_u_new = ddummy_u + update_value[:, 1, :]
draw_new = draw + update_value[:, 2, :]
break
ys_pre = ys
sol += du_new * self.delta
dummy_sol += ddummy_u_new * self.delta
raw += draw_new * self.delta
F = self._compute_f(curr_x, sol, g_xs, dummy_sol, raw)
logger.debug("check F = {0}".format(np.linalg.norm(F)))
self.prev_sol = sol.copy()
self.prev_dummy_sol = dummy_sol.copy()
self.prev_raw = raw.copy()
self.opt_count += 1
return sol[0]

View File

@ -8,6 +8,7 @@ from ..envs.cost import calc_cost
logger = getLogger(__name__) logger = getLogger(__name__)
class RandomShooting(Controller): class RandomShooting(Controller):
""" Random Shooting Method for linear and nonlinear method """ Random Shooting Method for linear and nonlinear method
@ -19,6 +20,7 @@ class RandomShooting(Controller):
using probabilistic dynamics models. using probabilistic dynamics models.
In Advances in Neural Information Processing Systems (pp. 4754-4765). In Advances in Neural Information Processing Systems (pp. 4754-4765).
""" """
def __init__(self, config, model): def __init__(self, config, model):
super(RandomShooting, self).__init__(config, model) super(RandomShooting, self).__init__(config, model)

View File

@ -4,6 +4,7 @@ from matplotlib.axes import Axes
from .env import Env from .env import Env
from ..plotters.plot_objs import square from ..plotters.plot_objs import square
class CartPoleEnv(Env): class CartPoleEnv(Env):
""" Cartpole Environment """ Cartpole Environment
@ -13,6 +14,7 @@ class CartPoleEnv(Env):
6-832-underactuated-robotics-spring-2009/readings/ 6-832-underactuated-robotics-spring-2009/readings/
MIT6_832s09_read_ch03.pdf MIT6_832s09_read_ch03.pdf
""" """
def __init__(self): def __init__(self):
""" """
""" """
@ -76,21 +78,21 @@ class CartPoleEnv(Env):
# x # x
d_x0 = self.curr_x[1] d_x0 = self.curr_x[1]
# v_x # v_x
d_x1 = (u[0] + self.config["mp"] * np.sin(self.curr_x[2]) \ d_x1 = (u[0] + self.config["mp"] * np.sin(self.curr_x[2])
* (self.config["l"] * (self.curr_x[3]**2) \ * (self.config["l"] * (self.curr_x[3]**2)
+ self.config["g"] * np.cos(self.curr_x[2]))) \ + self.config["g"] * np.cos(self.curr_x[2]))) \
/ (self.config["mc"] + self.config["mp"] \ / (self.config["mc"] + self.config["mp"]
* (np.sin(self.curr_x[2])**2)) * (np.sin(self.curr_x[2])**2))
# theta # theta
d_x2 = self.curr_x[3] d_x2 = self.curr_x[3]
# v_theta # v_theta
d_x3 = (-u[0] * np.cos(self.curr_x[2]) \ d_x3 = (-u[0] * np.cos(self.curr_x[2])
- self.config["mp"] * self.config["l"] * (self.curr_x[3]**2) \ - self.config["mp"] * self.config["l"] * (self.curr_x[3]**2)
* np.cos(self.curr_x[2]) * np.sin(self.curr_x[2]) \ * np.cos(self.curr_x[2]) * np.sin(self.curr_x[2])
- (self.config["mc"] + self.config["mp"]) * self.config["g"] \ - (self.config["mc"] + self.config["mp"]) * self.config["g"]
* np.sin(self.curr_x[2])) \ * np.sin(self.curr_x[2])) \
/ (self.config["l"] * (self.config["mc"] + self.config["mp"] \ / (self.config["l"] * (self.config["mc"] + self.config["mp"]
* (np.sin(self.curr_x[2])**2))) * (np.sin(self.curr_x[2])**2)))
next_x = self.curr_x +\ next_x = self.curr_x +\
@ -134,10 +136,10 @@ class CartPoleEnv(Env):
imgs["cart"] = to_plot.plot([], [], c="k")[0] imgs["cart"] = to_plot.plot([], [], c="k")[0]
imgs["pole"] = to_plot.plot([], [], c="k", linewidth=5)[0] imgs["pole"] = to_plot.plot([], [], c="k", linewidth=5)[0]
imgs["center"] = to_plot.plot([], [], marker="o", c="k",\ imgs["center"] = to_plot.plot([], [], marker="o", c="k",
markersize=10)[0] markersize=10)[0]
# centerline # centerline
to_plot.plot(np.linspace(-1., 1., num=50), np.zeros(50),\ to_plot.plot(np.linspace(-1., 1., num=50), np.zeros(50),
c="k", linestyle="dashed") c="k", linestyle="dashed")
# set axis # set axis
@ -166,13 +168,13 @@ class CartPoleEnv(Env):
pole_y (numpy.ndarray): y data of pole pole_y (numpy.ndarray): y data of pole
""" """
# cart # cart
cart_x, cart_y = square(curr_x[0], 0.,\ cart_x, cart_y = square(curr_x[0], 0.,
self.config["cart_size"], 0.) self.config["cart_size"], 0.)
# pole # pole
pole_x = np.array([curr_x[0], curr_x[0] + self.config["l"] \ pole_x = np.array([curr_x[0], curr_x[0] + self.config["l"]
* np.cos(curr_x[2]-np.pi/2)]) * np.cos(curr_x[2]-np.pi/2)])
pole_y = np.array([0., self.config["l"] \ pole_y = np.array([0., self.config["l"]
* np.sin(curr_x[2]-np.pi/2)]) * np.sin(curr_x[2]-np.pi/2)])
return cart_x, cart_y, pole_x, pole_y return cart_x, cart_y, pole_x, pole_y

View File

@ -4,6 +4,7 @@ import numpy as np
logger = getLogger(__name__) logger = getLogger(__name__)
def calc_cost(pred_xs, input_sample, g_xs, def calc_cost(pred_xs, input_sample, g_xs,
state_cost_fn, input_cost_fn, terminal_state_cost_fn): state_cost_fn, input_cost_fn, terminal_state_cost_fn):
""" calculate the cost """ calculate the cost
@ -24,7 +25,8 @@ def calc_cost(pred_xs, input_sample, g_xs,
# state cost # state cost
state_cost = 0. state_cost = 0.
if state_cost_fn is not None: if state_cost_fn is not None:
state_pred_par_cost = state_cost_fn(pred_xs[:, 1:-1, :], g_xs[:, 1:-1, :]) state_pred_par_cost = state_cost_fn(
pred_xs[:, 1:-1, :], g_xs[:, 1:-1, :])
state_cost = np.sum(np.sum(state_pred_par_cost, axis=-1), axis=-1) state_cost = np.sum(np.sum(state_pred_par_cost, axis=-1), axis=-1)
# terminal cost # terminal cost

View File

@ -1,5 +1,6 @@
import numpy as np import numpy as np
class Env(): class Env():
""" Environments class """ Environments class
Attributes: Attributes:
@ -8,6 +9,7 @@ class Env():
history_x (list[numpy.ndarray]): historty of state, shape(step_count*state_size) history_x (list[numpy.ndarray]): historty of state, shape(step_count*state_size)
step_count (int): step count step_count (int): step count
""" """
def __init__(self, config): def __init__(self, config):
""" """
""" """

View File

@ -3,17 +3,19 @@ import scipy
from scipy import integrate from scipy import integrate
from .env import Env from .env import Env
class FirstOrderLagEnv(Env): class FirstOrderLagEnv(Env):
""" First Order Lag System Env """ First Order Lag System Env
""" """
def __init__(self, tau=0.63): def __init__(self, tau=0.63):
""" """
""" """
self.config = {"state_size" : 4,\ self.config = {"state_size": 4,
"input_size" : 2,\ "input_size": 2,
"dt" : 0.05,\ "dt": 0.05,
"max_step" : 500,\ "max_step": 500,
"input_lower_bound": [-0.5, -0.5],\ "input_lower_bound": [-0.5, -0.5],
"input_upper_bound": [0.5, 0.5], "input_upper_bound": [0.5, 0.5],
} }

View File

@ -2,6 +2,8 @@ from .first_order_lag import FirstOrderLagEnv
from .two_wheeled import TwoWheeledConstEnv from .two_wheeled import TwoWheeledConstEnv
from .two_wheeled import TwoWheeledTrackEnv from .two_wheeled import TwoWheeledTrackEnv
from .cartpole import CartPoleEnv from .cartpole import CartPoleEnv
from .nonlinear_sample_system import NonlinearSampleSystemEnv
def make_env(args): def make_env(args):
@ -13,5 +15,7 @@ def make_env(args):
return TwoWheeledTrackEnv() return TwoWheeledTrackEnv()
elif args.env == "CartPole": elif args.env == "CartPole":
return CartPoleEnv() return CartPoleEnv()
elif args.env == "NonlinearSample":
return NonlinearSampleSystemEnv()
raise NotImplementedError("There is not {} Env".format(args.env)) raise NotImplementedError("There is not {} Env".format(args.env))

View File

@ -4,21 +4,23 @@ from scipy import integrate
from .env import Env from .env import Env
from ..common.utils import update_state_with_Runge_Kutta from ..common.utils import update_state_with_Runge_Kutta
class NonlinearSampleEnv(Env):
class NonlinearSampleSystemEnv(Env):
""" Nonlinear Sample Env """ Nonlinear Sample Env
""" """
def __init__(self): def __init__(self):
""" """
""" """
self.config = {"state_size" : 2,\ self.config = {"state_size": 2,
"input_size" : 1,\ "input_size": 1,
"dt" : 0.01,\ "dt": 0.01,
"max_step" : 250,\ "max_step": 2000,
"input_lower_bound": [-0.5],\ "input_lower_bound": [-0.5],
"input_upper_bound": [0.5], "input_upper_bound": [0.5],
} }
super(NonlinearSampleEnv, self).__init__(self.config) super(NonlinearSampleSystemEnv, self).__init__(self.config)
def reset(self, init_x=np.array([2., 0.])): def reset(self, init_x=np.array([2., 0.])):
""" reset state """ reset state
@ -57,10 +59,11 @@ class NonlinearSampleEnv(Env):
self.config["input_lower_bound"], self.config["input_lower_bound"],
self.config["input_upper_bound"]) self.config["input_upper_bound"])
funtions = [self._func_x_1, self._func_x_2] functions = [self._func_x_1, self._func_x_2]
next_x = update_state_with_Runge_Kutta(self._curr_x, u, next_x = update_state_with_Runge_Kutta(self.curr_x, u,
functions, self.config["dt"]) functions, self.config["dt"],
batch=False)
# cost # cost
cost = 0 cost = 0
@ -80,19 +83,15 @@ class NonlinearSampleEnv(Env):
self.step_count > self.config["max_step"], \ self.step_count > self.config["max_step"], \
{"goal_state": self.g_x} {"goal_state": self.g_x}
def _func_x_1(self, x_1, x_2, u): def _func_x_1(self, x, u):
""" x_dot = x[1]
"""
x_dot = x_2
return x_dot return x_dot
def _func_x_2(self, x_1, x_2, u): def _func_x_2(self, x, u):
""" x_dot = (1. - x[0]**2 - x[1]**2) * x[1] - x[0] + u[0]
"""
x_dot = (1. - x_1**2 - x_2**2) * x_2 - x_1 + u
return x_dot return x_dot
def plot_func(self, to_plot, i=None, history_x=None, history_g_x=None): def plot_func(self, to_plot, i=None, history_x=None, history_g_x=None):
""" """
""" """
raise ValueError("NonlinearSampleEnv does not have animation") raise ValueError("NonlinearSampleSystemEnv does not have animation")

View File

@ -5,6 +5,7 @@ import matplotlib.pyplot as plt
from .env import Env from .env import Env
from ..plotters.plot_objs import circle_with_angle, square, circle from ..plotters.plot_objs import circle_with_angle, square, circle
def step_two_wheeled_env(curr_x, u, dt, method="Oylar"): def step_two_wheeled_env(curr_x, u, dt, method="Oylar"):
""" step two wheeled enviroment """ step two wheeled enviroment
@ -28,19 +29,21 @@ def step_two_wheeled_env(curr_x, u, dt, method="Oylar"):
return next_x return next_x
class TwoWheeledConstEnv(Env): class TwoWheeledConstEnv(Env):
""" Two wheeled robot with constant goal Env """ Two wheeled robot with constant goal Env
""" """
def __init__(self): def __init__(self):
""" """
""" """
self.config = {"state_size" : 3,\ self.config = {"state_size": 3,
"input_size" : 2,\ "input_size": 2,
"dt" : 0.01,\ "dt": 0.01,
"max_step" : 500,\ "max_step": 500,
"input_lower_bound": (-1.5, -3.14),\ "input_lower_bound": (-1.5, -3.14),
"input_upper_bound": (1.5, 3.14),\ "input_upper_bound": (1.5, 3.14),
"car_size": 0.2,\ "car_size": 0.2,
"wheel_size": (0.075, 0.015) "wheel_size": (0.075, 0.015)
} }
@ -55,7 +58,9 @@ class TwoWheeledConstEnv(Env):
""" """
self.step_count = 0 self.step_count = 0
self.curr_x = np.zeros(self.config["state_size"]) noise = np.clip(np.random.randn(3), -0.1, 0.1)
noise *= 0.1
self.curr_x = np.zeros(self.config["state_size"]) + noise
if init_x is not None: if init_x is not None:
self.curr_x = init_x self.curr_x = init_x
@ -160,10 +165,10 @@ class TwoWheeledConstEnv(Env):
self.config["car_size"], curr_x[2]) self.config["car_size"], curr_x[2])
# left tire # left tire
center_x = (self.config["car_size"] \ center_x = (self.config["car_size"]
+ self.config["wheel_size"][1]) \ + self.config["wheel_size"][1]) \
* np.cos(curr_x[2]-np.pi/2.) + curr_x[0] * np.cos(curr_x[2]-np.pi/2.) + curr_x[0]
center_y = (self.config["car_size"] \ center_y = (self.config["car_size"]
+ self.config["wheel_size"][1]) \ + self.config["wheel_size"][1]) \
* np.sin(curr_x[2]-np.pi/2.) + curr_x[1] * np.sin(curr_x[2]-np.pi/2.) + curr_x[1]
@ -172,10 +177,10 @@ class TwoWheeledConstEnv(Env):
self.config["wheel_size"], curr_x[2]) self.config["wheel_size"], curr_x[2])
# right tire # right tire
center_x = (self.config["car_size"] \ center_x = (self.config["car_size"]
+ self.config["wheel_size"][1]) \ + self.config["wheel_size"][1]) \
* np.cos(curr_x[2]+np.pi/2.) + curr_x[0] * np.cos(curr_x[2]+np.pi/2.) + curr_x[0]
center_y = (self.config["car_size"] \ center_y = (self.config["car_size"]
+ self.config["wheel_size"][1]) \ + self.config["wheel_size"][1]) \
* np.sin(curr_x[2]+np.pi/2.) + curr_x[1] * np.sin(curr_x[2]+np.pi/2.) + curr_x[1]
@ -187,19 +192,21 @@ class TwoWheeledConstEnv(Env):
left_tire_x, left_tire_y,\ left_tire_x, left_tire_y,\
right_tire_x, right_tire_y right_tire_x, right_tire_y
class TwoWheeledTrackEnv(Env): class TwoWheeledTrackEnv(Env):
""" Two wheeled robot with constant goal Env """ Two wheeled robot with constant goal Env
""" """
def __init__(self): def __init__(self):
""" """
""" """
self.config = {"state_size" : 3,\ self.config = {"state_size": 3,
"input_size" : 2,\ "input_size": 2,
"dt" : 0.01,\ "dt": 0.01,
"max_step" : 1000,\ "max_step": 1000,
"input_lower_bound": (-1.5, -3.14),\ "input_lower_bound": (-1.5, -3.14),
"input_upper_bound": (1.5, 3.14),\ "input_upper_bound": (1.5, 3.14),
"car_size": 0.2,\ "car_size": 0.2,
"wheel_size": (0.075, 0.015) "wheel_size": (0.075, 0.015)
} }
@ -247,7 +254,9 @@ class TwoWheeledTrackEnv(Env):
""" """
self.step_count = 0 self.step_count = 0
self.curr_x = np.zeros(self.config["state_size"]) noise = np.clip(np.random.randn(3), -0.1, 0.1)
noise *= 0.01
self.curr_x = np.zeros(self.config["state_size"]) + noise
if init_x is not None: if init_x is not None:
self.curr_x = init_x self.curr_x = init_x
@ -354,10 +363,10 @@ class TwoWheeledTrackEnv(Env):
self.config["car_size"], curr_x[2]) self.config["car_size"], curr_x[2])
# left tire # left tire
center_x = (self.config["car_size"] \ center_x = (self.config["car_size"]
+ self.config["wheel_size"][1]) \ + self.config["wheel_size"][1]) \
* np.cos(curr_x[2]-np.pi/2.) + curr_x[0] * np.cos(curr_x[2]-np.pi/2.) + curr_x[0]
center_y = (self.config["car_size"] \ center_y = (self.config["car_size"]
+ self.config["wheel_size"][1]) \ + self.config["wheel_size"][1]) \
* np.sin(curr_x[2]-np.pi/2.) + curr_x[1] * np.sin(curr_x[2]-np.pi/2.) + curr_x[1]
@ -366,10 +375,10 @@ class TwoWheeledTrackEnv(Env):
self.config["wheel_size"], curr_x[2]) self.config["wheel_size"], curr_x[2])
# right tire # right tire
center_x = (self.config["car_size"] \ center_x = (self.config["car_size"]
+ self.config["wheel_size"][1]) \ + self.config["wheel_size"][1]) \
* np.cos(curr_x[2]+np.pi/2.) + curr_x[0] * np.cos(curr_x[2]+np.pi/2.) + curr_x[0]
center_y = (self.config["car_size"] \ center_y = (self.config["car_size"]
+ self.config["wheel_size"][1]) \ + self.config["wheel_size"][1]) \
* np.sin(curr_x[2]+np.pi/2.) + curr_x[1] * np.sin(curr_x[2]+np.pi/2.) + curr_x[1]

View File

@ -7,6 +7,7 @@ import six
import pickle import pickle
from logging import DEBUG, basicConfig, getLogger, FileHandler, StreamHandler, Formatter, Logger from logging import DEBUG, basicConfig, getLogger, FileHandler, StreamHandler, Formatter, Logger
def make_logger(save_dir): def make_logger(save_dir):
""" """
Args: Args:
@ -33,6 +34,7 @@ def make_logger(save_dir):
# sh_handler = StreamHandler() # sh_handler = StreamHandler()
# logger.addHandler(sh_handler) # logger.addHandler(sh_handler)
def int_tuple(s): def int_tuple(s):
""" transform str to tuple """ transform str to tuple
Args: Args:
@ -42,6 +44,7 @@ def int_tuple(s):
""" """
return tuple(int(i) for i in s.split(',')) return tuple(int(i) for i in s.split(','))
def bool_flag(s): def bool_flag(s):
""" transform str to bool flg """ transform str to bool flg
Args: Args:
@ -54,6 +57,7 @@ def bool_flag(s):
msg = 'Invalid value "%s" for bool flag (should be 0 or 1)' msg = 'Invalid value "%s" for bool flag (should be 0 or 1)'
raise ValueError(msg % s) raise ValueError(msg % s)
def file_exists(path): def file_exists(path):
""" Check file existence on given path """ Check file existence on given path
Args: Args:
@ -63,6 +67,7 @@ def file_exists(path):
""" """
return os.path.exists(path) return os.path.exists(path)
def create_dir_if_not_exist(outdir): def create_dir_if_not_exist(outdir):
""" Check directory existence and creates new directory if not exist """ Check directory existence and creates new directory if not exist
Args: Args:
@ -77,6 +82,7 @@ def create_dir_if_not_exist(outdir):
return return
os.makedirs(outdir) os.makedirs(outdir)
def write_text_to_file(file_path, data): def write_text_to_file(file_path, data):
""" Write given text data to file """ Write given text data to file
Args: Args:
@ -86,6 +92,7 @@ def write_text_to_file(file_path, data):
with open(file_path, 'w') as f: with open(file_path, 'w') as f:
f.write(data) f.write(data)
def read_text_from_file(file_path): def read_text_from_file(file_path):
""" Read given file as text """ Read given file as text
Args: Args:
@ -96,6 +103,7 @@ def read_text_from_file(file_path):
with open(file_path, 'r') as f: with open(file_path, 'r') as f:
return f.read() return f.read()
def save_pickle(file_path, data): def save_pickle(file_path, data):
""" pickle given data to file """ pickle given data to file
Args: Args:
@ -105,6 +113,7 @@ def save_pickle(file_path, data):
with open(file_path, 'wb') as f: with open(file_path, 'wb') as f:
pickle.dump(data, f) pickle.dump(data, f)
def load_pickle(file_path): def load_pickle(file_path):
""" load pickled data from file """ load pickled data from file
Args: Args:
@ -118,6 +127,7 @@ def load_pickle(file_path):
else: else:
return pickle.load(f, encoding='bytes') return pickle.load(f, encoding='bytes')
def prepare_output_dir(base_dir, args, time_format='%Y-%m-%d-%H%M%S'): def prepare_output_dir(base_dir, args, time_format='%Y-%m-%d-%H%M%S'):
""" prepare a directory with current datetime as name. """ prepare a directory with current datetime as name.
created directory contains the command and args when the script was called as text file. created directory contains the command and args when the script was called as text file.

View File

@ -2,9 +2,11 @@ import numpy as np
from .model import Model from .model import Model
class CartPoleModel(Model): class CartPoleModel(Model):
""" cartpole model """ cartpole model
""" """
def __init__(self, config): def __init__(self, config):
""" """
""" """
@ -31,16 +33,16 @@ class CartPoleModel(Model):
# x # x
d_x0 = curr_x[1] d_x0 = curr_x[1]
# v_x # v_x
d_x1 = (u[0] + self.mp * np.sin(curr_x[2]) \ d_x1 = (u[0] + self.mp * np.sin(curr_x[2])
* (self.l * (curr_x[3]**2) \ * (self.l * (curr_x[3]**2)
+ self.g * np.cos(curr_x[2]))) \ + self.g * np.cos(curr_x[2]))) \
/ (self.mc + self.mp * (np.sin(curr_x[2])**2)) / (self.mc + self.mp * (np.sin(curr_x[2])**2))
# theta # theta
d_x2 = curr_x[3] d_x2 = curr_x[3]
# v_theta # v_theta
d_x3 = (-u[0] * np.cos(curr_x[2]) \ d_x3 = (-u[0] * np.cos(curr_x[2])
- self.mp * self.l * (curr_x[3]**2) \ - self.mp * self.l * (curr_x[3]**2)
* np.cos(curr_x[2]) * np.sin(curr_x[2]) \ * np.cos(curr_x[2]) * np.sin(curr_x[2])
- (self.mc + self.mp) * self.g * np.sin(curr_x[2])) \ - (self.mc + self.mp) * self.g * np.sin(curr_x[2])) \
/ (self.l * (self.mc + self.mp * (np.sin(curr_x[2])**2))) / (self.l * (self.mc + self.mp * (np.sin(curr_x[2])**2)))
@ -53,16 +55,16 @@ class CartPoleModel(Model):
# x # x
d_x0 = curr_x[:, 1] d_x0 = curr_x[:, 1]
# v_x # v_x
d_x1 = (u[:, 0] + self.mp * np.sin(curr_x[:, 2]) \ d_x1 = (u[:, 0] + self.mp * np.sin(curr_x[:, 2])
* (self.l * (curr_x[:, 3]**2) \ * (self.l * (curr_x[:, 3]**2)
+ self.g * np.cos(curr_x[:, 2]))) \ + self.g * np.cos(curr_x[:, 2]))) \
/ (self.mc + self.mp * (np.sin(curr_x[:, 2])**2)) / (self.mc + self.mp * (np.sin(curr_x[:, 2])**2))
# theta # theta
d_x2 = curr_x[:, 3] d_x2 = curr_x[:, 3]
# v_theta # v_theta
d_x3 = (-u[:, 0] * np.cos(curr_x[:, 2]) \ d_x3 = (-u[:, 0] * np.cos(curr_x[:, 2])
- self.mp * self.l * (curr_x[:, 3]**2) \ - self.mp * self.l * (curr_x[:, 3]**2)
* np.cos(curr_x[:, 2]) * np.sin(curr_x[:, 2]) \ * np.cos(curr_x[:, 2]) * np.sin(curr_x[:, 2])
- (self.mc + self.mp) * self.g * np.sin(curr_x[:, 2])) \ - (self.mc + self.mp) * self.g * np.sin(curr_x[:, 2])) \
/ (self.l * (self.mc + self.mp * (np.sin(curr_x[:, 2])**2))) / (self.l * (self.mc + self.mp * (np.sin(curr_x[:, 2])**2)))
@ -99,31 +101,31 @@ class CartPoleModel(Model):
tmp2 = 1. / (self.mc + self.mp * (np.sin(xs[:, 2])**2)) tmp2 = 1. / (self.mc + self.mp * (np.sin(xs[:, 2])**2))
f_x[:, 1, 2] = - us[:, 0] * tmp \ f_x[:, 1, 2] = - us[:, 0] * tmp \
- tmp * (self.mp * np.sin(xs[:, 2]) \ - tmp * (self.mp * np.sin(xs[:, 2])
* (self.l * xs[:, 3]**2 \ * (self.l * xs[:, 3]**2
+ self.g * np.cos(xs[:, 2]))) \ + self.g * np.cos(xs[:, 2]))) \
+ tmp2 * (self.mp * np.cos(xs[:, 2]) * self.l \ + tmp2 * (self.mp * np.cos(xs[:, 2]) * self.l
* xs[:, 3]**2 \ * xs[:, 3]**2
+ self.mp * self.g * (np.cos(xs[:, 2])**2 \ + self.mp * self.g * (np.cos(xs[:, 2])**2
- np.sin(xs[:, 2])**2)) - np.sin(xs[:, 2])**2))
f_x[:, 3, 2] = - 1. / self.l * tmp \ f_x[:, 3, 2] = - 1. / self.l * tmp \
* (-us[:, 0] * np.cos(xs[:, 2]) \ * (-us[:, 0] * np.cos(xs[:, 2])
- self.mp * self.l * (xs[:, 3]**2) \ - self.mp * self.l * (xs[:, 3]**2)
* np.cos(xs[:, 2]) * np.sin(xs[:, 2]) \ * np.cos(xs[:, 2]) * np.sin(xs[:, 2])
- (self.mc + self.mp) * self.g * np.sin(xs[:, 2])) \ - (self.mc + self.mp) * self.g * np.sin(xs[:, 2])) \
+ 1. / self.l * tmp2 \ + 1. / self.l * tmp2 \
* (us[:, 0] * np.sin(xs[:, 2]) \ * (us[:, 0] * np.sin(xs[:, 2])
- self.mp * self.l * xs[:, 3]**2 \ - self.mp * self.l * xs[:, 3]**2
* (np.cos(xs[:, 2])**2 - np.sin(xs[:, 2])**2) \ * (np.cos(xs[:, 2])**2 - np.sin(xs[:, 2])**2)
- (self.mc + self.mp) \ - (self.mc + self.mp)
* self.g * np.cos(xs[:, 2])) * self.g * np.cos(xs[:, 2]))
# f_theta_dot # f_theta_dot
f_x[:, 1, 3] = tmp2 * (self.mp * np.sin(xs[:, 2]) \ f_x[:, 1, 3] = tmp2 * (self.mp * np.sin(xs[:, 2])
* self.l * 2 * xs[:, 3]) * self.l * 2 * xs[:, 3])
f_x[:, 2, 3] = np.ones(pred_len) f_x[:, 2, 3] = np.ones(pred_len)
f_x[:, 3, 3] = 1. / self.l * tmp2 \ f_x[:, 3, 3] = 1. / self.l * tmp2 \
* (-2. * self.mp * self.l * xs[:, 3] \ * (-2. * self.mp * self.l * xs[:, 3]
* np.cos(xs[:, 2]) * np.sin(xs[:, 2])) * np.cos(xs[:, 2]) * np.sin(xs[:, 2]))
return f_x * dt + np.eye(state_size) # to discrete form return f_x * dt + np.eye(state_size) # to discrete form
@ -150,7 +152,7 @@ class CartPoleModel(Model):
f_u[:, 1, 0] = 1. / (self.mc + self.mp * (np.sin(xs[:, 2])**2)) f_u[:, 1, 0] = 1. / (self.mc + self.mp * (np.sin(xs[:, 2])**2))
f_u[:, 3, 0] = -np.cos(xs[:, 2]) \ f_u[:, 3, 0] = -np.cos(xs[:, 2]) \
/ (self.l * (self.mc \ / (self.l * (self.mc
+ self.mp * (np.sin(xs[:, 2])**2))) + self.mp * (np.sin(xs[:, 2])**2)))
return f_u * dt # to discrete form return f_u * dt # to discrete form

View File

@ -3,6 +3,7 @@ import scipy.linalg
from scipy import integrate from scipy import integrate
from .model import LinearModel from .model import LinearModel
class FirstOrderLagModel(LinearModel): class FirstOrderLagModel(LinearModel):
""" first order lag model """ first order lag model
Attributes: Attributes:
@ -10,13 +11,15 @@ class FirstOrderLagModel(LinearModel):
u (numpy.ndarray): u (numpy.ndarray):
history_pred_xs (numpy.ndarray): history_pred_xs (numpy.ndarray):
""" """
def __init__(self, config, tau=0.63): def __init__(self, config, tau=0.63):
""" """
Args: Args:
tau (float): time constant tau (float): time constant
""" """
# param # param
self.A, self.B = self._to_state_space(tau, dt=config.DT) # discrete system self.A, self.B = self._to_state_space(
tau, dt=config.DT) # discrete system
super(FirstOrderLagModel, self).__init__(self.A, self.B) super(FirstOrderLagModel, self).__init__(self.A, self.B)
@staticmethod @staticmethod
@ -44,7 +47,8 @@ class FirstOrderLagModel(LinearModel):
B = np.zeros_like(Bc) B = np.zeros_like(Bc)
for m in range(Bc.shape[0]): for m in range(Bc.shape[0]):
for n in range(Bc.shape[1]): for n in range(Bc.shape[1]):
integrate_fn = lambda tau: np.matmul(scipy.linalg.expm(Ac*tau), Bc)[m, n] def integrate_fn(tau): return np.matmul(
scipy.linalg.expm(Ac*tau), Bc)[m, n]
sol = integrate.quad(integrate_fn, 0, dt) sol = integrate.quad(integrate_fn, 0, dt)
B[m, n] = sol[0] B[m, n] = sol[0]

View File

@ -1,6 +1,8 @@
from .first_order_lag import FirstOrderLagModel from .first_order_lag import FirstOrderLagModel
from .two_wheeled import TwoWheeledModel from .two_wheeled import TwoWheeledModel
from .cartpole import CartPoleModel from .cartpole import CartPoleModel
from .nonlinear_sample_system import NonlinearSampleSystemModel
def make_model(args, config): def make_model(args, config):
@ -10,5 +12,7 @@ def make_model(args, config):
return TwoWheeledModel(config) return TwoWheeledModel(config)
elif args.env == "CartPole": elif args.env == "CartPole":
return CartPoleModel(config) return CartPoleModel(config)
elif args.env == "NonlinearSample":
return NonlinearSampleSystemModel(config)
raise NotImplementedError("There is not {} Model".format(args.env)) raise NotImplementedError("There is not {} Model".format(args.env))

View File

@ -1,8 +1,10 @@
import numpy as np import numpy as np
class Model(): class Model():
""" base class of model """ base class of model
""" """
def __init__(self): def __init__(self):
""" """
""" """
@ -75,7 +77,7 @@ class Model():
# next_x.shape = (pop_size, state_size) # next_x.shape = (pop_size, state_size)
next_x = self.predict_next_state(x, us[t]) next_x = self.predict_next_state(x, us[t])
# update # update
pred_xs = np.concatenate((pred_xs, next_x[np.newaxis, :, :]),\ pred_xs = np.concatenate((pred_xs, next_x[np.newaxis, :, :]),
axis=0) axis=0)
x = next_x x = next_x
@ -86,6 +88,11 @@ class Model():
""" """
raise NotImplementedError("Implement the model") raise NotImplementedError("Implement the model")
def x_dot(self, curr_x, u):
""" compute x dot
"""
raise NotImplementedError("Implement the model")
def predict_adjoint_traj(self, xs, us, g_xs): def predict_adjoint_traj(self, xs, us, g_xs):
""" """
Args: Args:
@ -95,32 +102,35 @@ class Model():
Returns: Returns:
lams (numpy.ndarray): adjoint state, shape(pred_len, state_size), lams (numpy.ndarray): adjoint state, shape(pred_len, state_size),
adjoint size is the same as state_size adjoint size is the same as state_size
Notes:
Adjoint trajectory be computed by backward path.
Usually, we should -\dot{lam} but in backward path case, we can use \dot{lam} directry
""" """
# get size # get size
(pred_len, input_size) = us.shape (pred_len, input_size) = us.shape
# pred final adjoint state # pred final adjoint state
lam = self.predict_terminal_adjoint_state(xs[-1],\ lam = self.predict_terminal_adjoint_state(xs[-1],
terminal_g_x=g_xs[-1]) terminal_g_x=g_xs[-1])
lams = lam[np.newaxis, :] lams = lam[np.newaxis, :]
for t in range(pred_len-1, 0, -1): for t in range(pred_len-1, 0, -1):
prev_lam = \ prev_lam = \
self.predict_adjoint_state(lam, xs[t], us[t],\ self.predict_adjoint_state(lam, xs[t], us[t],
goal=g_xs[t], t=t) g_x=g_xs[t])
# update # update
lams = np.concatenate((prev_lam[np.newaxis, :], lams), axis=0) lams = np.concatenate((prev_lam[np.newaxis, :], lams), axis=0)
lam = prev_lam lam = prev_lam
return lams return lams
def predict_adjoint_state(self, lam, x, u, goal=None, t=None): def predict_adjoint_state(self, lam, x, u, g_x=None, t=None):
""" predict adjoint states """ predict adjoint states
Args: Args:
lam (numpy.ndarray): adjoint state, shape(state_size, ) lam (numpy.ndarray): adjoint state, shape(state_size, )
x (numpy.ndarray): state, shape(state_size, ) x (numpy.ndarray): state, shape(state_size, )
u (numpy.ndarray): input, shape(input_size, ) u (numpy.ndarray): input, shape(input_size, )
goal (numpy.ndarray): goal state, shape(state_size, ) g_x (numpy.ndarray): goal state, shape(state_size, )
Returns: Returns:
prev_lam (numpy.ndarrya): previous adjoint state, prev_lam (numpy.ndarrya): previous adjoint state,
shape(state_size, ) shape(state_size, )
@ -175,6 +185,7 @@ class Model():
raise NotImplementedError("Implement hessian of model \ raise NotImplementedError("Implement hessian of model \
with respect to the input") with respect to the input")
class LinearModel(Model): class LinearModel(Model):
""" discrete linear model, x[k+1] = Ax[k] + Bu[k] """ discrete linear model, x[k+1] = Ax[k] + Bu[k]
@ -182,6 +193,7 @@ class LinearModel(Model):
A (numpy.ndarray): shape(state_size, state_size) A (numpy.ndarray): shape(state_size, state_size)
B (numpy.ndarray): shape(state_size, input_size) B (numpy.ndarray): shape(state_size, input_size)
""" """
def __init__(self, A, B): def __init__(self, A, B):
""" """
""" """

View File

@ -0,0 +1,217 @@
import numpy as np
from .model import Model
from ..common.utils import update_state_with_Runge_Kutta
class NonlinearSampleSystemModel(Model):
""" nonlinear sample system model
"""
def __init__(self, config):
"""
"""
super(NonlinearSampleSystemModel, self).__init__()
self.dt = config.DT
self.gradient_hamiltonian_state = config.gradient_hamiltonian_state
self.gradient_hamiltonian_input = config.gradient_hamiltonian_input
self.gradient_cost_fn_state = config.gradient_cost_fn_state
def predict_next_state(self, curr_x, u):
""" predict next state
Args:
curr_x (numpy.ndarray): current state, shape(state_size, ) or
shape(pop_size, state_size)
u (numpy.ndarray): input, shape(input_size, ) or
shape(pop_size, input_size)
Returns:
next_x (numpy.ndarray): next state, shape(state_size, ) or
shape(pop_size, state_size)
"""
if len(u.shape) == 1:
func_1 = self._func_x_1
func_2 = self._func_x_2
functions = [func_1, func_2]
next_x = update_state_with_Runge_Kutta(
curr_x, u, functions, batch=False, dt=self.dt)
return next_x
elif len(u.shape) == 2:
def func_1(xs, us): return self._func_x_1(xs, us, batch=True)
def func_2(xs, us): return self._func_x_2(xs, us, batch=True)
functions = [func_1, func_2]
next_x = update_state_with_Runge_Kutta(
curr_x, u, functions, batch=True, dt=self.dt)
return next_x
def x_dot(self, curr_x, u):
"""
Args:
curr_x (numpy.ndarray): current state, shape(state_size, )
u (numpy.ndarray): input, shape(input_size, )
Returns:
x_dot (numpy.ndarray): next state, shape(state_size, )
"""
state_size = curr_x.shape[0]
x_dot = np.zeros(state_size)
x_dot[0] = self._func_x_1(curr_x, u)
x_dot[1] = self._func_x_2(curr_x, u)
return x_dot
def predict_adjoint_state(self, lam, x, u, g_x=None):
""" predict adjoint states
Args:
lam (numpy.ndarray): adjoint state, shape(state_size, )
x (numpy.ndarray): state, shape(state_size, )
u (numpy.ndarray): input, shape(input_size, )
goal (numpy.ndarray): goal state, shape(state_size, )
Returns:
prev_lam (numpy.ndarrya): previous adjoint state,
shape(state_size, )
"""
if len(u.shape) == 1:
delta_lam = self.dt * \
self.gradient_hamiltonian_state(x, lam, u, g_x)
prev_lam = lam + delta_lam
return prev_lam
elif len(u.shape) == 2:
raise ValueError
def predict_terminal_adjoint_state(self, terminal_x, terminal_g_x=None):
""" predict terminal adjoint state
Args:
terminal_x (numpy.ndarray): terminal state, shape(state_size, )
terminal_g_x (numpy.ndarray): terminal goal state,
shape(state_size, )
Returns:
terminal_lam (numpy.ndarray): terminal adjoint state,
shape(state_size, )
"""
terminal_lam = self.gradient_cost_fn_state(
terminal_x, terminal_g_x, terminal=True) # return in shape[1, state_size]
return terminal_lam[0]
def _func_x_1(self, x, u, batch=False):
if not batch:
x_dot = x[1]
else:
x_dot = x[:, 1]
return x_dot
def _func_x_2(self, x, u, batch=False):
if not batch:
x_dot = (1. - x[0]**2 - x[1]**2) * x[1] - x[0] + u[0]
else:
x_dot = (1. - x[:, 0]**2 - x[:, 1]**2) * \
x[:, 1] - x[:, 0] + u[:, 0]
return x_dot
def calc_f_x(self, xs, us, dt):
""" gradient of model with respect to the state in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_x (numpy.ndarray): gradient of model with respect to x,
shape(pred_len, state_size, state_size)
Notes:
This should be discrete form !!
"""
# get size
(_, state_size) = xs.shape
(pred_len, _) = us.shape
f_x = np.zeros((pred_len, state_size, state_size))
f_x[:, 0, 1] = 1.
f_x[:, 1, 0] = 2. * xs[:, 0] * xs[:, 1] - 1.
f_x[:, 1, 1] = - 2. * xs[:, 1] * xs[:, 1] + \
(1. - xs[:, 0]**2 - xs[:, 1]**2)
return f_x * dt + np.eye(state_size) # to discrete form
def calc_f_u(self, xs, us, dt):
""" gradient of model with respect to the input in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_u (numpy.ndarray): gradient of model with respect to x,
shape(pred_len, state_size, input_size)
Notes:
This should be discrete form !!
"""
# get size
(_, state_size) = xs.shape
(pred_len, input_size) = us.shape
f_u = np.zeros((pred_len, state_size, input_size))
f_u[:, 1, 0] = 1.
return f_u * dt # to discrete form
def calc_f_xx(self, xs, us, dt):
""" hessian of model with respect to the state in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_xx (numpy.ndarray): gradient of model with respect to x,
shape(pred_len, state_size, state_size, state_size)
"""
# get size
(_, state_size) = xs.shape
(pred_len, _) = us.shape
f_xx = np.zeros((pred_len, state_size, state_size, state_size))
raise NotImplementedError
def calc_f_ux(self, xs, us, dt):
""" hessian of model with respect to state and input in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_ux (numpy.ndarray): gradient of model with respect to x,
shape(pred_len, state_size, input_size, state_size)
"""
# get size
(_, state_size) = xs.shape
(pred_len, input_size) = us.shape
f_ux = np.zeros((pred_len, state_size, input_size, state_size))
raise NotImplementedError
def calc_f_uu(self, xs, us, dt):
""" hessian of model with respect to input in batch form
Args:
xs (numpy.ndarray): state, shape(pred_len+1, state_size)
us (numpy.ndarray): input, shape(pred_len, input_size,)
Return:
f_uu (numpy.ndarray): gradient of model with respect to x,
shape(pred_len, state_size, input_size, input_size)
"""
# get size
(_, state_size) = xs.shape
(pred_len, input_size) = us.shape
f_uu = np.zeros((pred_len, state_size, input_size, input_size))
raise NotImplementedError

View File

@ -2,14 +2,19 @@ import numpy as np
from .model import Model from .model import Model
class TwoWheeledModel(Model): class TwoWheeledModel(Model):
""" two wheeled model """ two wheeled model
""" """
def __init__(self, config): def __init__(self, config):
""" """
""" """
super(TwoWheeledModel, self).__init__() super(TwoWheeledModel, self).__init__()
self.dt = config.DT self.dt = config.DT
self.gradient_hamiltonian_state = config.gradient_hamiltonian_state
self.gradient_hamiltonian_input = config.gradient_hamiltonian_input
self.gradient_cost_fn_state = config.gradient_cost_fn_state
def predict_next_state(self, curr_x, u): def predict_next_state(self, curr_x, u):
""" predict next state """ predict next state
@ -51,6 +56,56 @@ class TwoWheeledModel(Model):
return next_x return next_x
def x_dot(self, curr_x, u):
""" compute x dot
Args:
curr_x (numpy.ndarray): current state, shape(state_size, )
u (numpy.ndarray): input, shape(input_size, )
Returns:
x_dot (numpy.ndarray): next state, shape(state_size, )
"""
B = np.array([[np.cos(curr_x[-1]), 0.],
[np.sin(curr_x[-1]), 0.],
[0., 1.]])
x_dot = np.matmul(B, u[:, np.newaxis])
return x_dot.flatten()
def predict_adjoint_state(self, lam, x, u, g_x=None, t=None):
""" predict adjoint states
Args:
lam (numpy.ndarray): adjoint state, shape(state_size, )
x (numpy.ndarray): state, shape(state_size, )
u (numpy.ndarray): input, shape(input_size, )
goal (numpy.ndarray): goal state, shape(state_size, )
Returns:
prev_lam (numpy.ndarrya): previous adjoint state,
shape(state_size, )
"""
if len(u.shape) == 1:
delta_lam = self.dt * \
self.gradient_hamiltonian_state(x, lam, u, g_x)
prev_lam = lam + delta_lam
return prev_lam
elif len(u.shape) == 2:
raise ValueError
def predict_terminal_adjoint_state(self, terminal_x, terminal_g_x=None):
""" predict terminal adjoint state
Args:
terminal_x (numpy.ndarray): terminal state, shape(state_size, )
terminal_g_x (numpy.ndarray): terminal goal state,
shape(state_size, )
Returns:
terminal_lam (numpy.ndarray): terminal adjoint state,
shape(state_size, )
"""
terminal_lam = self.gradient_cost_fn_state(
terminal_x, terminal_g_x, terminal=True) # return in shape[1, state_size]
return terminal_lam[0]
@staticmethod @staticmethod
def calc_f_x(xs, us, dt): def calc_f_x(xs, us, dt):
""" gradient of model with respect to the state in batch form """ gradient of model with respect to the state in batch form

View File

@ -1,9 +1,11 @@
import numpy as np import numpy as np
from .planner import Planner from .planner import Planner
class ClosestPointPlanner(Planner): class ClosestPointPlanner(Planner):
""" This planner make goal state according to goal path """ This planner make goal state according to goal path
""" """
def __init__(self, config): def __init__(self, config):
""" """
""" """

View File

@ -1,9 +1,11 @@
import numpy as np import numpy as np
from .planner import Planner from .planner import Planner
class ConstantPlanner(Planner): class ConstantPlanner(Planner):
""" This planner make constant goal state """ This planner make constant goal state
""" """
def __init__(self, config): def __init__(self, config):
""" """
""" """

View File

@ -1,6 +1,7 @@
from .const_planner import ConstantPlanner from .const_planner import ConstantPlanner
from .closest_point_planner import ClosestPointPlanner from .closest_point_planner import ClosestPointPlanner
def make_planner(args, config): def make_planner(args, config):
if args.env == "FirstOrderLag": if args.env == "FirstOrderLag":
@ -11,5 +12,8 @@ def make_planner(args, config):
return ClosestPointPlanner(config) return ClosestPointPlanner(config)
elif args.env == "CartPole": elif args.env == "CartPole":
return ConstantPlanner(config) return ConstantPlanner(config)
elif args.env == "NonlinearSample":
return ConstantPlanner(config)
raise NotImplementedError("There is not {} Planner".format(args.planner_type)) raise NotImplementedError(
"There is not {} Planner".format(args.planner_type))

View File

@ -1,8 +1,10 @@
import numpy as np import numpy as np
class Planner(): class Planner():
""" """
""" """
def __init__(self): def __init__(self):
""" """
""" """

View File

@ -8,9 +8,11 @@ import matplotlib.animation as animation
logger = getLogger(__name__) logger = getLogger(__name__)
class Animator(): class Animator():
""" animation class """ animation class
""" """
def __init__(self, env, args=None): def __init__(self, env, args=None):
""" """
""" """
@ -65,7 +67,7 @@ class Animator():
""" """
# set up animation figures # set up animation figures
self._setup() self._setup()
_update_img = lambda i: self._update_img(i, history_x, history_g_x) def _update_img(i): return self._update_img(i, history_x, history_g_x)
# Set up formatting for the movie files # Set up formatting for the movie files
Writer = animation.writers['ffmpeg'] Writer = animation.writers['ffmpeg']

View File

@ -5,6 +5,7 @@ import matplotlib.pyplot as plt
from ..helper import save_pickle, load_pickle from ..helper import save_pickle, load_pickle
def plot_result(history, history_g=None, ylabel="x", def plot_result(history, history_g=None, ylabel="x",
save_dir="./result", name="state_history"): save_dir="./result", name="state_history"):
""" """
@ -28,7 +29,7 @@ def plot_result(history, history_g=None, ylabel="x",
def plot(axis, history, history_g=None): def plot(axis, history, history_g=None):
axis.plot(range(iters), history, c="r", linewidth=3) axis.plot(range(iters), history, c="r", linewidth=3)
if history_g is not None: if history_g is not None:
axis.plot(range(iters), history_g,\ axis.plot(range(iters), history_g,
c="b", linewidth=3, label="goal") c="b", linewidth=3, label="goal")
if i < size: if i < size:
@ -47,6 +48,7 @@ def plot_result(history, history_g=None, ylabel="x",
axis1.legend(ncol=1, bbox_to_anchor=(0., 1.02, 1., 0.102), loc=3) axis1.legend(ncol=1, bbox_to_anchor=(0., 1.02, 1., 0.102), loc=3)
figure.savefig(path, bbox_inches="tight", pad_inches=0.05) figure.savefig(path, bbox_inches="tight", pad_inches=0.05)
def plot_results(history_x, history_u, history_g=None, args=None): def plot_results(history_x, history_u, history_g=None, args=None):
""" """
@ -70,6 +72,7 @@ def plot_results(history_x, history_u, history_g=None, args=None):
name=env + "-input_history", name=env + "-input_history",
save_dir="./result/" + controller_type) save_dir="./result/" + controller_type)
def save_plot_data(history_x, history_u, history_g=None, args=None): def save_plot_data(history_x, history_u, history_g=None, args=None):
""" save plot data """ save plot data
@ -98,6 +101,7 @@ def save_plot_data(history_x, history_u, history_g=None, args=None):
env + "-history_g.pkl") env + "-history_g.pkl")
save_pickle(path, history_g) save_pickle(path, history_g)
def load_plot_data(env, controller_type, result_dir="./result"): def load_plot_data(env, controller_type, result_dir="./result"):
""" """
Args: Args:
@ -123,6 +127,7 @@ def load_plot_data(env, controller_type, result_dir="./result"):
return history_x, history_u, history_g return history_x, history_u, history_g
def plot_multi_result(histories, histories_g=None, labels=None, ylabel="x", def plot_multi_result(histories, histories_g=None, labels=None, ylabel="x",
save_dir="./result", name="state_history"): save_dir="./result", name="state_history"):
""" """
@ -146,7 +151,7 @@ def plot_multi_result(histories, histories_g=None, labels=None, ylabel="x",
axis.plot(range(iters), history, axis.plot(range(iters), history,
linewidth=3, label=label, alpha=0.7, linestyle="dashed") linewidth=3, label=label, alpha=0.7, linestyle="dashed")
if history_g is not None: if history_g is not None:
axis.plot(range(iters), history_g,\ axis.plot(range(iters), history_g,
c="b", linewidth=3) c="b", linewidth=3)
if i < size: if i < size:

View File

@ -5,6 +5,7 @@ import matplotlib.pyplot as plt
from ..common.utils import rotate_pos from ..common.utils import rotate_pos
def circle(center_x, center_y, radius, start=0., end=2*np.pi, n_point=100): def circle(center_x, center_y, radius, start=0., end=2*np.pi, n_point=100):
""" Create circle matrix """ Create circle matrix
@ -29,6 +30,7 @@ def circle(center_x, center_y, radius, start=0., end=2*np.pi, n_point=100):
return np.array(circle_xs), np.array(circle_ys) return np.array(circle_xs), np.array(circle_ys)
def circle_with_angle(center_x, center_y, radius, angle): def circle_with_angle(center_x, center_y, radius, angle):
""" Create circle matrix with angle line matrix """ Create circle matrix with angle line matrix
@ -50,6 +52,7 @@ def circle_with_angle(center_x, center_y, radius, angle):
return circle_x, circle_y, angle_x, angle_y return circle_x, circle_y, angle_x, angle_y
def square(center_x, center_y, shape, angle): def square(center_x, center_y, shape, angle):
""" Create square """ Create square
@ -77,6 +80,7 @@ def square(center_x, center_y, shape, angle):
return trans_points[:, 0], trans_points[:, 1] return trans_points[:, 0], trans_points[:, 1]
def square_with_angle(center_x, center_y, shape, angle): def square_with_angle(center_x, center_y, shape, angle):
""" Create square with angle line """ Create square with angle line

View File

@ -1,4 +1,5 @@
from .runner import ExpRunner from .runner import ExpRunner
def make_runner(args): def make_runner(args):
return ExpRunner() return ExpRunner()

View File

@ -4,9 +4,11 @@ import numpy as np
logger = getLogger(__name__) logger = getLogger(__name__)
class ExpRunner(): class ExpRunner():
""" experiment runner """ experiment runner
""" """
def __init__(self): def __init__(self):
""" """
""" """
@ -46,6 +48,6 @@ class ExpRunner():
score += cost score += cost
step_count += 1 step_count += 1
logger.debug("Controller type = {}, Score = {}"\ logger.debug("Controller type = {}, Score = {}"
.format(controller, score)) .format(controller, score))
return np.array(history_x), np.array(history_u), np.array(history_g) return np.array(history_x), np.array(history_u), np.array(history_g)

View File

@ -53,10 +53,10 @@ Following algorithms are implemented in PythonLinearNonlinearControl
- [script](PythonLinearNonlinearControl/controllers/ddp.py) - [script](PythonLinearNonlinearControl/controllers/ddp.py)
- [Unconstrained Nonlinear Model Predictive Control (NMPC)](https://www.sciencedirect.com/science/article/pii/S0005109897000058) - [Unconstrained Nonlinear Model Predictive Control (NMPC)](https://www.sciencedirect.com/science/article/pii/S0005109897000058)
- Ref: Ohtsuka, T., & Fujii, H. A. (1997). Real-time optimization algorithm for nonlinear receding-horizon control. Automatica, 33(6), 1147-1154. - Ref: Ohtsuka, T., & Fujii, H. A. (1997). Real-time optimization algorithm for nonlinear receding-horizon control. Automatica, 33(6), 1147-1154.
- [script (Coming soon)]() - [script](PythonLinearNonlinearControl/controllers/nmpc.py)
- [Constrained Nonlinear Model Predictive Control -CGMRES- (NMPC-CGMRES)](https://www.sciencedirect.com/science/article/pii/S0005109897000058) - [Constrained Nonlinear Model Predictive Control -CGMRES- (NMPC-CGMRES)](https://www.sciencedirect.com/science/article/pii/S0005109897000058)
- Ref: Ohtsuka, T., & Fujii, H. A. (1997). Real-time optimization algorithm for nonlinear receding-horizon control. Automatica, 33(6), 1147-1154. - Ref: Ohtsuka, T., & Fujii, H. A. (1997). Real-time optimization algorithm for nonlinear receding-horizon control. Automatica, 33(6), 1147-1154.
- [script (Coming soon)]() - [script](PythonLinearNonlinearControl/controllers/nmpc_cgmres.py)
- [Constrained Nonlinear Model Predictive Control -Newton- (NMPC-Newton)](https://www.sciencedirect.com/science/article/pii/S0005109897000058) - [Constrained Nonlinear Model Predictive Control -Newton- (NMPC-Newton)](https://www.sciencedirect.com/science/article/pii/S0005109897000058)
- Ref: Ohtsuka, T., & Fujii, H. A. (1997). Real-time optimization algorithm for nonlinear receding-horizon control. Automatica, 33(6), 1147-1154. - Ref: Ohtsuka, T., & Fujii, H. A. (1997). Real-time optimization algorithm for nonlinear receding-horizon control. Automatica, 33(6), 1147-1154.
- [script (Coming soon)]() - [script (Coming soon)]()
@ -71,6 +71,7 @@ There are 4 example environments, "FirstOrderLag", "TwoWheeledConst", "TwoWheele
| Two wheeled System (Constant Goal) | x | ✓ | 3 | 2 | | Two wheeled System (Constant Goal) | x | ✓ | 3 | 2 |
| Two wheeled System (Moving Goal) | x | ✓ | 3 | 2 | | Two wheeled System (Moving Goal) | x | ✓ | 3 | 2 |
| Cartpole (Swing up) | x | ✓ | 4 | 1 | | Cartpole (Swing up) | x | ✓ | 4 | 1 |
| Nonlinear Sample System Env | x | ✓ | 2 | 1 |
All states and inputs of environments are continuous. All states and inputs of environments are continuous.
**It should be noted that the algorithms for linear model could be applied to nonlinear enviroments if you have linealized the model of nonlinear environments.** **It should be noted that the algorithms for linear model could be applied to nonlinear enviroments if you have linealized the model of nonlinear environments.**
@ -184,6 +185,8 @@ save_plot_data(history_x, history_u, history_g=history_g)
animator = Animator(env) animator = Animator(env)
animator.draw(history_x, history_g) animator.draw(history_x, history_g)
``` ```
**It should be noted that the controller parameters like Q, R and Sf strongly affect the performence of the controller.
Please, check these parameters before you run the simulation.**
## Run Example Script ## Run Example Script

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

View File

@ -8,6 +8,7 @@ import matplotlib.pyplot as plt
from PythonLinearNonlinearControl.plotters.plot_func import load_plot_data, \ from PythonLinearNonlinearControl.plotters.plot_func import load_plot_data, \
plot_multi_result plot_multi_result
def run(args): def run(args):
controllers = ["iLQR", "DDP", "CEM", "MPPI"] controllers = ["iLQR", "DDP", "CEM", "MPPI"]
@ -41,6 +42,7 @@ def run(args):
plot_multi_result(history_us, histories_g=np.zeros_like(history_us), plot_multi_result(history_us, histories_g=np.zeros_like(history_us),
labels=controllers, ylabel="u", name="input_history") labels=controllers, ylabel="u", name="input_history")
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -51,5 +53,6 @@ def main():
run(args) run(args)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -11,32 +11,24 @@ from PythonLinearNonlinearControl.plotters.plot_func import plot_results, \
save_plot_data save_plot_data
from PythonLinearNonlinearControl.plotters.animator import Animator from PythonLinearNonlinearControl.plotters.animator import Animator
def run(args): def run(args):
# logger
make_logger(args.result_dir) make_logger(args.result_dir)
# make envs
env = make_env(args) env = make_env(args)
# make config
config = make_config(args) config = make_config(args)
# make planner
planner = make_planner(args, config) planner = make_planner(args, config)
# make model
model = make_model(args, config) model = make_model(args, config)
# make controller
controller = make_controller(args, config, model) controller = make_controller(args, config, model)
# make simulator
runner = make_runner(args) runner = make_runner(args)
# run experiment
history_x, history_u, history_g = runner.run(env, controller, planner) history_x, history_u, history_g = runner.run(env, controller, planner)
# plot results
plot_results(history_x, history_u, history_g=history_g, args=args) plot_results(history_x, history_u, history_g=history_g, args=args)
save_plot_data(history_x, history_u, history_g=history_g, args=args) save_plot_data(history_x, history_u, history_g=history_g, args=args)
@ -44,17 +36,19 @@ def run(args):
animator = Animator(env, args=args) animator = Animator(env, args=args)
animator.draw(history_x, history_g) animator.draw(history_x, history_g)
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--controller_type", type=str, default="CEM") parser.add_argument("--controller_type", type=str, default="NMPCCGMRES")
parser.add_argument("--env", type=str, default="TwoWheeledTrack") parser.add_argument("--env", type=str, default="TwoWheeledConst")
parser.add_argument("--save_anim", type=bool_flag, default=1) parser.add_argument("--save_anim", type=bool_flag, default=0)
parser.add_argument("--result_dir", type=str, default="./result") parser.add_argument("--result_dir", type=str, default="./result")
args = parser.parse_args() args = parser.parse_args()
run(args) run(args)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -4,6 +4,7 @@ import numpy as np
from PythonLinearNonlinearControl.configs.cartpole \ from PythonLinearNonlinearControl.configs.cartpole \
import CartPoleConfigModule import CartPoleConfigModule
class TestCalcCost(): class TestCalcCost():
def test_calc_costs(self): def test_calc_costs(self):
# make config # make config
@ -25,17 +26,18 @@ class TestCalcCost():
assert costs.shape == (pop_size, pred_len, 1) assert costs.shape == (pop_size, pred_len, 1)
costs = config.terminal_state_cost_fn(pred_xs[:, -1, :],\ costs = config.terminal_state_cost_fn(pred_xs[:, -1, :],
g_xs[:, -1, :]) g_xs[:, -1, :])
assert costs.shape == (pop_size, 1) assert costs.shape == (pop_size, 1)
class TestGradient(): class TestGradient():
def test_state_gradient(self): def test_state_gradient(self):
""" """
""" """
xs = np.ones((1, 4)) xs = np.ones((1, 4))
cost_grad = CartPoleConfigModule.gradient_cost_fn_with_state(xs, None) cost_grad = CartPoleConfigModule.gradient_cost_fn_state(xs, None)
# numeric grad # numeric grad
eps = 1e-4 eps = 1e-4
@ -59,7 +61,7 @@ class TestGradient():
""" """
xs = np.ones(4) xs = np.ones(4)
cost_grad =\ cost_grad =\
CartPoleConfigModule.gradient_cost_fn_with_state(xs, None, CartPoleConfigModule.gradient_cost_fn_state(xs, None,
terminal=True) terminal=True)
# numeric grad # numeric grad
@ -83,7 +85,7 @@ class TestGradient():
""" """
""" """
us = np.ones((1, 1)) us = np.ones((1, 1))
cost_grad = CartPoleConfigModule.gradient_cost_fn_with_input(None, us) cost_grad = CartPoleConfigModule.gradient_cost_fn_input(None, us)
# numeric grad # numeric grad
eps = 1e-4 eps = 1e-4
@ -106,7 +108,7 @@ class TestGradient():
""" """
""" """
xs = np.ones((1, 4)) xs = np.ones((1, 4))
cost_hess = CartPoleConfigModule.hessian_cost_fn_with_state(xs, None) cost_hess = CartPoleConfigModule.hessian_cost_fn_state(xs, None)
# numeric grad # numeric grad
eps = 1e-4 eps = 1e-4
@ -115,12 +117,12 @@ class TestGradient():
tmp_x = xs.copy() tmp_x = xs.copy()
tmp_x[0, i] = xs[0, i] + eps tmp_x[0, i] = xs[0, i] + eps
forward = \ forward = \
CartPoleConfigModule.gradient_cost_fn_with_state( CartPoleConfigModule.gradient_cost_fn_state(
tmp_x, None, terminal=False) tmp_x, None, terminal=False)
tmp_x = xs.copy() tmp_x = xs.copy()
tmp_x[0, i] = xs[0, i] - eps tmp_x[0, i] = xs[0, i] - eps
backward = \ backward = \
CartPoleConfigModule.gradient_cost_fn_with_state( CartPoleConfigModule.gradient_cost_fn_state(
tmp_x, None, terminal=False) tmp_x, None, terminal=False)
expected_hess[0, :, i] = (forward - backward) / (2. * eps) expected_hess[0, :, i] = (forward - backward) / (2. * eps)
@ -132,7 +134,7 @@ class TestGradient():
""" """
xs = np.ones(4) xs = np.ones(4)
cost_hess =\ cost_hess =\
CartPoleConfigModule.hessian_cost_fn_with_state(xs, None, CartPoleConfigModule.hessian_cost_fn_state(xs, None,
terminal=True) terminal=True)
# numeric grad # numeric grad
@ -142,12 +144,12 @@ class TestGradient():
tmp_x = xs.copy() tmp_x = xs.copy()
tmp_x[i] = xs[i] + eps tmp_x[i] = xs[i] + eps
forward = \ forward = \
CartPoleConfigModule.gradient_cost_fn_with_state( CartPoleConfigModule.gradient_cost_fn_state(
tmp_x, None, terminal=True) tmp_x, None, terminal=True)
tmp_x = xs.copy() tmp_x = xs.copy()
tmp_x[i] = xs[i] - eps tmp_x[i] = xs[i] - eps
backward = \ backward = \
CartPoleConfigModule.gradient_cost_fn_with_state( CartPoleConfigModule.gradient_cost_fn_state(
tmp_x, None, terminal=True) tmp_x, None, terminal=True)
expected_hess[0, :, i] = (forward - backward) / (2. * eps) expected_hess[0, :, i] = (forward - backward) / (2. * eps)
@ -159,7 +161,7 @@ class TestGradient():
""" """
us = np.ones((1, 1)) us = np.ones((1, 1))
xs = np.ones((1, 4)) xs = np.ones((1, 4))
cost_hess = CartPoleConfigModule.hessian_cost_fn_with_input(us, xs) cost_hess = CartPoleConfigModule.hessian_cost_fn_input(us, xs)
# numeric grad # numeric grad
eps = 1e-4 eps = 1e-4
@ -168,12 +170,12 @@ class TestGradient():
tmp_u = us.copy() tmp_u = us.copy()
tmp_u[0, i] = us[0, i] + eps tmp_u[0, i] = us[0, i] + eps
forward = \ forward = \
CartPoleConfigModule.gradient_cost_fn_with_input( CartPoleConfigModule.gradient_cost_fn_input(
xs, tmp_u) xs, tmp_u)
tmp_u = us.copy() tmp_u = us.copy()
tmp_u[0, i] = us[0, i] - eps tmp_u[0, i] = us[0, i] - eps
backward = \ backward = \
CartPoleConfigModule.gradient_cost_fn_with_input( CartPoleConfigModule.gradient_cost_fn_input(
xs, tmp_u) xs, tmp_u)
expected_hess[0, :, i] = (forward - backward) / (2. * eps) expected_hess[0, :, i] = (forward - backward) / (2. * eps)

View File

@ -4,6 +4,7 @@ import numpy as np
from PythonLinearNonlinearControl.configs.two_wheeled \ from PythonLinearNonlinearControl.configs.two_wheeled \
import TwoWheeledConfigModule import TwoWheeledConfigModule
class TestCalcCost(): class TestCalcCost():
def test_calc_costs(self): def test_calc_costs(self):
# make config # make config
@ -27,12 +28,13 @@ class TestCalcCost():
assert costs == pytest.approx(expected_costs**2 * np.diag(config.Q)) assert costs == pytest.approx(expected_costs**2 * np.diag(config.Q))
costs = config.terminal_state_cost_fn(pred_xs[:, -1, :],\ costs = config.terminal_state_cost_fn(pred_xs[:, -1, :],
g_xs[:, -1, :]) g_xs[:, -1, :])
expected_costs = np.ones((pop_size, state_size))*0.5 expected_costs = np.ones((pop_size, state_size))*0.5
assert costs == pytest.approx(expected_costs**2 * np.diag(config.Sf)) assert costs == pytest.approx(expected_costs**2 * np.diag(config.Sf))
class TestGradient(): class TestGradient():
def test_state_gradient(self): def test_state_gradient(self):
""" """
@ -40,7 +42,7 @@ class TestGradient():
xs = np.ones((1, 3)) xs = np.ones((1, 3))
g_xs = np.ones((1, 3)) * 0.5 g_xs = np.ones((1, 3)) * 0.5
cost_grad =\ cost_grad =\
TwoWheeledConfigModule.gradient_cost_fn_with_state(xs, g_xs) TwoWheeledConfigModule.gradient_cost_fn_state(xs, g_xs)
# numeric grad # numeric grad
eps = 1e-4 eps = 1e-4
@ -66,7 +68,7 @@ class TestGradient():
""" """
us = np.ones((1, 2)) us = np.ones((1, 2))
cost_grad =\ cost_grad =\
TwoWheeledConfigModule.gradient_cost_fn_with_input(None, us) TwoWheeledConfigModule.gradient_cost_fn_input(None, us)
# numeric grad # numeric grad
eps = 1e-4 eps = 1e-4
@ -93,7 +95,7 @@ class TestGradient():
g_xs = np.ones((1, 3)) * 0.5 g_xs = np.ones((1, 3)) * 0.5
xs = np.ones((1, 3)) xs = np.ones((1, 3))
cost_hess =\ cost_hess =\
TwoWheeledConfigModule.hessian_cost_fn_with_state(xs, g_xs) TwoWheeledConfigModule.hessian_cost_fn_state(xs, g_xs)
# numeric grad # numeric grad
eps = 1e-4 eps = 1e-4
@ -102,12 +104,12 @@ class TestGradient():
tmp_x = xs.copy() tmp_x = xs.copy()
tmp_x[0, i] = xs[0, i] + eps tmp_x[0, i] = xs[0, i] + eps
forward = \ forward = \
TwoWheeledConfigModule.gradient_cost_fn_with_state( TwoWheeledConfigModule.gradient_cost_fn_state(
tmp_x, g_xs, terminal=False) tmp_x, g_xs, terminal=False)
tmp_x = xs.copy() tmp_x = xs.copy()
tmp_x[0, i] = xs[0, i] - eps tmp_x[0, i] = xs[0, i] - eps
backward = \ backward = \
TwoWheeledConfigModule.gradient_cost_fn_with_state( TwoWheeledConfigModule.gradient_cost_fn_state(
tmp_x, g_xs, terminal=False) tmp_x, g_xs, terminal=False)
expected_hess[0, :, i] = (forward - backward) / (2. * eps) expected_hess[0, :, i] = (forward - backward) / (2. * eps)
@ -119,7 +121,7 @@ class TestGradient():
""" """
us = np.ones((1, 2)) us = np.ones((1, 2))
xs = np.ones((1, 3)) xs = np.ones((1, 3))
cost_hess = TwoWheeledConfigModule.hessian_cost_fn_with_input(us, xs) cost_hess = TwoWheeledConfigModule.hessian_cost_fn_input(us, xs)
# numeric grad # numeric grad
eps = 1e-4 eps = 1e-4
@ -128,12 +130,12 @@ class TestGradient():
tmp_u = us.copy() tmp_u = us.copy()
tmp_u[0, i] = us[0, i] + eps tmp_u[0, i] = us[0, i] + eps
forward = \ forward = \
TwoWheeledConfigModule.gradient_cost_fn_with_input( TwoWheeledConfigModule.gradient_cost_fn_input(
xs, tmp_u) xs, tmp_u)
tmp_u = us.copy() tmp_u = us.copy()
tmp_u[0, i] = us[0, i] - eps tmp_u[0, i] = us[0, i] - eps
backward = \ backward = \
TwoWheeledConfigModule.gradient_cost_fn_with_input( TwoWheeledConfigModule.gradient_cost_fn_input(
xs, tmp_u) xs, tmp_u)
expected_hess[0, :, i] = (forward - backward) / (2. * eps) expected_hess[0, :, i] = (forward - backward) / (2. * eps)