Add NMPC
This commit is contained in:
parent
8c28ff328a
commit
0c09e8a3d1
|
@ -52,7 +52,7 @@ class FirstOrderLagConfigModule():
|
||||||
"MPC": {
|
"MPC": {
|
||||||
},
|
},
|
||||||
"iLQR": {
|
"iLQR": {
|
||||||
"max_iter": 500,
|
"max_iters": 500,
|
||||||
"init_mu": 1.,
|
"init_mu": 1.,
|
||||||
"mu_min": 1e-6,
|
"mu_min": 1e-6,
|
||||||
"mu_max": 1e10,
|
"mu_max": 1e10,
|
||||||
|
@ -60,7 +60,7 @@ class FirstOrderLagConfigModule():
|
||||||
"threshold": 1e-6,
|
"threshold": 1e-6,
|
||||||
},
|
},
|
||||||
"DDP": {
|
"DDP": {
|
||||||
"max_iter": 500,
|
"max_iters": 500,
|
||||||
"init_mu": 1.,
|
"init_mu": 1.,
|
||||||
"mu_min": 1e-6,
|
"mu_min": 1e-6,
|
||||||
"mu_max": 1e10,
|
"mu_max": 1e10,
|
||||||
|
|
|
@ -6,12 +6,12 @@ class NonlinearSampleSystemConfigModule():
|
||||||
ENV_NAME = "NonlinearSampleSystem-v0"
|
ENV_NAME = "NonlinearSampleSystem-v0"
|
||||||
PLANNER_TYPE = "Const"
|
PLANNER_TYPE = "Const"
|
||||||
TYPE = "Nonlinear"
|
TYPE = "Nonlinear"
|
||||||
TASK_HORIZON = 2500
|
TASK_HORIZON = 2000
|
||||||
PRED_LEN = 10
|
PRED_LEN = 10
|
||||||
STATE_SIZE = 2
|
STATE_SIZE = 2
|
||||||
INPUT_SIZE = 1
|
INPUT_SIZE = 1
|
||||||
DT = 0.01
|
DT = 0.01
|
||||||
R = np.diag([0.01])
|
R = np.diag([1.])
|
||||||
Q = None
|
Q = None
|
||||||
Sf = None
|
Sf = None
|
||||||
# bounds
|
# bounds
|
||||||
|
@ -46,7 +46,7 @@ class NonlinearSampleSystemConfigModule():
|
||||||
"noise_sigma": 0.9,
|
"noise_sigma": 0.9,
|
||||||
},
|
},
|
||||||
"iLQR": {
|
"iLQR": {
|
||||||
"max_iter": 500,
|
"max_iters": 500,
|
||||||
"init_mu": 1.,
|
"init_mu": 1.,
|
||||||
"mu_min": 1e-6,
|
"mu_min": 1e-6,
|
||||||
"mu_max": 1e10,
|
"mu_max": 1e10,
|
||||||
|
@ -54,16 +54,25 @@ class NonlinearSampleSystemConfigModule():
|
||||||
"threshold": 1e-6,
|
"threshold": 1e-6,
|
||||||
},
|
},
|
||||||
"DDP": {
|
"DDP": {
|
||||||
"max_iter": 500,
|
"max_iters": 500,
|
||||||
"init_mu": 1.,
|
"init_mu": 1.,
|
||||||
"mu_min": 1e-6,
|
"mu_min": 1e-6,
|
||||||
"mu_max": 1e10,
|
"mu_max": 1e10,
|
||||||
"init_delta": 2.,
|
"init_delta": 2.,
|
||||||
"threshold": 1e-6,
|
"threshold": 1e-6,
|
||||||
},
|
},
|
||||||
|
"NMPC": {
|
||||||
|
"threshold": 1e-5,
|
||||||
|
"max_iters": 1000,
|
||||||
|
"learning_rate": 0.1
|
||||||
|
},
|
||||||
"NMPC-CGMRES": {
|
"NMPC-CGMRES": {
|
||||||
|
"threshold": 1e-3
|
||||||
},
|
},
|
||||||
"NMPC-Newton": {
|
"NMPC-Newton": {
|
||||||
|
"threshold": 1e-3,
|
||||||
|
"max_iteration": 500,
|
||||||
|
"learning_rate": 1e-3
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,7 +112,7 @@ class NonlinearSampleSystemConfigModule():
|
||||||
|
|
||||||
return 0.5 * (x[0]**2) + 0.5 * (x[1]**2)
|
return 0.5 * (x[0]**2) + 0.5 * (x[1]**2)
|
||||||
|
|
||||||
@ staticmethod
|
@staticmethod
|
||||||
def terminal_state_cost_fn(terminal_x, terminal_g_x):
|
def terminal_state_cost_fn(terminal_x, terminal_g_x):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -123,7 +132,7 @@ class NonlinearSampleSystemConfigModule():
|
||||||
|
|
||||||
return 0.5 * (terminal_x[0]**2) + 0.5 * (terminal_x[1]**2)
|
return 0.5 * (terminal_x[0]**2) + 0.5 * (terminal_x[1]**2)
|
||||||
|
|
||||||
@ staticmethod
|
@staticmethod
|
||||||
def gradient_cost_fn_with_state(x, g_x, terminal=False):
|
def gradient_cost_fn_with_state(x, g_x, terminal=False):
|
||||||
""" gradient of costs with respect to the state
|
""" gradient of costs with respect to the state
|
||||||
|
|
||||||
|
@ -147,7 +156,7 @@ class NonlinearSampleSystemConfigModule():
|
||||||
|
|
||||||
return cost_dx
|
return cost_dx
|
||||||
|
|
||||||
@ staticmethod
|
@staticmethod
|
||||||
def gradient_cost_fn_with_input(x, u):
|
def gradient_cost_fn_with_input(x, u):
|
||||||
""" gradient of costs with respect to the input
|
""" gradient of costs with respect to the input
|
||||||
|
|
||||||
|
@ -159,7 +168,7 @@ class NonlinearSampleSystemConfigModule():
|
||||||
"""
|
"""
|
||||||
return 2. * u * np.diag(NonlinearSampleSystemConfigModule.R)
|
return 2. * u * np.diag(NonlinearSampleSystemConfigModule.R)
|
||||||
|
|
||||||
@ staticmethod
|
@staticmethod
|
||||||
def hessian_cost_fn_with_state(x, g_x, terminal=False):
|
def hessian_cost_fn_with_state(x, g_x, terminal=False):
|
||||||
""" hessian costs with respect to the state
|
""" hessian costs with respect to the state
|
||||||
|
|
||||||
|
@ -187,7 +196,7 @@ class NonlinearSampleSystemConfigModule():
|
||||||
|
|
||||||
return hessian[np.newaxis, :, :]
|
return hessian[np.newaxis, :, :]
|
||||||
|
|
||||||
@ staticmethod
|
@staticmethod
|
||||||
def hessian_cost_fn_with_input(x, u):
|
def hessian_cost_fn_with_input(x, u):
|
||||||
""" hessian costs with respect to the input
|
""" hessian costs with respect to the input
|
||||||
|
|
||||||
|
@ -202,7 +211,7 @@ class NonlinearSampleSystemConfigModule():
|
||||||
|
|
||||||
return np.tile(NonlinearSampleSystemConfigModule.R, (pred_len, 1, 1))
|
return np.tile(NonlinearSampleSystemConfigModule.R, (pred_len, 1, 1))
|
||||||
|
|
||||||
@ staticmethod
|
@staticmethod
|
||||||
def hessian_cost_fn_with_input_state(x, u):
|
def hessian_cost_fn_with_input_state(x, u):
|
||||||
""" hessian costs with respect to the state and input
|
""" hessian costs with respect to the state and input
|
||||||
|
|
||||||
|
@ -217,3 +226,71 @@ class NonlinearSampleSystemConfigModule():
|
||||||
(pred_len, input_size) = u.shape
|
(pred_len, input_size) = u.shape
|
||||||
|
|
||||||
return np.zeros((pred_len, input_size, state_size))
|
return np.zeros((pred_len, input_size, state_size))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def gradient_hamiltonian_input(x, lam, u, g_x):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
x (numpy.ndarray): shape(pred_len+1, state_size)
|
||||||
|
lam (numpy.ndarray): shape(pred_len, state_size)
|
||||||
|
u (numpy.ndarray): shape(pred_len, input_size)
|
||||||
|
g_xs (numpy.ndarray): shape(pred_len, state_size)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
F (numpy.ndarray), shape(pred_len, input_size)
|
||||||
|
"""
|
||||||
|
if len(x.shape) == 1:
|
||||||
|
input_size = u.shape[0]
|
||||||
|
F = np.zeros(input_size)
|
||||||
|
F[0] = u[0] + lam[1]
|
||||||
|
|
||||||
|
return F
|
||||||
|
|
||||||
|
elif len(x.shape) == 2:
|
||||||
|
pred_len, input_size = u.shape
|
||||||
|
F = np.zeros((pred_len, input_size))
|
||||||
|
|
||||||
|
for i in range(pred_len):
|
||||||
|
F[i, 0] = u[i, 0] + lam[i, 1]
|
||||||
|
|
||||||
|
return F
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def gradient_hamiltonian_state(x, lam, u, g_x):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
x (numpy.ndarray): shape(pred_len+1, state_size)
|
||||||
|
lam (numpy.ndarray): shape(pred_len, state_size)
|
||||||
|
u (numpy.ndarray): shape(pred_len, input_size)
|
||||||
|
g_xs (numpy.ndarray): shape(pred_len, state_size)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
lam_dot (numpy.ndarray), shape(state_size, )
|
||||||
|
"""
|
||||||
|
if len(lam.shape) == 1:
|
||||||
|
state_size = lam.shape[0]
|
||||||
|
lam_dot = np.zeros(state_size)
|
||||||
|
lam_dot[0] = x[0] - (2. * x[0] * x[1] + 1.) * lam[1]
|
||||||
|
lam_dot[1] = x[1] + lam[0] + \
|
||||||
|
(-3. * (x[1]**2) - x[0]**2 + 1.) * lam[1]
|
||||||
|
|
||||||
|
return lam_dot
|
||||||
|
|
||||||
|
elif len(lam.shape) == 2:
|
||||||
|
pred_len, state_size = lam.shape
|
||||||
|
lam_dot = np.zeros((pred_len, state_size))
|
||||||
|
|
||||||
|
for i in range(pred_len):
|
||||||
|
lam_dot[i, 0] = x[i, 0] - \
|
||||||
|
(2. * x[i, 0] * x[i, 1] + 1.) * lam[i, 1]
|
||||||
|
lam_dot[i, 1] = x[i, 1] + lam[i, 0] + \
|
||||||
|
(-3. * (x[i, 1]**2) - x[i, 0]**2 + 1.) * lam[i, 1]
|
||||||
|
|
||||||
|
return lam_dot
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
|
@ -23,9 +23,15 @@ class TwoWheeledConfigModule():
|
||||||
Sf = np.diag([5., 5., 1.])
|
Sf = np.diag([5., 5., 1.])
|
||||||
"""
|
"""
|
||||||
# for track goal
|
# for track goal
|
||||||
|
"""
|
||||||
R = np.diag([0.01, 0.01])
|
R = np.diag([0.01, 0.01])
|
||||||
Q = np.diag([2.5, 2.5, 0.01])
|
Q = np.diag([2.5, 2.5, 0.01])
|
||||||
Sf = np.diag([2.5, 2.5, 0.01])
|
Sf = np.diag([2.5, 2.5, 0.01])
|
||||||
|
"""
|
||||||
|
# for track goal to NMPC
|
||||||
|
R = np.diag([0.1, 0.1])
|
||||||
|
Q = np.diag([0.1, 0.1, 0.1])
|
||||||
|
Sf = np.diag([0.25, 0.25, 0.25])
|
||||||
|
|
||||||
# bounds
|
# bounds
|
||||||
INPUT_LOWER_BOUND = np.array([-1.5, -3.14])
|
INPUT_LOWER_BOUND = np.array([-1.5, -3.14])
|
||||||
|
@ -62,7 +68,7 @@ class TwoWheeledConfigModule():
|
||||||
"noise_sigma": 1.,
|
"noise_sigma": 1.,
|
||||||
},
|
},
|
||||||
"iLQR": {
|
"iLQR": {
|
||||||
"max_iter": 500,
|
"max_iters": 500,
|
||||||
"init_mu": 1.,
|
"init_mu": 1.,
|
||||||
"mu_min": 1e-6,
|
"mu_min": 1e-6,
|
||||||
"mu_max": 1e10,
|
"mu_max": 1e10,
|
||||||
|
@ -70,13 +76,18 @@ class TwoWheeledConfigModule():
|
||||||
"threshold": 1e-6,
|
"threshold": 1e-6,
|
||||||
},
|
},
|
||||||
"DDP": {
|
"DDP": {
|
||||||
"max_iter": 500,
|
"max_iters": 500,
|
||||||
"init_mu": 1.,
|
"init_mu": 1.,
|
||||||
"mu_min": 1e-6,
|
"mu_min": 1e-6,
|
||||||
"mu_max": 1e10,
|
"mu_max": 1e10,
|
||||||
"init_delta": 2.,
|
"init_delta": 2.,
|
||||||
"threshold": 1e-6,
|
"threshold": 1e-6,
|
||||||
},
|
},
|
||||||
|
"NMPC": {
|
||||||
|
"threshold": 1e-3,
|
||||||
|
"max_iters": 1000,
|
||||||
|
"learning_rate": 0.1
|
||||||
|
},
|
||||||
"NMPC-CGMRES": {
|
"NMPC-CGMRES": {
|
||||||
},
|
},
|
||||||
"NMPC-Newton": {
|
"NMPC-Newton": {
|
||||||
|
@ -232,3 +243,86 @@ class TwoWheeledConfigModule():
|
||||||
(pred_len, input_size) = u.shape
|
(pred_len, input_size) = u.shape
|
||||||
|
|
||||||
return np.zeros((pred_len, input_size, state_size))
|
return np.zeros((pred_len, input_size, state_size))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def gradient_hamiltonian_input(x, lam, u, g_x):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
x (numpy.ndarray): shape(pred_len+1, state_size)
|
||||||
|
lam (numpy.ndarray): shape(pred_len, state_size)
|
||||||
|
u (numpy.ndarray): shape(pred_len, input_size)
|
||||||
|
g_xs (numpy.ndarray): shape(pred_len, state_size)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
F (numpy.ndarray), shape(pred_len, input_size)
|
||||||
|
"""
|
||||||
|
if len(x.shape) == 1:
|
||||||
|
input_size = u.shape[0]
|
||||||
|
F = np.zeros(input_size)
|
||||||
|
F[0] = u[0] * TwoWheeledConfigModule.R[0, 0] + \
|
||||||
|
lam[0] * np.cos(x[2]) + lam[1] * np.sin(x[2])
|
||||||
|
F[1] = u[1] * TwoWheeledConfigModule.R[1, 1] + lam[2]
|
||||||
|
|
||||||
|
return F
|
||||||
|
|
||||||
|
elif len(x.shape) == 2:
|
||||||
|
pred_len, input_size = u.shape
|
||||||
|
F = np.zeros((pred_len, input_size))
|
||||||
|
|
||||||
|
for i in range(pred_len):
|
||||||
|
F[i, 0] = u[i, 0] * TwoWheeledConfigModule.R[0, 0] + \
|
||||||
|
lam[i, 0] * np.cos(x[i, 2]) + lam[i, 1] * np.sin(x[i, 2])
|
||||||
|
F[i, 1] = u[i, 1] * TwoWheeledConfigModule.R[1, 1] + lam[i, 2]
|
||||||
|
|
||||||
|
return F
|
||||||
|
else:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def gradient_hamiltonian_state(x, lam, u, g_x):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
x (numpy.ndarray): shape(pred_len+1, state_size)
|
||||||
|
lam (numpy.ndarray): shape(pred_len, state_size)
|
||||||
|
u (numpy.ndarray): shape(pred_len, input_size)
|
||||||
|
g_xs (numpy.ndarray): shape(pred_len, state_size)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
lam_dot (numpy.ndarray), shape(state_size, )
|
||||||
|
"""
|
||||||
|
if len(lam.shape) == 1:
|
||||||
|
state_size = lam.shape[0]
|
||||||
|
lam_dot = np.zeros(state_size)
|
||||||
|
lam_dot[0] = \
|
||||||
|
(x[0] - g_x[0]) * TwoWheeledConfigModule.Q[0, 0]
|
||||||
|
lam_dot[1] = \
|
||||||
|
(x[1] - g_x[1]) * TwoWheeledConfigModule.Q[1, 1]
|
||||||
|
|
||||||
|
relative_angle = fit_angle_in_range(x[2] - g_x[2])
|
||||||
|
lam_dot[2] = \
|
||||||
|
relative_angle * TwoWheeledConfigModule.Q[2, 2] \
|
||||||
|
- lam[0] * u[0] * np.sin(x[2]) \
|
||||||
|
+ lam[1] * u[0] * np.cos(x[2])
|
||||||
|
|
||||||
|
return lam_dot
|
||||||
|
|
||||||
|
elif len(lam.shape) == 2:
|
||||||
|
pred_len, state_size = lam.shape
|
||||||
|
lam_dot = np.zeros((pred_len, state_size))
|
||||||
|
|
||||||
|
for i in range(pred_len):
|
||||||
|
lam_dot[i, 0] = \
|
||||||
|
(x[i, 0] - g_x[i, 0]) * TwoWheeledConfigModule.Q[0, 0]
|
||||||
|
lam_dot[i, 1] = \
|
||||||
|
(x[i, 1] - g_x[i, 1]) * TwoWheeledConfigModule.Q[1, 1]
|
||||||
|
|
||||||
|
relative_angle = fit_angle_in_range(x[i, 2] - g_x[i, 2])
|
||||||
|
lam_dot[i, 2] = \
|
||||||
|
relative_angle * TwoWheeledConfigModule.Q[2, 2] \
|
||||||
|
- lam[i, 0] * u[i, 0] * np.sin(x[i, 2]) \
|
||||||
|
+ lam[i, 1] * u[i, 0] * np.cos(x[i, 2])
|
||||||
|
|
||||||
|
return lam_dot
|
||||||
|
else:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
|
@ -40,7 +40,7 @@ class DDP(Controller):
|
||||||
config.hessian_cost_fn_with_input_state
|
config.hessian_cost_fn_with_input_state
|
||||||
|
|
||||||
# controller parameters
|
# controller parameters
|
||||||
self.max_iter = config.opt_config["DDP"]["max_iter"]
|
self.max_iters = config.opt_config["DDP"]["max_iters"]
|
||||||
self.init_mu = config.opt_config["DDP"]["init_mu"]
|
self.init_mu = config.opt_config["DDP"]["init_mu"]
|
||||||
self.mu = self.init_mu
|
self.mu = self.init_mu
|
||||||
self.mu_min = config.opt_config["DDP"]["mu_min"]
|
self.mu_min = config.opt_config["DDP"]["mu_min"]
|
||||||
|
@ -88,7 +88,7 @@ class DDP(Controller):
|
||||||
# line search param
|
# line search param
|
||||||
alphas = 1.1**(-np.arange(10)**2)
|
alphas = 1.1**(-np.arange(10)**2)
|
||||||
|
|
||||||
while opt_count < self.max_iter:
|
while opt_count < self.max_iters:
|
||||||
accepted_sol = False
|
accepted_sol = False
|
||||||
|
|
||||||
# forward
|
# forward
|
||||||
|
|
|
@ -38,7 +38,7 @@ class iLQR(Controller):
|
||||||
config.hessian_cost_fn_with_input_state
|
config.hessian_cost_fn_with_input_state
|
||||||
|
|
||||||
# controller parameters
|
# controller parameters
|
||||||
self.max_iter = config.opt_config["iLQR"]["max_iter"]
|
self.max_iters = config.opt_config["iLQR"]["max_iters"]
|
||||||
self.init_mu = config.opt_config["iLQR"]["init_mu"]
|
self.init_mu = config.opt_config["iLQR"]["init_mu"]
|
||||||
self.mu = self.init_mu
|
self.mu = self.init_mu
|
||||||
self.mu_min = config.opt_config["iLQR"]["mu_min"]
|
self.mu_min = config.opt_config["iLQR"]["mu_min"]
|
||||||
|
@ -81,7 +81,7 @@ class iLQR(Controller):
|
||||||
# line search param
|
# line search param
|
||||||
alphas = 1.1**(-np.arange(10)**2)
|
alphas = 1.1**(-np.arange(10)**2)
|
||||||
|
|
||||||
while opt_count < self.max_iter:
|
while opt_count < self.max_iters:
|
||||||
accepted_sol = False
|
accepted_sol = False
|
||||||
|
|
||||||
# forward
|
# forward
|
||||||
|
|
|
@ -5,6 +5,7 @@ from .mppi import MPPI
|
||||||
from .mppi_williams import MPPIWilliams
|
from .mppi_williams import MPPIWilliams
|
||||||
from .ilqr import iLQR
|
from .ilqr import iLQR
|
||||||
from .ddp import DDP
|
from .ddp import DDP
|
||||||
|
from .nmpc import NMPC
|
||||||
|
|
||||||
|
|
||||||
def make_controller(args, config, model):
|
def make_controller(args, config, model):
|
||||||
|
@ -23,5 +24,7 @@ def make_controller(args, config, model):
|
||||||
return iLQR(config, model)
|
return iLQR(config, model)
|
||||||
elif args.controller_type == "DDP":
|
elif args.controller_type == "DDP":
|
||||||
return DDP(config, model)
|
return DDP(config, model)
|
||||||
|
elif args.controller_type == "NMPC":
|
||||||
|
return NMPC(config, model)
|
||||||
|
|
||||||
raise ValueError("No controller: {}".format(args.controller_type))
|
raise ValueError("No controller: {}".format(args.controller_type))
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
from logging import getLogger
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import scipy.stats as stats
|
||||||
|
|
||||||
|
from .controller import Controller
|
||||||
|
from ..envs.cost import calc_cost
|
||||||
|
|
||||||
|
logger = getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class NMPC(Controller):
|
||||||
|
def __init__(self, config, model):
|
||||||
|
""" Nonlinear Model Predictive Control using pure gradient algorithm
|
||||||
|
"""
|
||||||
|
super(NMPC, self).__init__(config, model)
|
||||||
|
|
||||||
|
# model
|
||||||
|
self.model = model
|
||||||
|
|
||||||
|
# get cost func
|
||||||
|
self.state_cost_fn = config.state_cost_fn
|
||||||
|
self.terminal_state_cost_fn = config.terminal_state_cost_fn
|
||||||
|
self.input_cost_fn = config.input_cost_fn
|
||||||
|
|
||||||
|
# controller parameters
|
||||||
|
self.threshold = config.opt_config["NMPC"]["threshold"]
|
||||||
|
self.max_iters = config.opt_config["NMPC"]["max_iters"]
|
||||||
|
self.learning_rate = config.opt_config["NMPC"]["learning_rate"]
|
||||||
|
|
||||||
|
# general parameters
|
||||||
|
self.pred_len = config.PRED_LEN
|
||||||
|
self.input_size = config.INPUT_SIZE
|
||||||
|
self.dt = config.DT
|
||||||
|
|
||||||
|
# initialize
|
||||||
|
self.prev_sol = np.zeros((self.pred_len, self.input_size))
|
||||||
|
|
||||||
|
def obtain_sol(self, curr_x, g_xs):
|
||||||
|
""" calculate the optimal inputs
|
||||||
|
Args:
|
||||||
|
curr_x (numpy.ndarray): current state, shape(state_size, )
|
||||||
|
g_xs (numpy.ndarrya): goal trajectory, shape(plan_len, state_size)
|
||||||
|
Returns:
|
||||||
|
opt_input (numpy.ndarray): optimal input, shape(input_size, )
|
||||||
|
"""
|
||||||
|
sol = self.prev_sol.copy()
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# shape(pred_len+1, state_size)
|
||||||
|
pred_xs = self.model.predict_traj(curr_x, sol)
|
||||||
|
# shape(pred_len, state_size)
|
||||||
|
pred_lams = self.model.predict_adjoint_traj(pred_xs, sol, g_xs)
|
||||||
|
|
||||||
|
F_hat = self.config.gradient_hamiltonian_input(
|
||||||
|
pred_xs, pred_lams, sol, g_xs)
|
||||||
|
|
||||||
|
if np.linalg.norm(F_hat) < self.threshold:
|
||||||
|
break
|
||||||
|
|
||||||
|
if count > self.max_iters:
|
||||||
|
logger.debug(" break max iteartion at F : `{}".format(
|
||||||
|
np.linalg.norm(F_hat)))
|
||||||
|
break
|
||||||
|
|
||||||
|
sol -= self.learning_rate * F_hat
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
# update us for next optimization
|
||||||
|
self.prev_sol = np.concatenate(
|
||||||
|
(sol[1:], np.zeros((1, self.input_size))), axis=0)
|
||||||
|
|
||||||
|
return sol[0]
|
|
@ -58,7 +58,9 @@ class TwoWheeledConstEnv(Env):
|
||||||
"""
|
"""
|
||||||
self.step_count = 0
|
self.step_count = 0
|
||||||
|
|
||||||
self.curr_x = np.zeros(self.config["state_size"])
|
noise = np.clip(np.random.randn(3), -0.1, 0.1)
|
||||||
|
noise *= 0.01
|
||||||
|
self.curr_x = np.zeros(self.config["state_size"]) + noise
|
||||||
|
|
||||||
if init_x is not None:
|
if init_x is not None:
|
||||||
self.curr_x = init_x
|
self.curr_x = init_x
|
||||||
|
@ -252,7 +254,9 @@ class TwoWheeledTrackEnv(Env):
|
||||||
"""
|
"""
|
||||||
self.step_count = 0
|
self.step_count = 0
|
||||||
|
|
||||||
self.curr_x = np.zeros(self.config["state_size"])
|
noise = np.clip(np.random.randn(3), -0.1, 0.1)
|
||||||
|
noise *= 0.01
|
||||||
|
self.curr_x = np.zeros(self.config["state_size"]) + noise
|
||||||
|
|
||||||
if init_x is not None:
|
if init_x is not None:
|
||||||
self.curr_x = init_x
|
self.curr_x = init_x
|
||||||
|
|
|
@ -97,6 +97,9 @@ class Model():
|
||||||
Returns:
|
Returns:
|
||||||
lams (numpy.ndarray): adjoint state, shape(pred_len, state_size),
|
lams (numpy.ndarray): adjoint state, shape(pred_len, state_size),
|
||||||
adjoint size is the same as state_size
|
adjoint size is the same as state_size
|
||||||
|
Notes:
|
||||||
|
Adjoint trajectory be computed by backward path.
|
||||||
|
Usually, we should -\dot{lam} but in backward path case, we can use \dot{lam} directry
|
||||||
"""
|
"""
|
||||||
# get size
|
# get size
|
||||||
(pred_len, input_size) = us.shape
|
(pred_len, input_size) = us.shape
|
||||||
|
@ -108,21 +111,21 @@ class Model():
|
||||||
for t in range(pred_len-1, 0, -1):
|
for t in range(pred_len-1, 0, -1):
|
||||||
prev_lam = \
|
prev_lam = \
|
||||||
self.predict_adjoint_state(lam, xs[t], us[t],
|
self.predict_adjoint_state(lam, xs[t], us[t],
|
||||||
goal=g_xs[t], t=t)
|
g_x=g_xs[t], t=t)
|
||||||
# update
|
# update
|
||||||
lams = np.concatenate((prev_lam[np.newaxis, :], lams), axis=0)
|
lams = np.concatenate((prev_lam[np.newaxis, :], lams), axis=0)
|
||||||
lam = prev_lam
|
lam = prev_lam
|
||||||
|
|
||||||
return lams
|
return lams
|
||||||
|
|
||||||
def predict_adjoint_state(self, lam, x, u, goal=None, t=None):
|
def predict_adjoint_state(self, lam, x, u, g_x=None, t=None):
|
||||||
""" predict adjoint states
|
""" predict adjoint states
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
lam (numpy.ndarray): adjoint state, shape(state_size, )
|
lam (numpy.ndarray): adjoint state, shape(state_size, )
|
||||||
x (numpy.ndarray): state, shape(state_size, )
|
x (numpy.ndarray): state, shape(state_size, )
|
||||||
u (numpy.ndarray): input, shape(input_size, )
|
u (numpy.ndarray): input, shape(input_size, )
|
||||||
goal (numpy.ndarray): goal state, shape(state_size, )
|
g_x (numpy.ndarray): goal state, shape(state_size, )
|
||||||
Returns:
|
Returns:
|
||||||
prev_lam (numpy.ndarrya): previous adjoint state,
|
prev_lam (numpy.ndarrya): previous adjoint state,
|
||||||
shape(state_size, )
|
shape(state_size, )
|
||||||
|
|
|
@ -13,6 +13,9 @@ class NonlinearSampleSystemModel(Model):
|
||||||
"""
|
"""
|
||||||
super(NonlinearSampleSystemModel, self).__init__()
|
super(NonlinearSampleSystemModel, self).__init__()
|
||||||
self.dt = config.DT
|
self.dt = config.DT
|
||||||
|
self.gradient_hamiltonian_state = config.gradient_hamiltonian_state
|
||||||
|
self.gradient_hamiltonian_input = config.gradient_hamiltonian_input
|
||||||
|
self.gradient_cost_fn_with_state = config.gradient_cost_fn_with_state
|
||||||
|
|
||||||
def predict_next_state(self, curr_x, u):
|
def predict_next_state(self, curr_x, u):
|
||||||
""" predict next state
|
""" predict next state
|
||||||
|
@ -43,6 +46,42 @@ class NonlinearSampleSystemModel(Model):
|
||||||
|
|
||||||
return next_x
|
return next_x
|
||||||
|
|
||||||
|
def predict_adjoint_state(self, lam, x, u, g_x=None, t=None):
|
||||||
|
""" predict adjoint states
|
||||||
|
|
||||||
|
Args:
|
||||||
|
lam (numpy.ndarray): adjoint state, shape(state_size, )
|
||||||
|
x (numpy.ndarray): state, shape(state_size, )
|
||||||
|
u (numpy.ndarray): input, shape(input_size, )
|
||||||
|
goal (numpy.ndarray): goal state, shape(state_size, )
|
||||||
|
Returns:
|
||||||
|
prev_lam (numpy.ndarrya): previous adjoint state,
|
||||||
|
shape(state_size, )
|
||||||
|
"""
|
||||||
|
if len(u.shape) == 1:
|
||||||
|
delta_lam = self.dt * \
|
||||||
|
self.gradient_hamiltonian_state(x, lam, u, g_x)
|
||||||
|
prev_lam = lam + delta_lam
|
||||||
|
return prev_lam
|
||||||
|
|
||||||
|
elif len(u.shape) == 2:
|
||||||
|
raise ValueError
|
||||||
|
|
||||||
|
def predict_terminal_adjoint_state(self, terminal_x, terminal_g_x=None):
|
||||||
|
""" predict terminal adjoint state
|
||||||
|
|
||||||
|
Args:
|
||||||
|
terminal_x (numpy.ndarray): terminal state, shape(state_size, )
|
||||||
|
terminal_g_x (numpy.ndarray): terminal goal state,
|
||||||
|
shape(state_size, )
|
||||||
|
Returns:
|
||||||
|
terminal_lam (numpy.ndarray): terminal adjoint state,
|
||||||
|
shape(state_size, )
|
||||||
|
"""
|
||||||
|
terminal_lam = self.gradient_cost_fn_with_state(
|
||||||
|
terminal_x, terminal_g_x, terminal=True) # return in shape[1, state_size]
|
||||||
|
return terminal_lam[0]
|
||||||
|
|
||||||
def _func_x_1(self, x, u, batch=False):
|
def _func_x_1(self, x, u, batch=False):
|
||||||
if not batch:
|
if not batch:
|
||||||
x_dot = x[1]
|
x_dot = x[1]
|
||||||
|
|
|
@ -12,6 +12,9 @@ class TwoWheeledModel(Model):
|
||||||
"""
|
"""
|
||||||
super(TwoWheeledModel, self).__init__()
|
super(TwoWheeledModel, self).__init__()
|
||||||
self.dt = config.DT
|
self.dt = config.DT
|
||||||
|
self.gradient_hamiltonian_state = config.gradient_hamiltonian_state
|
||||||
|
self.gradient_hamiltonian_input = config.gradient_hamiltonian_input
|
||||||
|
self.gradient_cost_fn_with_state = config.gradient_cost_fn_with_state
|
||||||
|
|
||||||
def predict_next_state(self, curr_x, u):
|
def predict_next_state(self, curr_x, u):
|
||||||
""" predict next state
|
""" predict next state
|
||||||
|
@ -53,6 +56,42 @@ class TwoWheeledModel(Model):
|
||||||
|
|
||||||
return next_x
|
return next_x
|
||||||
|
|
||||||
|
def predict_adjoint_state(self, lam, x, u, g_x=None, t=None):
|
||||||
|
""" predict adjoint states
|
||||||
|
|
||||||
|
Args:
|
||||||
|
lam (numpy.ndarray): adjoint state, shape(state_size, )
|
||||||
|
x (numpy.ndarray): state, shape(state_size, )
|
||||||
|
u (numpy.ndarray): input, shape(input_size, )
|
||||||
|
goal (numpy.ndarray): goal state, shape(state_size, )
|
||||||
|
Returns:
|
||||||
|
prev_lam (numpy.ndarrya): previous adjoint state,
|
||||||
|
shape(state_size, )
|
||||||
|
"""
|
||||||
|
if len(u.shape) == 1:
|
||||||
|
delta_lam = self.dt * \
|
||||||
|
self.gradient_hamiltonian_state(x, lam, u, g_x)
|
||||||
|
prev_lam = lam + delta_lam
|
||||||
|
return prev_lam
|
||||||
|
|
||||||
|
elif len(u.shape) == 2:
|
||||||
|
raise ValueError
|
||||||
|
|
||||||
|
def predict_terminal_adjoint_state(self, terminal_x, terminal_g_x=None):
|
||||||
|
""" predict terminal adjoint state
|
||||||
|
|
||||||
|
Args:
|
||||||
|
terminal_x (numpy.ndarray): terminal state, shape(state_size, )
|
||||||
|
terminal_g_x (numpy.ndarray): terminal goal state,
|
||||||
|
shape(state_size, )
|
||||||
|
Returns:
|
||||||
|
terminal_lam (numpy.ndarray): terminal adjoint state,
|
||||||
|
shape(state_size, )
|
||||||
|
"""
|
||||||
|
terminal_lam = self.gradient_cost_fn_with_state(
|
||||||
|
terminal_x, terminal_g_x, terminal=True) # return in shape[1, state_size]
|
||||||
|
return terminal_lam[0]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def calc_f_x(xs, us, dt):
|
def calc_f_x(xs, us, dt):
|
||||||
""" gradient of model with respect to the state in batch form
|
""" gradient of model with respect to the state in batch form
|
||||||
|
|
|
@ -13,31 +13,22 @@ from PythonLinearNonlinearControl.plotters.animator import Animator
|
||||||
|
|
||||||
|
|
||||||
def run(args):
|
def run(args):
|
||||||
# logger
|
|
||||||
make_logger(args.result_dir)
|
make_logger(args.result_dir)
|
||||||
|
|
||||||
# make envs
|
|
||||||
env = make_env(args)
|
env = make_env(args)
|
||||||
|
|
||||||
# make config
|
|
||||||
config = make_config(args)
|
config = make_config(args)
|
||||||
|
|
||||||
# make planner
|
|
||||||
planner = make_planner(args, config)
|
planner = make_planner(args, config)
|
||||||
|
|
||||||
# make model
|
|
||||||
model = make_model(args, config)
|
model = make_model(args, config)
|
||||||
|
|
||||||
# make controller
|
|
||||||
controller = make_controller(args, config, model)
|
controller = make_controller(args, config, model)
|
||||||
|
|
||||||
# make simulator
|
|
||||||
runner = make_runner(args)
|
runner = make_runner(args)
|
||||||
|
|
||||||
# run experiment
|
|
||||||
history_x, history_u, history_g = runner.run(env, controller, planner)
|
history_x, history_u, history_g = runner.run(env, controller, planner)
|
||||||
|
|
||||||
# plot results
|
|
||||||
plot_results(history_x, history_u, history_g=history_g, args=args)
|
plot_results(history_x, history_u, history_g=history_g, args=args)
|
||||||
save_plot_data(history_x, history_u, history_g=history_g, args=args)
|
save_plot_data(history_x, history_u, history_g=history_g, args=args)
|
||||||
|
|
||||||
|
@ -49,8 +40,8 @@ def run(args):
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
|
|
||||||
parser.add_argument("--controller_type", type=str, default="DDP")
|
parser.add_argument("--controller_type", type=str, default="NMPC")
|
||||||
parser.add_argument("--env", type=str, default="NonlinearSample")
|
parser.add_argument("--env", type=str, default="TwoWheeledTrack")
|
||||||
parser.add_argument("--save_anim", type=bool_flag, default=0)
|
parser.add_argument("--save_anim", type=bool_flag, default=0)
|
||||||
parser.add_argument("--result_dir", type=str, default="./result")
|
parser.add_argument("--result_dir", type=str, default="./result")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue