diff --git a/Environments.md b/Environments.md
new file mode 100644
index 0000000..bd14d23
--- /dev/null
+++ b/Environments.md
@@ -0,0 +1,56 @@
+# Environments
+
+| Name | Linear | Nonlinear | State Size | Input Size |
+|:----------|:---------------:|:----------------:|:----------------:|:----------------:|
+| First Order Lag System | ✓ | x | 4 | 2 |
+| Two wheeled System (Constant Goal) | x | ✓ | 3 | 2 |
+| Two wheeled System (Moving Goal) (Coming soon) | x | ✓ | 3 | 2 |
+| Cartpole (Swing up) | x | ✓ | 4 | 1 |
+
+## FirstOrderLagEnv
+
+### System equation.
+
+
+
+You can set an arbitrary time constant, tau. The default is 0.63 s.
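+
+A minimal sketch of the dynamics, assuming the classic first-order lag relation applied to each lagged state (the exact 4-state, 2-input state-space matrices are defined in models/first_order_lag.py):
+
+\tau \dot{x} = u - x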
+
+### Cost.
+
+
+
+Q = diag[1., 1., 1., 1.],
+R = diag[1., 1.]
+
+X_g denotes the goal state.
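+
+As a sketch, the stage cost follows the standard quadratic form used by the config modules (see configs/first_order_lag.py); the TwoWheeledEnv cost below has the same form with its own Q and R:
+
+J = \sum_t (x_t - X_g)^\top Q (x_t - X_g) + u_t^\top R u_t
+
+The configs also add a terminal term weighted by Sf.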
+
+## TwoWheeledEnv
+
+### System equation.
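+
+A minimal sketch, assuming the standard two-wheeled (unicycle) kinematics with state (x, y, \theta) and inputs (v, \omega) (see models/two_wheeled.py for the exact implementation):
+
+\dot{x} = v \cos\theta, \quad \dot{y} = v \sin\theta, \quad \dot{\theta} = \omega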
+
+
+
+### Cost.
+
+
+
+Q = diag[5., 5., 1.],
+R = diag[0.1, 0.1]
+
+X_g denotes the goal state.
+
+## CartPoleEnv (Swing up)
+
+### System equation.
+
+
+
+You can set arbitrary parameters: the cart mass mc, the pole mass mp, the pole length l, and the gravitational acceleration g.
+
+Default settings are as follows:
+
+mc = 1, mp = 0.2, l = 0.5, g = 9.81
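+
+Written out from the Euler step in envs/cartpole.py, with state (x, \dot{x}, \theta, \dot{\theta}) and input u:
+
+\ddot{x} = \frac{u + m_p \sin\theta \, (l \dot{\theta}^2 + g \cos\theta)}{m_c + m_p \sin^2\theta}
+
+\ddot{\theta} = \frac{-u \cos\theta - m_p l \dot{\theta}^2 \cos\theta \sin\theta - (m_c + m_p) g \sin\theta}{l \, (m_c + m_p \sin^2\theta)}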
+
+### Cost.
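+
+From configs/cartpole.py, the stage cost used for the swing-up task is
+
+J = 6 x^2 + 12 (\cos\theta + 1)^2 + 0.1 \dot{x}^2 + 0.1 \dot{\theta}^2 + 0.01 u^2
+
+summed over the horizon. The goal state set by CartPoleEnv.reset is X_g = (0, 0, -\pi, 0).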
+
+
\ No newline at end of file
diff --git a/PythonLinearNonlinearControl/common/utils.py b/PythonLinearNonlinearControl/common/utils.py
index 07ff604..a22b22b 100644
--- a/PythonLinearNonlinearControl/common/utils.py
+++ b/PythonLinearNonlinearControl/common/utils.py
@@ -1,2 +1 @@
import numpy as np
-
diff --git a/PythonLinearNonlinearControl/configs/cartpole.py b/PythonLinearNonlinearControl/configs/cartpole.py
new file mode 100644
index 0000000..64a78db
--- /dev/null
+++ b/PythonLinearNonlinearControl/configs/cartpole.py
@@ -0,0 +1,218 @@
+import numpy as np
+
+class CartPoleConfigModule():
+ # parameters
+ ENV_NAME = "CartPole-v0"
+ TYPE = "Nonlinear"
+ TASK_HORIZON = 500
+ PRED_LEN = 50
+ STATE_SIZE = 4
+ INPUT_SIZE = 1
+ DT = 0.02
+ # cost parameters
+ R = np.diag([0.01])
+ # bounds
+ INPUT_LOWER_BOUND = np.array([-3.])
+ INPUT_UPPER_BOUND = np.array([3.])
+ # parameters
+ MP = 0.2
+ MC = 1.
+ L = 0.5
+ G = 9.81
+
+ def __init__(self):
+ """
+ """
+ # opt configs
+ self.opt_config = {
+ "Random": {
+ "popsize": 5000
+ },
+ "CEM": {
+ "popsize": 500,
+ "num_elites": 50,
+ "max_iters": 15,
+ "alpha": 0.3,
+ "init_var":9.,
+ "threshold":0.001
+ },
+ "MPPI":{
+ "beta" : 0.6,
+ "popsize": 5000,
+ "kappa": 0.9,
+ "noise_sigma": 0.5,
+ },
+ "MPPIWilliams":{
+ "popsize": 5000,
+ "lambda": 1.,
+ "noise_sigma": 0.9,
+ },
+ "iLQR":{
+ "max_iter": 500,
+ "init_mu": 1.,
+ "mu_min": 1e-6,
+ "mu_max": 1e10,
+ "init_delta": 2.,
+ "threshold": 1e-6,
+ },
+ "DDP":{
+ "max_iter": 500,
+ "init_mu": 1.,
+ "mu_min": 1e-6,
+ "mu_max": 1e10,
+ "init_delta": 2.,
+ "threshold": 1e-6,
+ },
+ "NMPC-CGMRES":{
+ },
+ "NMPC-Newton":{
+ },
+ }
+
+ @staticmethod
+ def input_cost_fn(u):
+ """ input cost functions
+ Args:
+ u (numpy.ndarray): input, shape(pred_len, input_size)
+ or shape(pop_size, pred_len, input_size)
+ Returns:
+ cost (numpy.ndarray): cost of input, shape(pred_len, input_size) or
+ shape(pop_size, pred_len, input_size)
+ """
+ return (u**2) * np.diag(CartPoleConfigModule.R)
+
+ @staticmethod
+ def state_cost_fn(x, g_x):
+ """ state cost function
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ or shape(pop_size, pred_len, state_size)
+ g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
+ or shape(pop_size, pred_len, state_size)
+ Returns:
+ cost (numpy.ndarray): cost of state, shape(pred_len, 1) or
+ shape(pop_size, pred_len, 1)
+ """
+
+ if len(x.shape) > 2:
+ return (6. * (x[:, :, 0]**2) \
+ + 12. * ((np.cos(x[:, :, 2]) + 1.)**2) \
+ + 0.1 * (x[:, :, 1]**2) \
+ + 0.1 * (x[:, :, 3]**2))[:, :, np.newaxis]
+
+ elif len(x.shape) > 1:
+ return (6. * (x[:, 0]**2) \
+ + 12. * ((np.cos(x[:, 2]) + 1.)**2) \
+ + 0.1 * (x[:, 1]**2) \
+ + 0.1 * (x[:, 3]**2))[:, np.newaxis]
+
+ return 6. * (x[0]**2) \
+ + 12. * ((np.cos(x[2]) + 1.)**2) \
+ + 0.1 * (x[1]**2) \
+ + 0.1 * (x[3]**2)
+
+ @staticmethod
+ def terminal_state_cost_fn(terminal_x, terminal_g_x):
+ """
+ Args:
+ terminal_x (numpy.ndarray): terminal state,
+ shape(state_size, ) or shape(pop_size, state_size)
+ terminal_g_x (numpy.ndarray): terminal goal state,
+ shape(state_size, ) or shape(pop_size, state_size)
+ Returns:
+ cost (numpy.ndarray): cost of terminal state, shape(pop_size, 1)
+ for batched input, or a scalar for a single state
+ """
+
+ if len(terminal_x.shape) > 1:
+ return (6. * (terminal_x[:, 0]**2) \
+ + 12. * ((np.cos(terminal_x[:, 2]) + 1.)**2) \
+ + 0.1 * (terminal_x[:, 1]**2) \
+ + 0.1 * (terminal_x[:, 3]**2))[:, np.newaxis]
+
+ return 6. * (terminal_x[0]**2) \
+ + 12. * ((np.cos(terminal_x[2]) + 1.)**2) \
+ + 0.1 * (terminal_x[1]**2) \
+ + 0.1 * (terminal_x[3]**2)
+
+ @staticmethod
+ def gradient_cost_fn_with_state(x, g_x, terminal=False):
+ """ gradient of costs with respect to the state
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
+
+ Returns:
+ l_x (numpy.ndarray): gradient of cost, shape(pred_len, state_size)
+ or shape(1, state_size)
+ """
+ if not terminal:
+ return None
+
+ return None
+
+ @staticmethod
+ def gradient_cost_fn_with_input(x, u):
+ """ gradient of costs with respect to the input
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ u (numpy.ndarray): input, shape(pred_len, input_size)
+
+ Returns:
+ l_u (numpy.ndarray): gradient of cost, shape(pred_len, input_size)
+ """
+ return None
+
+ @staticmethod
+ def hessian_cost_fn_with_state(x, g_x, terminal=False):
+ """ hessian costs with respect to the state
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
+
+ Returns:
+ l_xx (numpy.ndarray): hessian of cost,
+ shape(pred_len, state_size, state_size) or
+ shape(1, state_size, state_size)
+ """
+ if not terminal:
+ (pred_len, _) = x.shape
+ return None
+
+ return None
+
+ @staticmethod
+ def hessian_cost_fn_with_input(x, u):
+ """ hessian costs with respect to the input
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ u (numpy.ndarray): input, shape(pred_len, input_size)
+
+ Returns:
+ l_uu (numpy.ndarray): hessian of cost,
+ shape(pred_len, input_size, input_size)
+ """
+ (pred_len, _) = u.shape
+
+ return None
+
+ @staticmethod
+ def hessian_cost_fn_with_input_state(x, u):
+ """ hessian costs with respect to the state and input
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ u (numpy.ndarray): input, shape(pred_len, input_size)
+
+ Returns:
+ l_ux (numpy.ndarray): hessian of cost,
+ shape(pred_len, input_size, state_size)
+ """
+ (_, state_size) = x.shape
+ (pred_len, input_size) = u.shape
+
+ return np.zeros((pred_len, input_size, state_size))
\ No newline at end of file
diff --git a/PythonLinearNonlinearControl/configs/first_order_lag.py b/PythonLinearNonlinearControl/configs/first_order_lag.py
index 7726a77..1ad59f6 100644
--- a/PythonLinearNonlinearControl/configs/first_order_lag.py
+++ b/PythonLinearNonlinearControl/configs/first_order_lag.py
@@ -5,7 +5,7 @@ class FirstOrderLagConfigModule():
ENV_NAME = "FirstOrderLag-v0"
TYPE = "Linear"
TASK_HORIZON = 1000
- PRED_LEN = 10
+ PRED_LEN = 50
STATE_SIZE = 4
INPUT_SIZE = 2
DT = 0.05
@@ -43,8 +43,33 @@ class FirstOrderLagConfigModule():
"kappa": 0.9,
"noise_sigma": 0.5,
},
+ "MPPIWilliams":{
+ "popsize": 5000,
+ "lambda": 1.,
+ "noise_sigma": 0.9,
+ },
"MPC":{
- }
+ },
+ "iLQR":{
+ "max_iter": 500,
+ "init_mu": 1.,
+ "mu_min": 1e-6,
+ "mu_max": 1e10,
+ "init_delta": 2.,
+ "threshold": 1e-6,
+ },
+ "DDP":{
+ "max_iter": 500,
+ "init_mu": 1.,
+ "mu_min": 1e-6,
+ "mu_max": 1e10,
+ "init_delta": 2.,
+ "threshold": 1e-6,
+ },
+ "NMPC-CGMRES":{
+ },
+ "NMPC-Newton":{
+ },
}
@staticmethod
@@ -86,4 +111,89 @@ class FirstOrderLagConfigModule():
shape(pop_size, pred_len)
"""
return ((terminal_x - terminal_g_x)**2) \
- * np.diag(FirstOrderLagConfigModule.Sf)
\ No newline at end of file
+ * np.diag(FirstOrderLagConfigModule.Sf)
+
+ @staticmethod
+ def gradient_cost_fn_with_state(x, g_x, terminal=False):
+ """ gradient of costs with respect to the state
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
+
+ Returns:
+ l_x (numpy.ndarray): gradient of cost, shape(pred_len, state_size)
+ or shape(1, state_size)
+ """
+ if not terminal:
+ return 2. * (x - g_x) * np.diag(FirstOrderLagConfigModule.Q)
+
+ return (2. * (x - g_x) \
+ * np.diag(FirstOrderLagConfigModule.Sf))[np.newaxis, :]
+
+ @staticmethod
+ def gradient_cost_fn_with_input(x, u):
+ """ gradient of costs with respect to the input
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ u (numpy.ndarray): input, shape(pred_len, input_size)
+
+ Returns:
+ l_u (numpy.ndarray): gradient of cost, shape(pred_len, input_size)
+ """
+ return 2. * u * np.diag(FirstOrderLagConfigModule.R)
+
+ @staticmethod
+ def hessian_cost_fn_with_state(x, g_x, terminal=False):
+ """ hessian costs with respect to the state
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ g_x (numpy.ndarray): goal state, shape(pred_len, state_size)
+
+ Returns:
+ l_xx (numpy.ndarray): hessian of cost,
+ shape(pred_len, state_size, state_size) or
+ shape(1, state_size, state_size)
+ """
+ if not terminal:
+ (pred_len, _) = x.shape
+ # hessian of the quadratic cost is the constant 2 * Q
+ return np.tile(2.*FirstOrderLagConfigModule.Q, (pred_len, 1, 1))
+
+ return np.tile(2.*FirstOrderLagConfigModule.Sf, (1, 1, 1))
+
+ @staticmethod
+ def hessian_cost_fn_with_input(x, u):
+ """ hessian costs with respect to the input
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ u (numpy.ndarray): input, shape(pred_len, input_size)
+
+ Returns:
+ l_uu (numpy.ndarray): hessian of cost,
+ shape(pred_len, input_size, input_size)
+ """
+ (pred_len, _) = u.shape
+
+ return np.tile(2.*FirstOrderLagConfigModule.R, (pred_len, 1, 1))
+
+ @staticmethod
+ def hessian_cost_fn_with_input_state(x, u):
+ """ hessian costs with respect to the state and input
+
+ Args:
+ x (numpy.ndarray): state, shape(pred_len, state_size)
+ u (numpy.ndarray): input, shape(pred_len, input_size)
+
+ Returns:
+ l_ux (numpy.ndarray): hessian of cost,
+ shape(pred_len, input_size, state_size)
+ """
+ (_, state_size) = x.shape
+ (pred_len, input_size) = u.shape
+
+ return np.zeros((pred_len, input_size, state_size))
diff --git a/PythonLinearNonlinearControl/configs/make_configs.py b/PythonLinearNonlinearControl/configs/make_configs.py
index 87e3709..984df94 100644
--- a/PythonLinearNonlinearControl/configs/make_configs.py
+++ b/PythonLinearNonlinearControl/configs/make_configs.py
@@ -1,5 +1,6 @@
from .first_order_lag import FirstOrderLagConfigModule
from .two_wheeled import TwoWheeledConfigModule
+from .cartpole import CartPoleConfigModule
def make_config(args):
"""
@@ -9,4 +10,6 @@ def make_config(args):
if args.env == "FirstOrderLag":
return FirstOrderLagConfigModule()
elif args.env == "TwoWheeledConst" or args.env == "TwoWheeled":
- return TwoWheeledConfigModule()
\ No newline at end of file
+ return TwoWheeledConfigModule()
+ elif args.env == "CartPole":
+ return CartPoleConfigModule()
\ No newline at end of file
diff --git a/PythonLinearNonlinearControl/configs/two_wheeled.py b/PythonLinearNonlinearControl/configs/two_wheeled.py
index 9167ca6..27e9834 100644
--- a/PythonLinearNonlinearControl/configs/two_wheeled.py
+++ b/PythonLinearNonlinearControl/configs/two_wheeled.py
@@ -39,6 +39,11 @@ class TwoWheeledConfigModule():
"kappa": 0.9,
"noise_sigma": 0.5,
},
+ "MPPIWilliams":{
+ "popsize": 5000,
+ "lambda": 1,
+ "noise_sigma": 1.,
+ },
"iLQR":{
"max_iter": 500,
"init_mu": 1.,
diff --git a/PythonLinearNonlinearControl/controllers/ddp.py b/PythonLinearNonlinearControl/controllers/ddp.py
index ac04b7c..4abb229 100644
--- a/PythonLinearNonlinearControl/controllers/ddp.py
+++ b/PythonLinearNonlinearControl/controllers/ddp.py
@@ -23,10 +23,6 @@ class DDP(Controller):
"""
super(DDP, self).__init__(config, model)
- if config.TYPE != "Nonlinear":
- raise ValueError("{} could be not applied to \
- this controller".format(model))
-
# model
self.model = model
@@ -296,6 +292,7 @@ class DDP(Controller):
def backward(self, f_x, f_u, f_xx, f_ux, f_uu, l_x, l_xx, l_u, l_uu, l_ux):
""" backward step of iLQR
+
Args:
f_x (numpy.ndarray): gradient of model with respecto to state,
shape(pred_len+1, state_size, state_size)
@@ -317,7 +314,6 @@ class DDP(Controller):
shape(pred_len, input_size, input_size)
l_ux (numpy.ndarray): hessian of cost with respect
to state and input, shape(pred_len, input_size, state_size)
-
Returns:
k (numpy.ndarray): gain, shape(pred_len, input_size)
K (numpy.ndarray): gain, shape(pred_len, input_size, state_size)
@@ -353,7 +349,8 @@ class DDP(Controller):
def _Q(self, f_x, f_u, f_xx, f_ux, f_uu,
l_x, l_u, l_xx, l_ux, l_uu, V_x, V_xx):
- """Computes second order expansion.
+ """ compute Q function valued
+
Args:
f_x (numpy.ndarray): gradient of model with respecto to state,
shape(state_size, state_size)
diff --git a/PythonLinearNonlinearControl/controllers/ilqr.py b/PythonLinearNonlinearControl/controllers/ilqr.py
index 40be9e9..a676ade 100644
--- a/PythonLinearNonlinearControl/controllers/ilqr.py
+++ b/PythonLinearNonlinearControl/controllers/ilqr.py
@@ -21,10 +21,6 @@ class iLQR(Controller):
"""
super(iLQR, self).__init__(config, model)
- if config.TYPE != "Nonlinear":
- raise ValueError("{} could be not applied to \
- this controller".format(model))
-
# model
self.model = model
diff --git a/PythonLinearNonlinearControl/controllers/make_controllers.py b/PythonLinearNonlinearControl/controllers/make_controllers.py
index 99653d9..74d048c 100644
--- a/PythonLinearNonlinearControl/controllers/make_controllers.py
+++ b/PythonLinearNonlinearControl/controllers/make_controllers.py
@@ -2,6 +2,7 @@ from .mpc import LinearMPC
from .cem import CEM
from .random import RandomShooting
from .mppi import MPPI
+from .mppi_williams import MPPIWilliams
from .ilqr import iLQR
from .ddp import DDP
@@ -15,6 +16,8 @@ def make_controller(args, config, model):
return RandomShooting(config, model)
elif args.controller_type == "MPPI":
return MPPI(config, model)
+ elif args.controller_type == "MPPIWilliams":
+ return MPPIWilliams(config, model)
elif args.controller_type == "iLQR":
return iLQR(config, model)
elif args.controller_type == "DDP":
diff --git a/PythonLinearNonlinearControl/controllers/mppi_williams.py b/PythonLinearNonlinearControl/controllers/mppi_williams.py
new file mode 100644
index 0000000..1fd0102
--- /dev/null
+++ b/PythonLinearNonlinearControl/controllers/mppi_williams.py
@@ -0,0 +1,143 @@
+from logging import getLogger
+
+import numpy as np
+import scipy.stats as stats
+
+from .controller import Controller
+from ..envs.cost import calc_cost
+
+logger = getLogger(__name__)
+
+class MPPIWilliams(Controller):
+ """ Model Predictive Path Integral for linear and nonlinear method
+
+ Attributes:
+ history_u (list[numpy.ndarray]): time history of optimal input
+ Ref:
+ G. Williams et al., "Information theoretic MPC
+ for model-based reinforcement learning,"
+ 2017 IEEE International Conference on Robotics and Automation (ICRA),
+ Singapore, 2017, pp. 1714-1721.
+ """
+ def __init__(self, config, model):
+ super(MPPIWilliams, self).__init__(config, model)
+
+ # model
+ self.model = model
+
+ # general parameters
+ self.pred_len = config.PRED_LEN
+ self.input_size = config.INPUT_SIZE
+
+ # mppi parameters
+ self.pop_size = config.opt_config["MPPIWilliams"]["popsize"]
+ self.lam = config.opt_config["MPPIWilliams"]["lambda"]
+ self.noise_sigma = config.opt_config["MPPIWilliams"]["noise_sigma"]
+ self.opt_dim = self.input_size * self.pred_len
+
+ # get bound
+ self.input_upper_bounds = np.tile(config.INPUT_UPPER_BOUND,
+ (self.pred_len, 1))
+ self.input_lower_bounds = np.tile(config.INPUT_LOWER_BOUND,
+ (self.pred_len, 1))
+
+ # get cost func
+ self.state_cost_fn = config.state_cost_fn
+ self.terminal_state_cost_fn = config.terminal_state_cost_fn
+ self.input_cost_fn = config.input_cost_fn
+
+ # init mean
+ self.prev_sol = np.tile((config.INPUT_UPPER_BOUND \
+ + config.INPUT_LOWER_BOUND) / 2.,
+ self.pred_len)
+ self.prev_sol = self.prev_sol.reshape(self.pred_len, self.input_size)
+
+ # save
+ self.history_u = [np.zeros(self.input_size)]
+
+ def clear_sol(self):
+ """ clear prev sol
+ """
+ logger.debug("Clear Solution")
+ self.prev_sol = \
+ (self.input_upper_bounds + self.input_lower_bounds) / 2.
+ self.prev_sol = self.prev_sol.reshape(self.pred_len, self.input_size)
+
+ def calc_cost(self, curr_x, samples, g_xs):
+ """ calculate the cost of input samples by using MPPI's eq
+
+ Args:
+ curr_x (numpy.ndarray): shape(state_size),
+ current robot position
+ samples (numpy.ndarray): shape(pop_size, pred_len, input_size),
+ input samples
+ g_xs (numpy.ndarray): shape(pred_len, state_size),
+ goal states
+ Returns:
+ costs (numpy.ndarray): shape(pop_size, )
+ """
+ # get size
+ pop_size = samples.shape[0]
+ g_xs = np.tile(g_xs, (pop_size, 1, 1))
+
+ # calc cost, pred_xs.shape = (pop_size, pred_len+1, state_size)
+ pred_xs = self.model.predict_traj(curr_x, samples)
+
+ # get particle cost
+ costs = calc_cost(pred_xs, samples, g_xs,
+ self.state_cost_fn, None, \
+ self.terminal_state_cost_fn)
+
+ return costs
+
+ def obtain_sol(self, curr_x, g_xs):
+ """ calculate the optimal inputs
+
+ Args:
+ curr_x (numpy.ndarray): current state, shape(state_size, )
+ g_xs (numpy.ndarray): goal trajectory, shape(plan_len, state_size)
+ Returns:
+ opt_input (numpy.ndarray): optimal input, shape(input_size, )
+ """
+ # get noised inputs
+ noise = np.random.normal(
+ loc=0, scale=1.0, size=(self.pop_size, self.pred_len,
+ self.input_size)) * self.noise_sigma
+
+ noised_inputs = self.prev_sol + noise
+
+ # clip actions
+ noised_inputs = np.clip(
+ noised_inputs, self.input_lower_bounds, self.input_upper_bounds)
+
+ # calc cost
+ costs = self.calc_cost(curr_x, noised_inputs, g_xs)
+
+ costs += np.sum(np.sum(
+ self.lam * self.prev_sol * noise / self.noise_sigma,
+ axis=-1), axis=-1)
+
+ # mppi update
+ beta = np.min(costs)
+ eta = np.sum(np.exp(- 1. / self.lam * (costs - beta)), axis=0) \
+ + 1e-10
+
+ # importance-sampling weights (Williams et al., 2017)
+ # weights.shape = (pop_size, )
+ weights = np.exp(- 1. / self.lam * (costs - beta)) / eta
+
+ # update inputs
+ sol = self.prev_sol \
+ + np.sum(weights[:, np.newaxis, np.newaxis] * noise, axis=0)
+
+ # update
+ self.prev_sol[:-1] = sol[1:]
+ self.prev_sol[-1] = sol[-1] # reuse the terminal input for the last step
+
+ # log
+ self.history_u.append(sol[0])
+
+ return sol[0]
+
+ def __str__(self):
+ return "MPPIWilliams"
\ No newline at end of file
diff --git a/PythonLinearNonlinearControl/envs/cartpole.py b/PythonLinearNonlinearControl/envs/cartpole.py
new file mode 100644
index 0000000..de9becb
--- /dev/null
+++ b/PythonLinearNonlinearControl/envs/cartpole.py
@@ -0,0 +1,114 @@
+import numpy as np
+
+from .env import Env
+
+class CartPoleEnv(Env):
+ """ Cartpole Environment
+
+ Ref :
+ https://ocw.mit.edu/courses/
+ electrical-engineering-and-computer-science/
+ 6-832-underactuated-robotics-spring-2009/readings/
+ MIT6_832s09_read_ch03.pdf
+ """
+ def __init__(self):
+ """
+ """
+ self.config = {"state_size" : 4,
+ "input_size" : 1,
+ "dt" : 0.02,
+ "max_step" : 500,
+ "input_lower_bound": [-3.],
+ "input_upper_bound": [3.],
+ "mp": 0.2,
+ "mc": 1.,
+ "l": 0.5,
+ "g": 9.81,
+ }
+
+ super(CartPoleEnv, self).__init__(self.config)
+
+ def reset(self, init_x=None):
+ """ reset state
+
+ Returns:
+ init_x (numpy.ndarray): initial state, shape(state_size, )
+ info (dict): information
+ """
+ self.step_count = 0
+
+ self.curr_x = np.array([0., 0., 0., 0.])
+
+ if init_x is not None:
+ self.curr_x = init_x
+
+ # goal
+ self.g_x = np.array([0., 0., -np.pi, 0.])
+
+ # clear memory
+ self.history_x = []
+ self.history_g_x = []
+
+ return self.curr_x, {"goal_state": self.g_x}
+
+ def step(self, u):
+ """ step environments
+
+ Args:
+ u (numpy.ndarray) : input, shape(input_size, )
+ Returns:
+ next_x (numpy.ndarray): next state, shape(state_size, )
+ cost (float): costs
+ done (bool): end the simulation or not
+ info (dict): information
+ """
+ # clip action
+ if self.config["input_lower_bound"] is not None:
+ u = np.clip(u,
+ self.config["input_lower_bound"],
+ self.config["input_upper_bound"])
+
+ # step
+ # x
+ d_x0 = self.curr_x[1]
+ # v_x
+ d_x1 = (u[0] + self.config["mp"] * np.sin(self.curr_x[2]) \
+ * (self.config["l"] * (self.curr_x[3]**2) \
+ + self.config["g"] * np.cos(self.curr_x[2]))) \
+ / (self.config["mc"] + self.config["mp"] \
+ * (np.sin(self.curr_x[2])**2))
+ # theta
+ d_x2 = self.curr_x[3]
+
+ # v_theta
+ d_x3 = (-u[0] * np.cos(self.curr_x[2]) \
+ - self.config["mp"] * self.config["l"] * (self.curr_x[3]**2) \
+ * np.cos(self.curr_x[2]) * np.sin(self.curr_x[2]) \
+ - (self.config["mc"] + self.config["mp"]) * self.config["g"] \
+ * np.sin(self.curr_x[2])) \
+ / (self.config["l"] * (self.config["mc"] + self.config["mp"] \
+ * (np.sin(self.curr_x[2])**2)))
+
+ next_x = self.curr_x +\
+ np.array([d_x0, d_x1, d_x2, d_x3]) * self.config["dt"]
+
+ # TODO: costs
+ costs = 0.
+ costs += 0.1 * np.sum(u**2)
+ costs += 6. * self.curr_x[0]**2 \
+ + 12. * (np.cos(self.curr_x[2]) + 1.)**2 \
+ + 0.1 * self.curr_x[1]**2 \
+ + 0.1 * self.curr_x[3]**2
+
+ # save history
+ self.history_x.append(next_x.flatten())
+ self.history_g_x.append(self.g_x.flatten())
+
+ # update
+ self.curr_x = next_x.flatten().copy()
+ # update step count
+ self.step_count += 1
+
+ return next_x.flatten(), costs, \
+ self.step_count > self.config["max_step"], \
+ {"goal_state" : self.g_x}
\ No newline at end of file
diff --git a/PythonLinearNonlinearControl/envs/cost.py b/PythonLinearNonlinearControl/envs/cost.py
index 5d10697..117d5f2 100644
--- a/PythonLinearNonlinearControl/envs/cost.py
+++ b/PythonLinearNonlinearControl/envs/cost.py
@@ -22,16 +22,22 @@ def calc_cost(pred_xs, input_sample, g_xs,
cost (numpy.ndarray): cost of the input sample, shape(pop_size, )
"""
# state cost
- state_pred_par_cost = state_cost_fn(pred_xs[:, 1:-1, :], g_xs[:, 1:-1, :])
- state_cost = np.sum(np.sum(state_pred_par_cost, axis=-1), axis=-1)
+ state_cost = 0.
+ if state_cost_fn is not None:
+ state_pred_par_cost = state_cost_fn(pred_xs[:, 1:-1, :], g_xs[:, 1:-1, :])
+ state_cost = np.sum(np.sum(state_pred_par_cost, axis=-1), axis=-1)
# terminal cost
- terminal_state_par_cost = terminal_state_cost_fn(pred_xs[:, -1, :],
- g_xs[:, -1, :])
- terminal_state_cost = np.sum(terminal_state_par_cost, axis=-1)
+ terminal_state_cost = 0.
+ if terminal_state_cost_fn is not None:
+ terminal_state_par_cost = terminal_state_cost_fn(pred_xs[:, -1, :],
+ g_xs[:, -1, :])
+ terminal_state_cost = np.sum(terminal_state_par_cost, axis=-1)
# act cost
- act_pred_par_cost = input_cost_fn(input_sample)
- act_cost = np.sum(np.sum(act_pred_par_cost, axis=-1), axis=-1)
+ act_cost = 0.
+ if input_cost_fn is not None:
+ act_pred_par_cost = input_cost_fn(input_sample)
+ act_cost = np.sum(np.sum(act_pred_par_cost, axis=-1), axis=-1)
return state_cost + terminal_state_cost + act_cost
\ No newline at end of file
diff --git a/PythonLinearNonlinearControl/envs/make_envs.py b/PythonLinearNonlinearControl/envs/make_envs.py
index debbf29..4b1adf7 100644
--- a/PythonLinearNonlinearControl/envs/make_envs.py
+++ b/PythonLinearNonlinearControl/envs/make_envs.py
@@ -1,5 +1,6 @@
from .first_order_lag import FirstOrderLagEnv
from .two_wheeled import TwoWheeledConstEnv
+from .cartpole import CartPoleEnv
def make_env(args):
@@ -7,5 +8,7 @@ def make_env(args):
return FirstOrderLagEnv()
elif args.env == "TwoWheeledConst":
return TwoWheeledConstEnv()
+ elif args.env == "CartPole":
+ return CartPoleEnv()
raise NotImplementedError("There is not {} Env".format(args.env))
\ No newline at end of file
diff --git a/PythonLinearNonlinearControl/envs/two_wheeled.py b/PythonLinearNonlinearControl/envs/two_wheeled.py
index c5194cd..8be0d36 100644
--- a/PythonLinearNonlinearControl/envs/two_wheeled.py
+++ b/PythonLinearNonlinearControl/envs/two_wheeled.py
@@ -86,7 +86,7 @@ class TwoWheeledConstEnv(Env):
# TODO: costs
costs = 0.
costs += 0.1 * np.sum(u**2)
- costs += np.sum((self.curr_x - self.g_x)**2)
+ costs += np.sum(((self.curr_x - self.g_x)**2) * np.array([5., 5., 1.]))
# save history
self.history_x.append(next_x.flatten())
diff --git a/PythonLinearNonlinearControl/models/cartpole.py b/PythonLinearNonlinearControl/models/cartpole.py
new file mode 100644
index 0000000..42c6616
--- /dev/null
+++ b/PythonLinearNonlinearControl/models/cartpole.py
@@ -0,0 +1,186 @@
+import numpy as np
+
+from .model import Model
+
+class CartPoleModel(Model):
+ """ cartpole model
+ """
+ def __init__(self, config):
+ """
+ """
+ super(CartPoleModel, self).__init__()
+ self.dt = config.DT
+ self.mc = config.MC
+ self.mp = config.MP
+ self.l = config.L
+ self.g = config.G
+
+ def predict_next_state(self, curr_x, u):
+ """ predict next state
+
+ Args:
+ curr_x (numpy.ndarray): current state, shape(state_size, ) or
+ shape(pop_size, state_size)
+ u (numpy.ndarray): input, shape(input_size, ) or
+ shape(pop_size, input_size)
+ Returns:
+ next_x (numpy.ndarray): next state, shape(state_size, ) or
+ shape(pop_size, state_size)
+ """
+ if len(u.shape) == 1:
+ # x
+ d_x0 = curr_x[1]
+ # v_x
+ d_x1 = (u[0] + self.mp * np.sin(curr_x[2]) \
+ * (self.l * (curr_x[3]**2) \
+ + self.g * np.cos(curr_x[2]))) \
+ / (self.mc + self.mp * (np.sin(curr_x[2])**2))
+ # theta
+ d_x2 = curr_x[3]
+ # v_theta
+ d_x3 = (-u[0] * np.cos(curr_x[2]) \
+ - self.mp * self.l * (curr_x[3]**2) \
+ * np.cos(curr_x[2]) * np.sin(curr_x[2]) \
+ - (self.mc + self.mp) * self.g * np.sin(curr_x[2])) \
+ / (self.l * (self.mc + self.mp * (np.sin(curr_x[2])**2)))
+
+ next_x = curr_x +\
+ np.array([d_x0, d_x1, d_x2, d_x3]) * self.dt
+
+ return next_x
+
+ elif len(u.shape) == 2:
+ # x
+ d_x0 = curr_x[:, 1]
+ # v_x
+ d_x1 = (u[:, 0] + self.mp * np.sin(curr_x[:, 2]) \
+ * (self.l * (curr_x[:, 3]**2) \
+ + self.g * np.cos(curr_x[:, 2]))) \
+ / (self.mc + self.mp * (np.sin(curr_x[:, 2])**2))
+ # theta
+ d_x2 = curr_x[:, 3]
+ # v_theta
+ d_x3 = (-u[:, 0] * np.cos(curr_x[:, 2]) \
+ - self.mp * self.l * (curr_x[:, 3]**2) \
+ * np.cos(curr_x[:, 2]) * np.sin(curr_x[:, 2]) \
+ - (self.mc + self.mp) * self.g * np.sin(curr_x[:, 2])) \
+ / (self.l * (self.mc + self.mp * (np.sin(curr_x[:, 2])**2)))
+
+ next_x = curr_x +\
+ np.stack((d_x0, d_x1, d_x2, d_x3), axis=1) * self.dt
+
+ return next_x
+
+ def calc_f_x(self, xs, us, dt):
+ """ gradient of model with respect to the state in batch form
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+
+ Return:
+ f_x (numpy.ndarray): gradient of model with respect to x,
+ shape(pred_len, state_size, state_size)
+
+ Notes:
+ This should be discrete form !!
+ """
+ # get size
+ (_, state_size) = xs.shape
+ (pred_len, _) = us.shape
+
+ f_x = np.zeros((pred_len, state_size, state_size))
+
+ f_x[:, 0, 2] = -np.sin(xs[:, 2]) * us[:, 0]
+ f_x[:, 1, 2] = np.cos(xs[:, 2]) * us[:, 0]
+
+ return f_x * dt + np.eye(state_size) # to discrete form
+
+ def calc_f_u(self, xs, us, dt):
+ """ gradient of model with respect to the input in batch form
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+
+ Return:
+ f_u (numpy.ndarray): gradient of model with respect to u,
+ shape(pred_len, state_size, input_size)
+
+ Notes:
+ This should be discrete form !!
+ """
+ # get size
+ (_, state_size) = xs.shape
+ (pred_len, input_size) = us.shape
+
+ f_u = np.zeros((pred_len, state_size, input_size))
+
+ f_u[:, 1, 0] = 1. / (self.mc + self.mp * (np.sin(xs[:, 2])**2))
+
+ f_u[:, 3, 0] = -np.cos(xs[:, 2]) \
+ / (self.l * (self.mc \
+ + self.mp * (np.sin(xs[:, 2])**2)))
+
+ return f_u * dt # to discrete form
+
+ def calc_f_xx(self, xs, us, dt):
+ """ hessian of model with respect to the state in batch form
+
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+
+ Return:
+ f_xx (numpy.ndarray): hessian of model with respect to x,
+ shape(pred_len, state_size, state_size, state_size)
+ """
+ # get size
+ (_, state_size) = xs.shape
+ (pred_len, _) = us.shape
+
+ f_xx = np.zeros((pred_len, state_size, state_size, state_size))
+
+ f_xx[:, 0, 2, 2] = -np.cos(xs[:, 2]) * us[:, 0]
+ f_xx[:, 1, 2, 2] = -np.sin(xs[:, 2]) * us[:, 0]
+
+ return f_xx * dt
+
+ def calc_f_ux(self, xs, us, dt):
+ """ hessian of model with respect to state and input in batch form
+
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+
+ Return:
+ f_ux (numpy.ndarray): hessian of model with respect to input and state,
+ shape(pred_len, state_size, input_size, state_size)
+ """
+ # get size
+ (_, state_size) = xs.shape
+ (pred_len, input_size) = us.shape
+
+ f_ux = np.zeros((pred_len, state_size, input_size, state_size))
+
+ f_ux[:, 0, 0, 2] = -np.sin(xs[:, 2])
+ f_ux[:, 1, 0, 2] = np.cos(xs[:, 2])
+
+ return f_ux * dt
+
+ def calc_f_uu(self, xs, us, dt):
+ """ hessian of model with respect to input in batch form
+
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+
+ Return:
+ f_uu (numpy.ndarray): hessian of model with respect to input,
+ shape(pred_len, state_size, input_size, input_size)
+ """
+ # get size
+ (_, state_size) = xs.shape
+ (pred_len, input_size) = us.shape
+
+ f_uu = np.zeros((pred_len, state_size, input_size, input_size))
+
+ return f_uu * dt
\ No newline at end of file
diff --git a/PythonLinearNonlinearControl/models/make_models.py b/PythonLinearNonlinearControl/models/make_models.py
index 7688f93..fcb29ae 100644
--- a/PythonLinearNonlinearControl/models/make_models.py
+++ b/PythonLinearNonlinearControl/models/make_models.py
@@ -1,5 +1,6 @@
from .first_order_lag import FirstOrderLagModel
from .two_wheeled import TwoWheeledModel
+from .cartpole import CartPoleModel
def make_model(args, config):
@@ -7,5 +8,7 @@ def make_model(args, config):
return FirstOrderLagModel(config)
elif args.env == "TwoWheeledConst" or args.env == "TwoWheeled":
return TwoWheeledModel(config)
+ elif args.env == "CartPole":
+ return CartPoleModel(config)
- raise NotImplementedError("There is not {} Model".format(args.env))
+ raise NotImplementedError("There is not {} Model".format(args.env))
\ No newline at end of file
diff --git a/PythonLinearNonlinearControl/models/model.py b/PythonLinearNonlinearControl/models/model.py
index 58fe32e..5eb2cb7 100644
--- a/PythonLinearNonlinearControl/models/model.py
+++ b/PythonLinearNonlinearControl/models/model.py
@@ -211,3 +211,94 @@ class LinearModel(Model):
next_x = np.matmul(curr_x, self.A.T) + np.matmul(u, self.B.T)
return next_x
+
+ def calc_f_x(self, xs, us, dt):
+ """ gradient of model with respect to the state in batch form
+
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+ Return:
+ f_x (numpy.ndarray): gradient of model with respect to x,
+ shape(pred_len, state_size, state_size)
+ Notes:
+ This should be discrete form !!
+ """
+ # get size
+ (pred_len, _) = us.shape
+
+ return np.tile(self.A, (pred_len, 1, 1))
+
+ def calc_f_u(self, xs, us, dt):
+ """ gradient of model with respect to the input in batch form
+
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+ Return:
+ f_u (numpy.ndarray): gradient of model with respect to u,
+ shape(pred_len, state_size, input_size)
+ Notes:
+ This should be discrete form !!
+ """
+ # get size
+ (pred_len, input_size) = us.shape
+
+ return np.tile(self.B, (pred_len, 1, 1))
+
+ @staticmethod
+ def calc_f_xx(xs, us, dt):
+ """ hessian of model with respect to the state in batch form
+
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+ Return:
+ f_xx (numpy.ndarray): hessian of model with respect to x,
+ shape(pred_len, state_size, state_size, state_size)
+ """
+ # get size
+ (_, state_size) = xs.shape
+ (pred_len, _) = us.shape
+
+ f_xx = np.zeros((pred_len, state_size, state_size, state_size))
+
+ return f_xx
+
+ @staticmethod
+ def calc_f_ux(xs, us, dt):
+ """ hessian of model with respect to state and input in batch form
+
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+ Return:
+ f_ux (numpy.ndarray): hessian of model with respect to input and state,
+ shape(pred_len, state_size, input_size, state_size)
+ """
+ # get size
+ (_, state_size) = xs.shape
+ (pred_len, input_size) = us.shape
+
+ f_ux = np.zeros((pred_len, state_size, input_size, state_size))
+
+ return f_ux
+
+ @staticmethod
+ def calc_f_uu(xs, us, dt):
+ """ hessian of model with respect to input in batch form
+
+ Args:
+ xs (numpy.ndarray): state, shape(pred_len+1, state_size)
+ us (numpy.ndarray): input, shape(pred_len, input_size,)
+ Return:
+ f_uu (numpy.ndarray): hessian of model with respect to input,
+ shape(pred_len, state_size, input_size, input_size)
+ """
+ # get size
+ (_, state_size) = xs.shape
+ (pred_len, input_size) = us.shape
+
+ f_uu = np.zeros((pred_len, state_size, input_size, input_size))
+
+ return f_uu
diff --git a/PythonLinearNonlinearControl/plotters/plot_func.py b/PythonLinearNonlinearControl/plotters/plot_func.py
index 216788e..16866ea 100644
--- a/PythonLinearNonlinearControl/plotters/plot_func.py
+++ b/PythonLinearNonlinearControl/plotters/plot_func.py
@@ -3,6 +3,8 @@ import os
import numpy as np
import matplotlib.pyplot as plt
+from ..helper import save_pickle, load_pickle
+
def plot_result(history, history_g=None, ylabel="x",
save_dir="./result", name="state_history"):
"""
@@ -47,14 +49,108 @@ def plot_result(history, history_g=None, ylabel="x",
def plot_results(args, history_x, history_u, history_g=None):
"""
+
Args:
history_x (numpy.ndarray): history of state, shape(iters, state_size)
history_u (numpy.ndarray): history of state, shape(iters, input_size)
Returns:
+ None
"""
plot_result(history_x, history_g=history_g, ylabel="x",
- name="state_history",
+ name= args.env + "-state_history",
save_dir="./result/" + args.controller_type)
plot_result(history_u, history_g=np.zeros_like(history_u), ylabel="u",
- name="input_history",
- save_dir="./result/" + args.controller_type)
\ No newline at end of file
+ name= args.env + "-input_history",
+ save_dir="./result/" + args.controller_type)
+
+def save_plot_data(args, history_x, history_u, history_g=None):
+ """ save plot data
+
+ Args:
+ history_x (numpy.ndarray): history of state, shape(iters, state_size)
+ history_u (numpy.ndarray): history of input, shape(iters, input_size)
+ Returns:
+ None
+ """
+ path = os.path.join("./result/" + args.controller_type,
+ args.env + "-history_x.pkl")
+ save_pickle(path, history_x)
+
+ path = os.path.join("./result/" + args.controller_type,
+ args.env + "-history_u.pkl")
+ save_pickle(path, history_u)
+
+ path = os.path.join("./result/" + args.controller_type,
+ args.env + "-history_g.pkl")
+ save_pickle(path, history_g)
+
+def load_plot_data(env, controller_type, result_dir="./result"):
+ """
+ Args:
+ env (str): environments name
+ controller_type (str): controller type
+ result_dir (str): result directory
+ Returns:
+ history_x (numpy.ndarray): history of state, shape(iters, state_size)
+ history_u (numpy.ndarray): history of input, shape(iters, input_size)
+ history_g (numpy.ndarray): history of goal state, shape(iters, state_size)
+ """
+ path = os.path.join(result_dir, controller_type,
+ env + "-history_x.pkl")
+ history_x = load_pickle(path)
+
+ path = os.path.join(result_dir, controller_type,
+ env + "-history_u.pkl")
+ history_u = load_pickle(path)
+
+ path = os.path.join(result_dir, controller_type,
+ env + "-history_g.pkl")
+ history_g = load_pickle(path)
+
+ return history_x, history_u, history_g
+
+def plot_multi_result(histories, histories_g=None, labels=None, ylabel="x",
+ save_dir="./result", name="state_history"):
+ """
+ Args:
+ history (numpy.ndarray): history, shape(iters, size)
+ """
+ (_, iters, size) = histories.shape
+
+ for i in range(0, size, 2):
+
+ figure = plt.figure()
+ axis1 = figure.add_subplot(211)
+ axis2 = figure.add_subplot(212)
+
+ axis1.set_ylabel(ylabel + "_{}".format(i))
+ axis2.set_ylabel(ylabel + "_{}".format(i+1))
+ axis2.set_xlabel("time steps")
+
+ # gt
+ def plot(axis, history, history_g=None, label=""):
+ axis.plot(range(iters), history,
+ linewidth=3, label=label, alpha=0.7, linestyle="dashed")
+ if history_g is not None:
+ axis.plot(range(iters), history_g,\
+ c="b", linewidth=3)
+
+ if i < size:
+ for j, (history, history_g) \
+ in enumerate(zip(histories, histories_g)):
+ plot(axis1, history[:, i],
+ history_g=history_g[:, i], label=labels[j])
+ if i+1 < size:
+ for j, (history, history_g) in \
+ enumerate(zip(histories, histories_g)):
+ plot(axis2, history[:, i+1],
+ history_g=history_g[:, i+1], label=labels[j])
+
+ # save
+ if save_dir is not None:
+ path = os.path.join(save_dir, name + "-{}".format(i))
+ else:
+ path = name
+
+ axis1.legend(ncol=3, bbox_to_anchor=(0., 1.02, 1., 0.102), loc=3)
+ figure.savefig(path, bbox_inches="tight", pad_inches=0.05)
diff --git a/README.md b/README.md
index 81bbae1..b177720 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,8 @@ PythonLinearNonLinearControl is a library implementing the linear and nonlinear
|:----------|:---------------: |:----------------:|:----------------:|:----------------:|:----------------:|
| Linear Model Predictive Control (MPC) | ✓ | x | x | x | x |
| Cross Entropy Method (CEM) | ✓ | ✓ | x | x | x |
-| Model Preidictive Path Integral Control (MPPI) | ✓ | ✓ | x | x | x |
+| Model Predictive Path Integral Control of Nagabandi, A. (MPPI) | ✓ | ✓ | x | x | x |
+| Model Predictive Path Integral Control of Williams, G. (MPPIWilliams) | ✓ | ✓ | x | x | x |
| Random Shooting Method (Random) | ✓ | ✓ | x | x | x |
| Iterative LQR (iLQR) | x | ✓ | x | ✓ | x |
| Differential Dynamic Programming (DDP) | x | ✓ | x | ✓ | ✓ |
@@ -33,9 +34,12 @@ Following algorithms are implemented in PythonLinearNonlinearControl
- [Cross Entropy Method (CEM)](https://arxiv.org/abs/1805.12114)
- Ref: Chua, K., Calandra, R., McAllister, R., & Levine, S. (2018). Deep reinforcement learning in a handful of trials using probabilistic dynamics models. In Advances in Neural Information Processing Systems (pp. 4754-4765)
- [script](PythonLinearNonlinearControl/controllers/cem.py)
-- [Model Preidictive Path Integral Control (MPPI)](https://arxiv.org/abs/1909.11652)
+- [Model Predictive Path Integral Control of Nagabandi, A. (MPPI)](https://arxiv.org/abs/1909.11652)
- Ref: Nagabandi, A., Konoglie, K., Levine, S., & Kumar, V. (2019). Deep Dynamics Models for Learning Dexterous Manipulation. arXiv preprint arXiv:1909.11652.
- [script](PythonLinearNonlinearControl/controllers/mppi.py)
+- [Model Predictive Path Integral Control of Williams, G. (MPPIWilliams)](https://ieeexplore.ieee.org/abstract/document/7989202)
+ - Ref: Williams, G., Wagener, N., Goldfain, B., Drews, P., Rehg, J. M., Boots, B., & Theodorou, E. A. (2017, May). Information theoretic MPC for model-based reinforcement learning. In 2017 IEEE International Conference on Robotics and Automation (ICRA) (pp. 1714-1721). IEEE.
+ - [script](PythonLinearNonlinearControl/controllers/mppi_williams.py)
- [Random Shooting Method (Random)](https://arxiv.org/abs/1805.12114)
- Ref: Chua, K., Calandra, R., McAllister, R., & Levine, S. (2018). Deep reinforcement learning in a handful of trials using probabilistic dynamics models. In Advances in Neural Information Processing Systems (pp. 4754-4765)
- [script](PythonLinearNonlinearControl/controllers/random.py)
@@ -62,10 +66,13 @@ Following algorithms are implemented in PythonLinearNonlinearControl
| First Order Lag System | ✓ | x | 4 | 2 |
| Two wheeled System (Constant Goal) | x | ✓ | 3 | 2 |
| Two wheeled System (Moving Goal) (Coming soon) | x | ✓ | 3 | 2 |
+| Cartpole (Swing up) | x | ✓ | 4 | 1 |
-All environments are continuous.
+All states and inputs of the environments are continuous.
**It should be noted that the algorithms for linear model could be applied to nonlinear enviroments if you have linealized the model of nonlinear environments.**
+You can find more details about the environments in [Environments.md](Environments.md).
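+
+As a quick sketch of how the pieces fit together (the Args namedtuple stands in for the argparse namespace, and the pred_len + 1 goal length is an illustrative choice; see scripts/simple_run.py for the full pipeline with runners and plotting):
+
+```python
+from collections import namedtuple
+
+import numpy as np
+
+from PythonLinearNonlinearControl.configs.make_configs import make_config
+from PythonLinearNonlinearControl.envs.make_envs import make_env
+from PythonLinearNonlinearControl.models.make_models import make_model
+from PythonLinearNonlinearControl.controllers.make_controllers import make_controller
+
+# hand-built stand-in for the argparse namespace used by scripts/simple_run.py
+Args = namedtuple("Args", ["env", "controller_type"])
+args = Args(env="CartPole", controller_type="MPPIWilliams")
+
+config = make_config(args)
+env = make_env(args)
+model = make_model(args, config)
+controller = make_controller(args, config, model)
+
+curr_x, info = env.reset()
+# constant goal trajectory, long enough to cover the predicted trajectory
+g_xs = np.tile(info["goal_state"], (config.PRED_LEN + 1, 1))
+
+u = controller.obtain_sol(curr_x, g_xs)  # one MPC step
+next_x, cost, done, info = env.step(u)
+```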
+
# Usage
## To install this package
diff --git a/assets/cartpole.png b/assets/cartpole.png
new file mode 100644
index 0000000..53abb68
Binary files /dev/null and b/assets/cartpole.png differ
diff --git a/assets/cartpole_score.png b/assets/cartpole_score.png
new file mode 100644
index 0000000..ef4d286
Binary files /dev/null and b/assets/cartpole_score.png differ
diff --git a/assets/firstorderlag.png b/assets/firstorderlag.png
new file mode 100644
index 0000000..ad0c9ff
Binary files /dev/null and b/assets/firstorderlag.png differ
diff --git a/assets/quadratic_score.png b/assets/quadratic_score.png
new file mode 100644
index 0000000..7202879
Binary files /dev/null and b/assets/quadratic_score.png differ
diff --git a/assets/twowheeled.png b/assets/twowheeled.png
new file mode 100644
index 0000000..921e111
Binary files /dev/null and b/assets/twowheeled.png differ
diff --git a/scripts/show_result.py b/scripts/show_result.py
new file mode 100644
index 0000000..e54b9dc
--- /dev/null
+++ b/scripts/show_result.py
@@ -0,0 +1,55 @@
+import os
+
+import argparse
+import pickle
+import numpy as np
+import matplotlib.pyplot as plt
+
+from PythonLinearNonlinearControl.plotters.plot_func import load_plot_data, \
+ plot_multi_result
+
+def run(args):
+
+ controllers = ["iLQR", "DDP", "CEM", "MPPI"]
+
+ history_xs = None
+ history_us = None
+ history_gs = None
+
+ # load data
+ for controller in controllers:
+ history_x, history_u, history_g = \
+ load_plot_data(args.env, controller,
+ result_dir=args.result_dir)
+
+ if history_xs is None:
+ history_xs = history_x[np.newaxis, :]
+ history_us = history_u[np.newaxis, :]
+ history_gs = history_g[np.newaxis, :]
+ continue
+
+ history_xs = np.concatenate((history_xs,
+ history_x[np.newaxis, :]), axis=0)
+ history_us = np.concatenate((history_us,
+ history_u[np.newaxis, :]), axis=0)
+ history_gs = np.concatenate((history_gs,
+ history_g[np.newaxis, :]), axis=0)
+
+ plot_multi_result(history_xs, histories_g=history_gs, labels=controllers,
+ ylabel="x")
+
+ plot_multi_result(history_us, histories_g=np.zeros_like(history_us),
+ labels=controllers, ylabel="u", name="input_history")
+
+def main():
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument("--env", type=str, default="FirstOrderLag")
+ parser.add_argument("--result_dir", type=str, default="./result")
+
+ args = parser.parse_args()
+
+ run(args)
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/scripts/simple_run.py b/scripts/simple_run.py
index 0796266..25f828c 100644
--- a/scripts/simple_run.py
+++ b/scripts/simple_run.py
@@ -7,7 +7,8 @@ from PythonLinearNonlinearControl.configs.make_configs import make_config
from PythonLinearNonlinearControl.models.make_models import make_model
from PythonLinearNonlinearControl.envs.make_envs import make_env
from PythonLinearNonlinearControl.runners.make_runners import make_runner
-from PythonLinearNonlinearControl.plotters.plot_func import plot_results
+from PythonLinearNonlinearControl.plotters.plot_func import plot_results, \
+ save_plot_data
def run(args):
# logger
@@ -36,11 +37,12 @@ def run(args):
# plot results
plot_results(args, history_x, history_u, history_g=history_g)
+ save_plot_data(args, history_x, history_u, history_g=history_g)
def main():
parser = argparse.ArgumentParser()
- parser.add_argument("--controller_type", type=str, default="DDP")
+ parser.add_argument("--controller_type", type=str, default="CEM")
parser.add_argument("--planner_type", type=str, default="const")
parser.add_argument("--env", type=str, default="TwoWheeledConst")
parser.add_argument("--result_dir", type=str, default="./result")
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..6ac0b45
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,5 @@
+[aliases]
+test=pytest
+
+[tool:pytest]
+addopts=-s
\ No newline at end of file
diff --git a/tests/configs/test_cartpole.py b/tests/configs/test_cartpole.py
new file mode 100644
index 0000000..6f74321
--- /dev/null
+++ b/tests/configs/test_cartpole.py
@@ -0,0 +1,31 @@
+import pytest
+import numpy as np
+
+from PythonLinearNonlinearControl.configs.cartpole \
+ import CartPoleConfigModule
+
+class TestCalcCost():
+ def test_calc_costs(self):
+ # make config
+ config = CartPoleConfigModule()
+ # set
+ pred_len = 5
+ state_size = 4
+ input_size = 1
+ pop_size = 2
+ pred_xs = np.ones((pop_size, pred_len, state_size))
+ g_xs = np.ones((pop_size, pred_len, state_size)) * 0.5
+ input_samples = np.ones((pop_size, pred_len, input_size)) * 0.5
+
+ costs = config.input_cost_fn(input_samples)
+
+ assert costs.shape == (pop_size, pred_len, input_size)
+
+ costs = config.state_cost_fn(pred_xs, g_xs)
+
+ assert costs.shape == (pop_size, pred_len, 1)
+
+ costs = config.terminal_state_cost_fn(pred_xs[:, -1, :],\
+ g_xs[:, -1, :])
+
+ assert costs.shape == (pop_size, 1)
\ No newline at end of file
diff --git a/tests/configs/test_two_wheeled.py b/tests/configs/test_two_wheeled.py
new file mode 100644
index 0000000..fb9cb7c
--- /dev/null
+++ b/tests/configs/test_two_wheeled.py
@@ -0,0 +1,34 @@
+import pytest
+import numpy as np
+
+from PythonLinearNonlinearControl.configs.two_wheeled \
+ import TwoWheeledConfigModule
+
+class TestCalcCost():
+ def test_calc_costs(self):
+ # make config
+ config = TwoWheeledConfigModule()
+ # set
+ pred_len = 5
+ state_size = 3
+ input_size = 2
+ pop_size = 2
+ pred_xs = np.ones((pop_size, pred_len, state_size))
+ g_xs = np.ones((pop_size, pred_len, state_size)) * 0.5
+ input_samples = np.ones((pop_size, pred_len, input_size)) * 0.5
+
+ costs = config.input_cost_fn(input_samples)
+ expected_costs = np.ones((pop_size, pred_len, input_size))*0.5
+
+ assert costs == pytest.approx(expected_costs**2 * np.diag(config.R))
+
+ costs = config.state_cost_fn(pred_xs, g_xs)
+ expected_costs = np.ones((pop_size, pred_len, state_size))*0.5
+
+ assert costs == pytest.approx(expected_costs**2 * np.diag(config.Q))
+
+ costs = config.terminal_state_cost_fn(pred_xs[:, -1, :],\
+ g_xs[:, -1, :])
+ expected_costs = np.ones((pop_size, state_size))*0.5
+
+ assert costs == pytest.approx(expected_costs**2 * np.diag(config.Sf))
\ No newline at end of file
diff --git a/tests/env/test_cartpole.py b/tests/env/test_cartpole.py
new file mode 100644
index 0000000..7b726bc
--- /dev/null
+++ b/tests/env/test_cartpole.py
@@ -0,0 +1,73 @@
+import pytest
+import numpy as np
+
+from PythonLinearNonlinearControl.envs.cartpole import CartPoleEnv
+
+class TestCartPoleEnv():
+ """
+ """
+ def test_step(self):
+ env = CartPoleEnv()
+
+ curr_x = np.ones(4)
+ curr_x[2] = np.pi / 6.
+
+ env.reset(init_x=curr_x)
+
+ u = np.ones(1)
+
+ next_x, _, _, _ = env.step(u)
+
+ d_x0 = curr_x[1]
+ d_x1 = (1. + env.config["mp"] * np.sin(np.pi / 6.) \
+ * (env.config["l"] * (1.**2) \
+ + env.config["g"] * np.cos(np.pi / 6.))) \
+ / (env.config["mc"] + env.config["mp"] * np.sin(np.pi / 6.)**2)
+ d_x2 = curr_x[3]
+ d_x3 = (-1. * np.cos(np.pi / 6.) \
+ - env.config["mp"] * env.config["l"] * (1.**2) \
+ * np.cos(np.pi / 6.) * np.sin(np.pi / 6.) \
+ - (env.config["mp"] + env.config["mc"]) * env.config["g"] \
+ * np.sin(np.pi / 6.)) \
+ / (env.config["l"] \
+ * (env.config["mc"] \
+ + env.config["mp"] * np.sin(np.pi / 6.)**2))
+
+ expected = np.array([d_x0, d_x1, d_x2, d_x3]) * env.config["dt"] \
+ + curr_x
+
+ assert next_x == pytest.approx(expected, abs=1e-5)
+
+ def test_bound_step(self):
+ env = CartPoleEnv()
+
+ curr_x = np.ones(4)
+ curr_x[2] = np.pi / 6.
+
+ env.reset(init_x=curr_x)
+
+ u = np.ones(1) * 1e3
+
+ next_x, _, _, _ = env.step(u)
+
+ u = env.config["input_upper_bound"][0]
+
+ d_x0 = curr_x[1]
+ d_x1 = (u + env.config["mp"] * np.sin(np.pi / 6.) \
+ * (env.config["l"] * (1.**2) \
+ + env.config["g"] * np.cos(np.pi / 6.))) \
+ / (env.config["mc"] + env.config["mp"] * np.sin(np.pi / 6.)**2)
+ d_x2 = curr_x[3]
+ d_x3 = (-u * np.cos(np.pi / 6.) \
+ - env.config["mp"] * env.config["l"] * (1.**2) \
+ * np.cos(np.pi / 6.) * np.sin(np.pi / 6.) \
+ - (env.config["mp"] + env.config["mc"]) * env.config["g"] \
+ * np.sin(np.pi / 6.)) \
+ / (env.config["l"] \
+ * (env.config["mc"] \
+ + env.config["mp"] * np.sin(np.pi / 6.)**2))
+
+ expected = np.array([d_x0, d_x1, d_x2, d_x3]) * env.config["dt"] \
+ + curr_x
+
+ assert next_x == pytest.approx(expected, abs=1e-5)
\ No newline at end of file
diff --git a/tests/models/test_cartpole.py b/tests/models/test_cartpole.py
new file mode 100644
index 0000000..f7241b8
--- /dev/null
+++ b/tests/models/test_cartpole.py
@@ -0,0 +1,57 @@
+import pytest
+import numpy as np
+
+from PythonLinearNonlinearControl.models.cartpole import CartPoleModel
+from PythonLinearNonlinearControl.configs.cartpole \
+ import CartPoleConfigModule
+
+class TestCartPoleModel():
+ """
+ """
+ def test_step(self):
+ config = CartPoleConfigModule()
+ cartpole_model = CartPoleModel(config)
+
+ curr_x = np.ones(4)
+ curr_x[2] = np.pi / 6.
+
+ us = np.ones((1, 1))
+
+ next_x = cartpole_model.predict_traj(curr_x, us)
+
+ d_x0 = curr_x[1]
+ d_x1 = (1. + config.MP * np.sin(np.pi / 6.) \
+ * (config.L * (1.**2) \
+ + config.G * np.cos(np.pi / 6.))) \
+ / (config.MC + config.MP * np.sin(np.pi / 6.)**2)
+ d_x2 = curr_x[3]
+ d_x3 = (-1. * np.cos(np.pi / 6.) \
+ - config.MP * config.L * (1.**2) \
+ * np.cos(np.pi / 6.) * np.sin(np.pi / 6.) \
+ - (config.MP + config.MC) * config.G \
+ * np.sin(np.pi / 6.)) \
+ / (config.L \
+ * (config.MC \
+ + config.MP * np.sin(np.pi / 6.)**2))
+
+ expected = np.array([d_x0, d_x1, d_x2, d_x3]) * config.DT \
+ + curr_x
+
+ expected = np.stack((curr_x, expected), axis=0)
+
+ assert next_x == pytest.approx(expected, abs=1e-5)
+
+ def test_predict_traj(self):
+ config = CartPoleConfigModule()
+ cartpole_model = CartPoleModel(config)
+
+ curr_x = np.ones(config.STATE_SIZE)
+ curr_x[-1] = np.pi / 6.
+ u = np.ones((1, config.INPUT_SIZE))
+
+ pred_xs = cartpole_model.predict_traj(curr_x, u)
+
+ u = np.tile(u, (2, 1, 1))
+ pred_xs_alltogether = cartpole_model.predict_traj(curr_x, u)[0]
+
+ assert pred_xs_alltogether == pytest.approx(pred_xs)
\ No newline at end of file
diff --git a/tests/models/test_first_order_lag.py b/tests/models/test_first_order_lag.py
new file mode 100644
index 0000000..3f1790c
--- /dev/null
+++ b/tests/models/test_first_order_lag.py
@@ -0,0 +1,43 @@
+import pytest
+import numpy as np
+
+from PythonLinearNonlinearControl.models.model \
+ import LinearModel
+from PythonLinearNonlinearControl.models.first_order_lag \
+ import FirstOrderLagModel
+from PythonLinearNonlinearControl.configs.first_order_lag \
+ import FirstOrderLagConfigModule
+
+from unittest.mock import patch
+from unittest.mock import Mock
+
+class TestFirstOrderLagModel():
+ """
+ """
+ def test_step(self):
+ config = FirstOrderLagConfigModule()
+ firstorderlag_model = FirstOrderLagModel(config)
+
+ curr_x = np.ones(config.STATE_SIZE)
+ u = np.ones((1, config.INPUT_SIZE))
+
+ with patch.object(LinearModel, "predict_traj") as mock_predict_traj:
+ firstorderlag_model.predict_traj(curr_x, u)
+
+ mock_predict_traj.assert_called_once_with(curr_x, u)
+
+ def test_predict_traj(self):
+
+ config = FirstOrderLagConfigModule()
+ firstorderlag_model = FirstOrderLagModel(config)
+
+ curr_x = np.ones(config.STATE_SIZE)
+ curr_x[-1] = np.pi / 6.
+ u = np.ones((1, config.INPUT_SIZE))
+
+ pred_xs = firstorderlag_model.predict_traj(curr_x, u)
+
+ u = np.tile(u, (1, 1, 1))
+ pred_xs_alltogether = firstorderlag_model.predict_traj(curr_x, u)[0]
+
+ assert pred_xs_alltogether == pytest.approx(pred_xs)
\ No newline at end of file