diff --git a/.gitignore b/.gitignore index b467fc0..0879c05 100644 --- a/.gitignore +++ b/.gitignore @@ -109,7 +109,6 @@ celerybeat.pid # Environments .env .venv -env/ venv/ ENV/ env.bak/ diff --git a/PythonLinearNonlinearControl/configs/first_order_lag.py b/PythonLinearNonlinearControl/configs/first_order_lag.py index 166b3d0..7726a77 100644 --- a/PythonLinearNonlinearControl/configs/first_order_lag.py +++ b/PythonLinearNonlinearControl/configs/first_order_lag.py @@ -43,22 +43,19 @@ class FirstOrderLagConfigModule(): "kappa": 0.9, "noise_sigma": 0.5, }, - "iLQR":{ - }, - "cgmres-NMPC":{ - }, - "newton-NMPC":{ - }, + "MPC":{ + } } @staticmethod def input_cost_fn(u): """ input cost functions Args: - u (numpy.ndarray): input, shape(input_size, ) - or shape(pop_size, input_size) + u (numpy.ndarray): input, shape(pred_len, input_size) + or shape(pop_size, pred_len, input_size) Returns: - cost (numpy.ndarray): cost of input, none or shape(pop_size, ) + cost (numpy.ndarray): cost of input, shape(pred_len, input_size) or + shape(pop_size, pred_len, input_size) """ return (u**2) * np.diag(FirstOrderLagConfigModule.R) @@ -67,11 +64,12 @@ class FirstOrderLagConfigModule(): """ state cost function Args: x (numpy.ndarray): state, shape(pred_len, state_size) - or shape(pop_size, pred_len, state_size) - g_x (numpy.ndarray): goal state, shape(state_size, ) - or shape(pop_size, state_size) + or shape(pop_size, pred_len, state_size) + g_x (numpy.ndarray): goal state, shape(pred_len, state_size) + or shape(pop_size, pred_len, state_size) Returns: - cost (numpy.ndarray): cost of state, none or shape(pop_size, ) + cost (numpy.ndarray): cost of state, shape(pred_len, state_size) or + shape(pop_size, pred_len, state_size) """ return ((x - g_x)**2) * np.diag(FirstOrderLagConfigModule.Q) @@ -84,7 +82,8 @@ class FirstOrderLagConfigModule(): terminal_g_x (numpy.ndarray): terminal goal state, shape(state_size, ) or shape(pop_size, state_size) Returns: - cost (numpy.ndarray): cost of state, none or shape(pop_size, ) + cost (numpy.ndarray): cost of state, shape(pred_len, ) or + shape(pop_size, pred_len) """ return ((terminal_x - terminal_g_x)**2) \ * np.diag(FirstOrderLagConfigModule.Sf) \ No newline at end of file diff --git a/PythonLinearNonlinearControl/configs/two_wheeled.py b/PythonLinearNonlinearControl/configs/two_wheeled.py index f8f2692..f2cab4b 100644 --- a/PythonLinearNonlinearControl/configs/two_wheeled.py +++ b/PythonLinearNonlinearControl/configs/two_wheeled.py @@ -5,13 +5,13 @@ class TwoWheeledConfigModule(): ENV_NAME = "TwoWheeled-v0" TYPE = "Nonlinear" TASK_HORIZON = 1000 - PRED_LEN = 10 + PRED_LEN = 20 STATE_SIZE = 3 INPUT_SIZE = 2 DT = 0.01 # cost parameters - R = np.eye(INPUT_SIZE) - Q = np.eye(STATE_SIZE) + R = np.eye(INPUT_SIZE) * 0.1 + Q = np.eye(STATE_SIZE) * 0.5 Sf = np.eye(STATE_SIZE) # bounds INPUT_LOWER_BOUND = np.array([-1.5, 3.14]) @@ -40,34 +40,50 @@ class TwoWheeledConfigModule(): "noise_sigma": 0.5, }, "iLQR":{ + "max_iter": 500, + "mu": 1., + "mu_min": 1e-6, + "mu_max": 1e10, + "init_delta": 2., + "threshold": 1e-6, + }, + "DDP":{ + "max_iter": 500, + "mu": 1., + "mu_min": 1e-6, + "mu_max": 1e10, + "init_delta": 2., + "threshold": 1e-6, }, "NMPC-CGMRES":{ }, "NMPC-Newton":{ }, - } + } @staticmethod def input_cost_fn(u): """ input cost functions Args: - u (numpy.ndarray): input, shape(input_size, ) - or shape(pop_size, input_size) + u (numpy.ndarray): input, shape(pred_len, input_size) + or shape(pop_size, pred_len, input_size) Returns: - cost (numpy.ndarray): cost of input, none or shape(pop_size, ) + cost (numpy.ndarray): cost of input, shape(pred_len, input_size) or + shape(pop_size, pred_len, input_size) """ - return (u**2) * np.diag(TwoWheeledConfigModule.R) * 0.1 + return (u**2) * np.diag(TwoWheeledConfigModule.R) @staticmethod def state_cost_fn(x, g_x): """ state cost function Args: x (numpy.ndarray): state, shape(pred_len, state_size) - or shape(pop_size, pred_len, state_size) - g_x (numpy.ndarray): goal state, shape(state_size, ) - or shape(pop_size, state_size) + or shape(pop_size, pred_len, state_size) + g_x (numpy.ndarray): goal state, shape(pred_len, state_size) + or shape(pop_size, pred_len, state_size) Returns: - cost (numpy.ndarray): cost of state, none or shape(pop_size, ) + cost (numpy.ndarray): cost of state, shape(pred_len, state_size) or + shape(pop_size, pred_len, state_size) """ return ((x - g_x)**2) * np.diag(TwoWheeledConfigModule.Q) @@ -80,7 +96,93 @@ class TwoWheeledConfigModule(): terminal_g_x (numpy.ndarray): terminal goal state, shape(state_size, ) or shape(pop_size, state_size) Returns: - cost (numpy.ndarray): cost of state, none or shape(pop_size, ) + cost (numpy.ndarray): cost of state, shape(pred_len, ) or + shape(pop_size, pred_len) """ return ((terminal_x - terminal_g_x)**2) \ - * np.diag(TwoWheeledConfigModule.Sf) \ No newline at end of file + * np.diag(TwoWheeledConfigModule.Sf) + + @staticmethod + def gradient_cost_fn_with_state(x, g_x, terminal=False): + """ gradient of costs with respect to the state + + Args: + x (numpy.ndarray): state, shape(pred_len, state_size) + g_x (numpy.ndarray): goal state, shape(pred_len, state_size) + + Returns: + l_x (numpy.ndarray): gradient of cost, shape(pred_len, state_size) + or shape(1, state_size) + """ + if not terminal: + return 2. * (x - g_x) * np.diag(TwoWheeledConfigModule.Q) + + return (2. * (x - g_x) \ + * np.diag(TwoWheeledConfigModule.Sf))[np.newaxis, :] + + @staticmethod + def gradient_cost_fn_with_input(x, u): + """ gradient of costs with respect to the input + + Args: + x (numpy.ndarray): state, shape(pred_len, state_size) + u (numpy.ndarray): goal state, shape(pred_len, input_size) + + Returns: + l_u (numpy.ndarray): gradient of cost, shape(pred_len, input_size) + """ + return 2. * u * np.diag(TwoWheeledConfigModule.R) + + @staticmethod + def hessian_cost_fn_with_state(x, g_x, terminal=False): + """ hessian costs with respect to the state + + Args: + x (numpy.ndarray): state, shape(pred_len, state_size) + g_x (numpy.ndarray): goal state, shape(pred_len, state_size) + + Returns: + l_xx (numpy.ndarray): gradient of cost, + shape(pred_len, state_size, state_size) or + shape(1, state_size, state_size) or + """ + if not terminal: + (pred_len, _) = x.shape + return -g_x[:, :, np.newaxis] \ + * np.tile(2.*TwoWheeledConfigModule.Q, (pred_len, 1, 1)) + + return -g_x[:, np.newaxis] \ + * np.tile(2.*TwoWheeledConfigModule.Sf, (1, 1, 1)) + + @staticmethod + def hessian_cost_fn_with_input(x, u): + """ hessian costs with respect to the input + + Args: + x (numpy.ndarray): state, shape(pred_len, state_size) + u (numpy.ndarray): goal state, shape(pred_len, input_size) + + Returns: + l_uu (numpy.ndarray): gradient of cost, + shape(pred_len, input_size, input_size) + """ + (pred_len, _) = u.shape + + return np.tile(2.*TwoWheeledConfigModule.R, (pred_len, 1, 1)) + + @staticmethod + def hessian_cost_fn_with_input_state(x, u): + """ hessian costs with respect to the state and input + + Args: + x (numpy.ndarray): state, shape(pred_len, state_size) + u (numpy.ndarray): goal state, shape(pred_len, input_size) + + Returns: + l_ux (numpy.ndarray): gradient of cost , + shape(pred_len, input_size, state_size) + """ + (_, state_size) = x.shape + (pred_len, input_size) = u.shape + + return np.zeros((pred_len, input_size, state_size)) \ No newline at end of file diff --git a/PythonLinearNonlinearControl/controllers/controller.py b/PythonLinearNonlinearControl/controllers/controller.py index 4e63be5..50abb01 100644 --- a/PythonLinearNonlinearControl/controllers/controller.py +++ b/PythonLinearNonlinearControl/controllers/controller.py @@ -24,7 +24,8 @@ class Controller(): Returns: opt_input (numpy.ndarray): optimal input, shape(input_size, ) """ - raise NotImplementedError("Implement gradient of hamitonian with respect to the state") + raise NotImplementedError("Implement the algorithm to \ + get optimal input") def calc_cost(self, curr_x, samples, g_xs): """ calculate the cost of input samples diff --git a/PythonLinearNonlinearControl/controllers/ddp.py b/PythonLinearNonlinearControl/controllers/ddp.py new file mode 100644 index 0000000..a542410 --- /dev/null +++ b/PythonLinearNonlinearControl/controllers/ddp.py @@ -0,0 +1,403 @@ +from logging import getLogger + +import numpy as np +import scipy.stats as stats + +from .controller import Controller +from ..envs.cost import calc_cost + +logger = getLogger(__name__) + +class DDP(Controller): + """ Differential Dynamic Programming + + Ref: + Tassa, Y., Erez, T., & Todorov, E. (2012). . In 2012 IEEE/RSJ International Conference on + Intelligent Robots and Systems (pp. 4906-4913). and Study Wolf, + https://github.com/studywolf/control + """ + def __init__(self, config, model): + """ + """ + super(DDP, self).__init__(config, model) + + if config.TYPE != "Nonlinear": + raise ValueError("{} could be not applied to \ + this controller".format(model)) + + # model + self.model = model + + # get cost func + self.state_cost_fn = config.state_cost_fn + self.terminal_state_cost_fn = config.terminal_state_cost_fn + self.input_cost_fn = config.input_cost_fn + self.gradient_cost_fn_with_state = config.gradient_cost_fn_with_state + self.gradient_cost_fn_with_input = config.gradient_cost_fn_with_input + self.hessian_cost_fn_with_state = config.hessian_cost_fn_with_state + self.hessian_cost_fn_with_input = config.hessian_cost_fn_with_input + self.hessian_cost_fn_with_input_state = \ + config.hessian_cost_fn_with_input_state + + # controller parameters + self.max_iter = config.opt_config["DDP"]["max_iter"] + self.mu = config.opt_config["DDP"]["mu"] + self.mu_min = config.opt_config["DDP"]["mu_min"] + self.mu_max = config.opt_config["DDP"]["mu_max"] + self.init_delta = config.opt_config["DDP"]["init_delta"] + self.delta = self.init_delta + self.threshold = config.opt_config["DDP"]["threshold"] + + # general parameters + self.pred_len = config.PRED_LEN + self.input_size = config.INPUT_SIZE + self.dt = config.DT + + # cost parameters + self.Q = config.Q + self.R = config.R + self.Sf = config.Sf + + # initialize + self.prev_sol = np.zeros((self.pred_len, self.input_size)) + + def clear_sol(self): + """ clear prev sol + """ + logger.debug("Clear Sol") + self.prev_sol = np.zeros((self.pred_len, self.input_size)) + + def obtain_sol(self, curr_x, g_xs): + """ calculate the optimal inputs + + Args: + curr_x (numpy.ndarray): current state, shape(state_size, ) + g_xs (numpy.ndarrya): goal trajectory, shape(plan_len, state_size) + Returns: + opt_input (numpy.ndarray): optimal input, shape(input_size, ) + """ + # initialize + opt_count = 0 + sol = self.prev_sol.copy() + converged_sol = False + update_sol = True + + # line search param + alphas = 1.1**(-np.arange(10)**2) + + while opt_count < self.max_iter: + accepted_sol = False + + # forward + if update_sol == True: + pred_xs, cost, f_x, f_u, f_xx, f_ux, f_uu,\ + l_x, l_xx, l_u, l_uu, l_ux = \ + self.forward(curr_x, g_xs, sol) + update_sol = False + + try: + # backward + k, K = self.backward(f_x, f_u, f_xx, f_ux, f_uu, \ + l_x, l_xx, l_u, l_uu, l_ux) + + # line search + for alpha in alphas: + new_pred_xs, new_sol = \ + self.calc_input(k, K, pred_xs, sol, alpha) + + new_cost = calc_cost(new_pred_xs[np.newaxis, :, :], + new_sol[np.newaxis, :, :], + g_xs[np.newaxis, :, :], + self.state_cost_fn, + self.input_cost_fn, + self.terminal_state_cost_fn) + + if new_cost < cost: + if np.abs((cost - new_cost) / cost) < self.threshold: + converged_sol = True + + cost = new_cost + pred_xs = new_pred_xs + sol = new_sol + update_sol = True + + # decrease regularization term + self.delta = min(1.0, self.delta) / self.init_delta + self.mu *= self.delta + if self.mu <= self.mu_min: + self.mu = 0.0 + + # accept the solution + accepted_sol = True + break + + except np.linalg.LinAlgError as e: + logger.debug("Non ans : {}".format(e)) + + if not accepted_sol: + # increase regularization term. + self.delta = max(1.0, self.delta) * self.init_delta + self.mu = max(self.mu_min, self.mu * self.delta) + logger.debug("Update regularization term to {}"\ + .format(self.mu)) + if self.mu >= self.mu_max: + logger.debug("Reach Max regularization term") + break + + if converged_sol: + logger.debug("Get converged sol") + break + + opt_count += 1 + + # update prev sol + self.prev_sol[:-1] = sol[1:] + self.prev_sol[-1] = sol[-1] # last use the terminal input + + return sol[0] + + def calc_input(self, k, K, pred_xs, sol, alpha): + """ calc input trajectory by using k and K + + Args: + k (numpy.ndarray): gain, shape(pred_len, input_size) + K (numpy.ndarray): gain, shape(pred_len, input_size, state_size) + pred_xs (numpy.ndarray): predicted state, + shape(pred_len+1, state_size) + sol (numpy.ndarray): input trajectory, previous solutions + shape(pred_len, input_size) + alpha (float): param of line search + Returns: + new_pred_xs (numpy.ndarray): update state trajectory, + shape(pred_len+1, state_size) + new_sol (numpy.ndarray): update input trajectory, + shape(pred_len, input_size) + """ + # get size + (pred_len, input_size, state_size) = K.shape + # initialize + new_pred_xs = np.zeros((pred_len+1, state_size)) + new_pred_xs[0] = pred_xs[0].copy() # init state is same + new_sol = np.zeros((pred_len, input_size)) + + for t in range(pred_len): + new_sol[t] = sol[t] \ + + alpha * k[t] \ + + np.dot(K[t], (new_pred_xs[t] - pred_xs[t])) + new_pred_xs[t+1] = self.model.predict_next_state(new_pred_xs[t], + new_sol[t]) + + return new_pred_xs, new_sol + + def forward(self, curr_x, g_xs, sol): + """ forward step of iLQR + + Args: + curr_x (numpy.ndarray): current state, shape(state_size, ) + g_xs (numpy.ndarrya): goal trajectory, shape(plan_len, state_size) + sol (numpy.ndarray): solutions, shape(plan_len, input_size) + Returns: + f_x (numpy.ndarray): gradient of model with respecto to state, + shape(pred_len, state_size, state_size) + f_u (numpy.ndarray): gradient of model with respecto to input, + shape(pred_len, state_size, input_size) + f_xx (numpy.ndarray): gradient of model with respecto to state, + shape(pred_len+1, state_size, state_size, state_size) + f_ux (numpy.ndarray): gradient of model with respecto to input, + shape(pred_len, state_size, input_size, state_size) + f_uu (numpy.ndarray): gradient of model with respecto to input, + shape(pred_len, state_size, input_size, input_size) + l_x (numpy.ndarray): gradient of cost with respecto to state, + shape(pred_len+1, state_size) + l_u (numpy.ndarray): gradient of cost with respecto to input, + shape(pred_len, input_size) + l_xx (numpy.ndarray): hessian of cost with respecto to state, + shape(pred_len+1, state_size, state_size) + l_uu (numpy.ndarray): hessian of cost with respecto to input, + shape(pred_len+1, input_size, input_size) + l_ux (numpy.ndarray): hessian of cost with respect + to state and input, shape(pred_len, input_size, state_size) + """ + # simulate forward using the current control trajectory + pred_xs = self.model.predict_traj(curr_x, sol) + # check costs + cost = self.calc_cost(curr_x, + sol[np.newaxis, :, :], + g_xs) + + # calc gradinet in batch + f_x = self.model.calc_f_x(pred_xs[:-1], sol, self.dt) + f_u = self.model.calc_f_u(pred_xs[:-1], sol, self.dt) + # calc hessian in batch + f_xx = self.model.calc_f_xx(pred_xs[:-1], sol, self.dt) + f_ux = self.model.calc_f_ux(pred_xs[:-1], sol, self.dt) + f_uu = self.model.calc_f_uu(pred_xs[:-1], sol, self.dt) + + # gradint of costs + l_x, l_xx, l_u, l_uu, l_ux = \ + self._calc_gradient_hessian_cost(pred_xs, g_xs, sol) + + return pred_xs, cost, f_x, f_u, f_xx, f_ux, f_uu, \ + l_x, l_xx, l_u, l_uu, l_ux + + def _calc_gradient_hessian_cost(self, pred_xs, g_x, sol): + """ calculate gradient and hessian of model and cost fn + + Args: + pred_xs (numpy.ndarray): predict traj, + shape(pred_len+1, state_size) + sol (numpy.ndarray): input traj, + shape(pred_len, input_size) + Returns + l_x (numpy.ndarray): gradient of cost, + shape(pred_len+1, state_size) + l_u (numpy.ndarray): gradient of cost, + shape(pred_len, input_size) + l_xx (numpy.ndarray): hessian of cost, + shape(pred_len+1, state_size, state_size) + l_uu (numpy.ndarray): hessian of cost, + shape(pred_len+1, input_size, input_size) + l_ux (numpy.ndarray): hessian of cost, + shape(pred_len, input_size, state_size) + """ + # l_x.shape = (pred_len+1, state_size) + l_x = self.gradient_cost_fn_with_state(pred_xs[:-1], + g_x[:-1], terminal=False) + terminal_l_x = \ + self.gradient_cost_fn_with_state(pred_xs[-1], + g_x[-1], terminal=True) + + l_x = np.concatenate((l_x, terminal_l_x), axis=0) + + # l_u.shape = (pred_len, input_size) + l_u = self.gradient_cost_fn_with_input(pred_xs[:-1], sol) + + # l_xx.shape = (pred_len+1, state_size, state_size) + l_xx = self.hessian_cost_fn_with_state(pred_xs[:-1], + g_x[:-1], terminal=False) + terminal_l_xx = \ + self.hessian_cost_fn_with_state(pred_xs[-1], + g_x[-1], terminal=True) + + l_xx = np.concatenate((l_xx, terminal_l_xx), axis=0) + + # l_uu.shape = (pred_len, input_size, input_size) + l_uu = self.hessian_cost_fn_with_input(pred_xs[:-1], sol) + + # l_ux.shape = (pred_len, input_size, state_size) + l_ux = self.hessian_cost_fn_with_input_state(pred_xs[:-1], sol) + + return l_x, l_xx, l_u, l_uu, l_ux + + def backward(self, f_x, f_u, f_xx, f_ux, f_uu, l_x, l_xx, l_u, l_uu, l_ux): + """ backward step of iLQR + Args: + f_x (numpy.ndarray): gradient of model with respecto to state, + shape(pred_len+1, state_size, state_size) + f_u (numpy.ndarray): gradient of model with respecto to input, + shape(pred_len, state_size, input_size) + f_xx (numpy.ndarray): gradient of model with respecto to state, + shape(pred_len+1, state_size, state_size, state_size) + f_ux (numpy.ndarray): gradient of model with respecto to input, + shape(pred_len, state_size, input_size, state_size) + f_uu (numpy.ndarray): gradient of model with respecto to input, + shape(pred_len, state_size, input_size, input_size) + l_x (numpy.ndarray): gradient of cost with respecto to state, + shape(pred_len+1, state_size) + l_u (numpy.ndarray): gradient of cost with respecto to input, + shape(pred_len, input_size) + l_xx (numpy.ndarray): hessian of cost with respecto to state, + shape(pred_len+1, state_size, state_size) + l_uu (numpy.ndarray): hessian of cost with respecto to input, + shape(pred_len, input_size, input_size) + l_ux (numpy.ndarray): hessian of cost with respect + to state and input, shape(pred_len, input_size, state_size) + + Returns: + k (numpy.ndarray): gain, shape(pred_len, input_size) + K (numpy.ndarray): gain, shape(pred_len, input_size, state_size) + """ + # get size + (_, state_size, _) = f_x.shape + + # initialzie + V_x = l_x[-1] + V_xx = l_xx[-1] + k = np.zeros((self.pred_len, self.input_size)) + K = np.zeros((self.pred_len, self.input_size, state_size)) + + for t in range(self.pred_len-1, -1, -1): + # get Q val + Q_x, Q_u, Q_xx, Q_ux, Q_uu = self._Q(f_x[t], f_u[t], + f_xx[t], f_ux[t], f_uu[t], + l_x[t], + l_u[t], l_xx[t], l_ux[t], + l_uu[t], V_x, V_xx) + # calc gain + k[t] = - np.linalg.solve(Q_uu, Q_u) + K[t] = - np.linalg.solve(Q_uu, Q_ux) + # update V_x val + V_x = Q_x + np.dot(np.dot(K[t].T, Q_uu), k[t]) + V_x += np.dot(K[t].T, Q_u) + np.dot(Q_ux.T, k[t]) + # update V_xx val + V_xx = Q_xx + np.dot(np.dot(K[t].T, Q_uu), K[t]) + V_xx += np.dot(K[t].T, Q_ux) + np.dot(Q_ux.T, K[t]) + V_xx = 0.5 * (V_xx + V_xx.T) # to maintain symmetry. + + return k, K + + def _Q(self, f_x, f_u, f_xx, f_ux, f_uu, + l_x, l_u, l_xx, l_ux, l_uu, V_x, V_xx): + """Computes second order expansion. + Args: + f_x (numpy.ndarray): gradient of model with respecto to state, + shape(state_size, state_size) + f_u (numpy.ndarray): gradient of model with respecto to input, + shape(state_size, input_size) + f_xx (numpy.ndarray): gradient of model with respecto to state, + shape(state_size, state_size, state_size) + f_ux (numpy.ndarray): gradient of model with respecto to input, + shape(state_size, input_size, state_size) + f_uu (numpy.ndarray): gradient of model with respecto to input, + shape(state_size, input_size, input_size) + l_x (numpy.ndarray): gradient of cost with respecto to state, + shape(state_size, ) + l_u (numpy.ndarray): gradient of cost with respecto to input, + shape(input_size, ) + l_xx (numpy.ndarray): hessian of cost with respecto to state, + shape(state_size, state_size) + l_uu (numpy.ndarray): hessian of cost with respecto to input, + shape(input_size, input_size) + l_ux (numpy.ndarray): hessian of cost with respect + to state and input, shape(input_size, state_size) + V_x (numpy.ndarray): gradient of Value function, + shape(state_size, ) + V_xx (numpy.ndarray): hessian of Value function, + shape(state_size, state_size) + Returns: + Q_x (numpy.ndarray): gradient of Q function, shape(state_size, ) + Q_u (numpy.ndarray): gradient of Q function, shape(input_size, ) + Q_xx (numpy.ndarray): hessian of Q fucntion, + shape(state_size, state_size) + Q_ux (numpy.ndarray): hessian of Q fucntion, + shape(input_size, state_size) + Q_uu (numpy.ndarray): hessian of Q fucntion, + shape(input_size, input_size) + """ + # get size + state_size = len(l_x) + + Q_x = l_x + np.dot(f_x.T, V_x) + Q_u = l_u + np.dot(f_u.T, V_x) + Q_xx = l_xx + np.dot(np.dot(f_x.T, V_xx), f_x) + + reg = self.mu * np.eye(state_size) + Q_ux = l_ux + np.dot(np.dot(f_u.T, (V_xx + reg)), f_x) + Q_uu = l_uu + np.dot(np.dot(f_u.T, (V_xx + reg)), f_u) + + # tensor constraction + Q_xx += np.tensordot(V_x, f_xx, axes=1) + Q_ux += np.tensordot(V_x, f_ux, axes=1) + Q_uu += np.tensordot(V_x, f_uu, axes=1) + + return Q_x, Q_u, Q_xx, Q_ux, Q_uu \ No newline at end of file diff --git a/PythonLinearNonlinearControl/controllers/ilqr.py b/PythonLinearNonlinearControl/controllers/ilqr.py index 46eedac..ff99f4f 100644 --- a/PythonLinearNonlinearControl/controllers/ilqr.py +++ b/PythonLinearNonlinearControl/controllers/ilqr.py @@ -10,6 +10,11 @@ logger = getLogger(__name__) class iLQR(Controller): """ iterative Liner Quadratique Regulator + + Ref: + Tassa, Y., Erez, T., & Todorov, E. (2012). . In 2012 IEEE/RSJ International Conference on + Intelligent Robots and Systems (pp. 4906-4913). and Study Wolf, + https://github.com/studywolf/control """ def __init__(self, config, model): """ @@ -20,5 +25,346 @@ class iLQR(Controller): raise ValueError("{} could be not applied to \ this controller".format(model)) + # model self.model = model - \ No newline at end of file + + # get cost func + self.state_cost_fn = config.state_cost_fn + self.terminal_state_cost_fn = config.terminal_state_cost_fn + self.input_cost_fn = config.input_cost_fn + self.gradient_cost_fn_with_state = config.gradient_cost_fn_with_state + self.gradient_cost_fn_with_input = config.gradient_cost_fn_with_input + self.hessian_cost_fn_with_state = config.hessian_cost_fn_with_state + self.hessian_cost_fn_with_input = config.hessian_cost_fn_with_input + self.hessian_cost_fn_with_input_state = \ + config.hessian_cost_fn_with_input_state + + # controller parameters + self.max_iter = config.opt_config["iLQR"]["max_iter"] + self.mu = config.opt_config["iLQR"]["mu"] + self.mu_min = config.opt_config["iLQR"]["mu_min"] + self.mu_max = config.opt_config["iLQR"]["mu_max"] + self.init_delta = config.opt_config["iLQR"]["init_delta"] + self.delta = self.init_delta + self.threshold = config.opt_config["iLQR"]["threshold"] + + # general parameters + self.pred_len = config.PRED_LEN + self.input_size = config.INPUT_SIZE + self.dt = config.DT + + # cost parameters + self.Q = config.Q + self.R = config.R + self.Sf = config.Sf + + # initialize + self.prev_sol = np.zeros((self.pred_len, self.input_size)) + + def clear_sol(self): + """ clear prev sol + """ + logger.debug("Clear Sol") + self.prev_sol = np.zeros((self.pred_len, self.input_size)) + + def obtain_sol(self, curr_x, g_xs): + """ calculate the optimal inputs + + Args: + curr_x (numpy.ndarray): current state, shape(state_size, ) + g_xs (numpy.ndarrya): goal trajectory, shape(plan_len, state_size) + Returns: + opt_input (numpy.ndarray): optimal input, shape(input_size, ) + """ + # initialize + opt_count = 0 + sol = self.prev_sol.copy() + converged_sol = False + update_sol = True + + # line search param + alphas = 1.1**(-np.arange(10)**2) + + while opt_count < self.max_iter: + accepted_sol = False + + # forward + if update_sol == True: + pred_xs, cost, f_x, f_u, l_x, l_xx, l_u, l_uu, l_ux = \ + self.forward(curr_x, g_xs, sol) + update_sol = False + + try: + # backward + k, K = self.backward(f_x, f_u, l_x, l_xx, l_u, l_uu, l_ux) + + # line search + for alpha in alphas: + new_pred_xs, new_sol = \ + self.calc_input(k, K, pred_xs, sol, alpha) + + new_cost = calc_cost(new_pred_xs[np.newaxis, :, :], + new_sol[np.newaxis, :, :], + g_xs[np.newaxis, :, :], + self.state_cost_fn, + self.input_cost_fn, + self.terminal_state_cost_fn) + + if new_cost < cost: + if np.abs((cost - new_cost) / cost) < self.threshold: + converged_sol = True + + cost = new_cost + pred_xs = new_pred_xs + sol = new_sol + update_sol = True + + # decrease regularization term + self.delta = min(1.0, self.delta) / self.init_delta + self.mu *= self.delta + if self.mu <= self.mu_min: + self.mu = 0.0 + + # accept the solution + accepted_sol = True + break + + except np.linalg.LinAlgError as e: + logger.debug("Non ans : {}".format(e)) + + if not accepted_sol: + # increase regularization term. + self.delta = max(1.0, self.delta) * self.init_delta + self.mu = max(self.mu_min, self.mu * self.delta) + logger.debug("Update regularization term to {}"\ + .format(self.mu)) + if self.mu >= self.mu_max: + logger.debug("Reach Max regularization term") + break + + if converged_sol: + logger.debug("Get converged sol") + break + + opt_count += 1 + + # update prev sol + self.prev_sol[:-1] = sol[1:] + self.prev_sol[-1] = sol[-1] # last use the terminal input + + return sol[0] + + def calc_input(self, k, K, pred_xs, sol, alpha): + """ calc input trajectory by using k and K + + Args: + k (numpy.ndarray): gain, shape(pred_len, input_size) + K (numpy.ndarray): gain, shape(pred_len, input_size, state_size) + pred_xs (numpy.ndarray): predicted state, + shape(pred_len+1, state_size) + sol (numpy.ndarray): input trajectory, previous solutions + shape(pred_len, input_size) + alpha (float): param of line search + Returns: + new_pred_xs (numpy.ndarray): update state trajectory, + shape(pred_len+1, state_size) + new_sol (numpy.ndarray): update input trajectory, + shape(pred_len, input_size) + """ + # get size + (pred_len, input_size, state_size) = K.shape + # initialize + new_pred_xs = np.zeros((pred_len+1, state_size)) + new_pred_xs[0] = pred_xs[0].copy() # init state is same + new_sol = np.zeros((pred_len, input_size)) + + for t in range(pred_len): + new_sol[t] = sol[t] \ + + alpha * k[t] \ + + np.dot(K[t], (new_pred_xs[t] - pred_xs[t])) + new_pred_xs[t+1] = self.model.predict_next_state(new_pred_xs[t], + new_sol[t]) + + return new_pred_xs, new_sol + + def forward(self, curr_x, g_xs, sol): + """ forward step of iLQR + + Args: + curr_x (numpy.ndarray): current state, shape(state_size, ) + g_xs (numpy.ndarrya): goal trajectory, shape(plan_len, state_size) + sol (numpy.ndarray): solutions, shape(plan_len, input_size) + Returns: + f_x (numpy.ndarray): gradient of model with respecto to state, + shape(pred_len, state_size, state_size) + f_u (numpy.ndarray): gradient of model with respecto to input, + shape(pred_len, state_size, input_size) + l_x (numpy.ndarray): gradient of cost with respecto to state, + shape(pred_len+1, state_size) + l_u (numpy.ndarray): gradient of cost with respecto to input, + shape(pred_len, input_size) + l_xx (numpy.ndarray): hessian of cost with respecto to state, + shape(pred_len+1, state_size, state_size) + l_uu (numpy.ndarray): hessian of cost with respecto to input, + shape(pred_len+1, input_size, input_size) + l_ux (numpy.ndarray): hessian of cost with respect + to state and input, shape(pred_len, input_size, state_size) + """ + # simulate forward using the current control trajectory + pred_xs = self.model.predict_traj(curr_x, sol) + # check costs + cost = self.calc_cost(curr_x, + sol[np.newaxis, :, :], + g_xs) + + # calc gradinet in batch + f_x = self.model.calc_f_x(pred_xs[:-1], sol, self.dt) + f_u = self.model.calc_f_u(pred_xs[:-1], sol, self.dt) + + # gradint of costs + l_x, l_xx, l_u, l_uu, l_ux = \ + self._calc_gradient_hessian_cost(pred_xs, g_xs, sol) + + return pred_xs, cost, f_x, f_u, l_x, l_xx, l_u, l_uu, l_ux + + def _calc_gradient_hessian_cost(self, pred_xs, g_x, sol): + """ calculate gradient and hessian of model and cost fn + + Args: + pred_xs (numpy.ndarray): predict traj, + shape(pred_len+1, state_size) + sol (numpy.ndarray): input traj, + shape(pred_len, input_size) + Returns + l_x (numpy.ndarray): gradient of cost, + shape(pred_len+1, state_size) + l_u (numpy.ndarray): gradient of cost, + shape(pred_len, input_size) + l_xx (numpy.ndarray): hessian of cost, + shape(pred_len+1, state_size, state_size) + l_uu (numpy.ndarray): hessian of cost, + shape(pred_len+1, input_size, input_size) + l_ux (numpy.ndarray): hessian of cost, + shape(pred_len, input_size, state_size) + """ + # l_x.shape = (pred_len+1, state_size) + l_x = self.gradient_cost_fn_with_state(pred_xs[:-1], + g_x[:-1], terminal=False) + terminal_l_x = \ + self.gradient_cost_fn_with_state(pred_xs[-1], + g_x[-1], terminal=True) + + l_x = np.concatenate((l_x, terminal_l_x), axis=0) + + # l_u.shape = (pred_len, input_size) + l_u = self.gradient_cost_fn_with_input(pred_xs[:-1], sol) + + # l_xx.shape = (pred_len+1, state_size, state_size) + l_xx = self.hessian_cost_fn_with_state(pred_xs[:-1], + g_x[:-1], terminal=False) + terminal_l_xx = \ + self.hessian_cost_fn_with_state(pred_xs[-1], + g_x[-1], terminal=True) + + l_xx = np.concatenate((l_xx, terminal_l_xx), axis=0) + + # l_uu.shape = (pred_len, input_size, input_size) + l_uu = self.hessian_cost_fn_with_input(pred_xs[:-1], sol) + + # l_ux.shape = (pred_len, input_size, state_size) + l_ux = self.hessian_cost_fn_with_input_state(pred_xs[:-1], sol) + + return l_x, l_xx, l_u, l_uu, l_ux + + def backward(self, f_x, f_u, l_x, l_xx, l_u, l_uu, l_ux): + """ backward step of iLQR + Args: + f_x (numpy.ndarray): gradient of model with respecto to state, + shape(pred_len+1, state_size, state_size) + f_u (numpy.ndarray): gradient of model with respecto to input, + shape(pred_len, state_size, input_size) + l_x (numpy.ndarray): gradient of cost with respecto to state, + shape(pred_len+1, state_size) + l_u (numpy.ndarray): gradient of cost with respecto to input, + shape(pred_len, input_size) + l_xx (numpy.ndarray): hessian of cost with respecto to state, + shape(pred_len+1, state_size, state_size) + l_uu (numpy.ndarray): hessian of cost with respecto to input, + shape(pred_len, input_size, input_size) + l_ux (numpy.ndarray): hessian of cost with respect + to state and input, shape(pred_len, input_size, state_size) + + Returns: + k (numpy.ndarray): gain, shape(pred_len, input_size) + K (numpy.ndarray): gain, shape(pred_len, input_size, state_size) + """ + # get size + (_, state_size, _) = f_x.shape + + # initialzie + V_x = l_x[-1] + V_xx = l_xx[-1] + k = np.zeros((self.pred_len, self.input_size)) + K = np.zeros((self.pred_len, self.input_size, state_size)) + + for t in range(self.pred_len-1, -1, -1): + # get Q val + Q_x, Q_u, Q_xx, Q_ux, Q_uu = self._Q(f_x[t], f_u[t], l_x[t], + l_u[t], l_xx[t], l_ux[t], + l_uu[t], V_x, V_xx) + # calc gain + k[t] = - np.linalg.solve(Q_uu, Q_u) + K[t] = - np.linalg.solve(Q_uu, Q_ux) + # update V_x val + V_x = Q_x + np.dot(np.dot(K[t].T, Q_uu), k[t]) + V_x += np.dot(K[t].T, Q_u) + np.dot(Q_ux.T, k[t]) + # update V_xx val + V_xx = Q_xx + np.dot(np.dot(K[t].T, Q_uu), K[t]) + V_xx += np.dot(K[t].T, Q_ux) + np.dot(Q_ux.T, K[t]) + V_xx = 0.5 * (V_xx + V_xx.T) # to maintain symmetry. + + return k, K + + def _Q(self, f_x, f_u, l_x, l_u, l_xx, l_ux, l_uu, V_x, V_xx): + """Computes second order expansion. + Args: + f_x (numpy.ndarray): gradient of model with respecto to state, + shape(state_size, state_size) + f_u (numpy.ndarray): gradient of model with respecto to input, + shape(state_size, input_size) + l_x (numpy.ndarray): gradient of cost with respecto to state, + shape(state_size, ) + l_u (numpy.ndarray): gradient of cost with respecto to input, + shape(input_size, ) + l_xx (numpy.ndarray): hessian of cost with respecto to state, + shape(state_size, state_size) + l_uu (numpy.ndarray): hessian of cost with respecto to input, + shape(input_size, input_size) + l_ux (numpy.ndarray): hessian of cost with respect + to state and input, shape(input_size, state_size) + V_x (numpy.ndarray): gradient of Value function, + shape(state_size, ) + V_xx (numpy.ndarray): hessian of Value function, + shape(state_size, state_size) + Returns: + Q_x (numpy.ndarray): gradient of Q function, shape(state_size, ) + Q_u (numpy.ndarray): gradient of Q function, shape(input_size, ) + Q_xx (numpy.ndarray): hessian of Q fucntion, + shape(state_size, state_size) + Q_ux (numpy.ndarray): hessian of Q fucntion, + shape(input_size, state_size) + Q_uu (numpy.ndarray): hessian of Q fucntion, + shape(input_size, input_size) + """ + # get size + state_size = len(l_x) + + Q_x = l_x + np.dot(f_x.T, V_x) + Q_u = l_u + np.dot(f_u.T, V_x) + Q_xx = l_xx + np.dot(np.dot(f_x.T, V_xx), f_x) + + reg = self.mu * np.eye(state_size) + Q_ux = l_ux + np.dot(np.dot(f_u.T, (V_xx + reg)), f_x) + Q_uu = l_uu + np.dot(np.dot(f_u.T, (V_xx + reg)), f_u) + + return Q_x, Q_u, Q_xx, Q_ux, Q_uu \ No newline at end of file diff --git a/PythonLinearNonlinearControl/controllers/make_controllers.py b/PythonLinearNonlinearControl/controllers/make_controllers.py index 05a5f3b..b786351 100644 --- a/PythonLinearNonlinearControl/controllers/make_controllers.py +++ b/PythonLinearNonlinearControl/controllers/make_controllers.py @@ -3,6 +3,7 @@ from .cem import CEM from .random import RandomShooting from .mppi import MPPI from .ilqr import iLQR +from .ddp import DDP def make_controller(args, config, model): @@ -15,4 +16,6 @@ def make_controller(args, config, model): elif args.controller_type == "MPPI": return MPPI(config, model) elif args.controller_type == "iLQR": + return iLQR(config, model) + elif args.controller_type == "DDP": return iLQR(config, model) \ No newline at end of file diff --git a/PythonLinearNonlinearControl/envs/first_order_lag.py b/PythonLinearNonlinearControl/envs/first_order_lag.py index 8d0588d..7da07b8 100644 --- a/PythonLinearNonlinearControl/envs/first_order_lag.py +++ b/PythonLinearNonlinearControl/envs/first_order_lag.py @@ -91,7 +91,7 @@ class FirstOrderLagEnv(Env): # clip action u = np.clip(u, self.config["input_lower_bound"], - self.config["input_lower_bound"]) + self.config["input_upper_bound"]) next_x = np.matmul(self.A, self.curr_x[:, np.newaxis]) \ + np.matmul(self.B, u[:, np.newaxis]) diff --git a/PythonLinearNonlinearControl/models/model.py b/PythonLinearNonlinearControl/models/model.py index 27bbd34..58fe32e 100644 --- a/PythonLinearNonlinearControl/models/model.py +++ b/PythonLinearNonlinearControl/models/model.py @@ -140,18 +140,41 @@ class Model(): """ raise NotImplementedError("Implement terminal adjoint state") - def gradient_x(self, x, u): - """ gradient of model with respect to the state + @staticmethod + def calc_f_x(xs, us, dt): + """ gradient of model with respect to the state in batch form """ raise NotImplementedError("Implement gradient of model \ with respect to the state") - def gradient_u(self, x, u): - """ gradient of model with respect to the input + @staticmethod + def calc_f_u(xs, us, dt): + """ gradient of model with respect to the input in batch form """ raise NotImplementedError("Implement gradient of model \ with respect to the input") + @staticmethod + def calc_f_xx(xs, us, dt): + """ hessian of model with respect to the state in batch form + """ + raise NotImplementedError("Implement hessian of model \ + with respect to the state") + + @staticmethod + def calc_f_ux(xs, us, dt): + """ hessian of model with respect to the input in batch form + """ + raise NotImplementedError("Implement hessian of model \ + with respect to the input and state") + + @staticmethod + def calc_f_uu(xs, us, dt): + """ hessian of model with respect to the state in batch form + """ + raise NotImplementedError("Implement hessian of model \ + with respect to the input") + class LinearModel(Model): """ discrete linear model, x[k+1] = Ax[k] + Bu[k] diff --git a/PythonLinearNonlinearControl/models/two_wheeled.py b/PythonLinearNonlinearControl/models/two_wheeled.py index babb86f..e4bf4ed 100644 --- a/PythonLinearNonlinearControl/models/two_wheeled.py +++ b/PythonLinearNonlinearControl/models/two_wheeled.py @@ -50,4 +50,118 @@ class TwoWheeledModel(Model): next_x = x_dot[:, :, 0] * self.dt + curr_x return next_x - \ No newline at end of file + + @staticmethod + def calc_f_x(xs, us, dt): + """ gradient of model with respect to the state in batch form + Args: + xs (numpy.ndarray): state, shape(pred_len+1, state_size) + us (numpy.ndarray): input, shape(pred_len, input_size,) + + Return: + f_x (numpy.ndarray): gradient of model with respect to x, + shape(pred_len, state_size, state_size) + + Notes: + This should be discrete form !! + """ + # get size + (_, state_size) = xs.shape + (pred_len, _) = us.shape + + f_x = np.zeros((pred_len, state_size, state_size)) + f_x[:, 0, 2] = -np.sin(xs[:, 2]) * us[:, 0] + f_x[:, 1, 2] = np.cos(xs[:, 2]) * us[:, 0] + + return f_x * dt + np.eye(state_size) # to discrete form + + @staticmethod + def calc_f_u(xs, us, dt): + """ gradient of model with respect to the input in batch form + Args: + xs (numpy.ndarray): state, shape(pred_len+1, state_size) + us (numpy.ndarray): input, shape(pred_len, input_size,) + + Return: + f_u (numpy.ndarray): gradient of model with respect to x, + shape(pred_len, state_size, input_size) + + Notes: + This should be discrete form !! + """ + # get size + (_, state_size) = xs.shape + (pred_len, input_size) = us.shape + + f_u = np.zeros((pred_len, state_size, input_size)) + f_u[:, 0, 0] = np.cos(xs[:, 2]) + f_u[:, 1, 0] = np.sin(xs[:, 2]) + f_u[:, 2, 1] = 1. + + return f_u * dt # to discrete form + + @staticmethod + def calc_f_xx(xs, us, dt): + """ hessian of model with respect to the state in batch form + + Args: + xs (numpy.ndarray): state, shape(pred_len+1, state_size) + us (numpy.ndarray): input, shape(pred_len, input_size,) + + Return: + f_xx (numpy.ndarray): gradient of model with respect to x, + shape(pred_len, state_size, state_size, state_size) + """ + # get size + (_, state_size) = xs.shape + (pred_len, _) = us.shape + + f_xx = np.zeros((pred_len, state_size, state_size, state_size)) + + f_xx[:, 0, 2, 2] = -np.cos(xs[:, 2]) * us[:, 0] + f_xx[:, 1, 2, 2] = -np.sin(xs[:, 2]) * us[:, 0] + + return f_xx + + @staticmethod + def calc_f_ux(xs, us, dt): + """ hessian of model with respect to state and input in batch form + + Args: + xs (numpy.ndarray): state, shape(pred_len+1, state_size) + us (numpy.ndarray): input, shape(pred_len, input_size,) + + Return: + f_ux (numpy.ndarray): gradient of model with respect to x, + shape(pred_len, state_size, input_size, state_size) + """ + # get size + (_, state_size) = xs.shape + (pred_len, input_size) = us.shape + + f_ux = np.zeros((pred_len, state_size, input_size, state_size)) + + f_ux[:, 0, 0, 2] = -np.sin(xs[:, 2]) + f_ux[:, 1, 0, 2] = np.cos(xs[:, 2]) + + return f_ux + + @staticmethod + def calc_f_uu(xs, us, dt): + """ hessian of model with respect to input in batch form + + Args: + xs (numpy.ndarray): state, shape(pred_len+1, state_size) + us (numpy.ndarray): input, shape(pred_len, input_size,) + + Return: + f_uu (numpy.ndarray): gradient of model with respect to x, + shape(pred_len, state_size, input_size, input_size) + """ + # get size + (_, state_size) = xs.shape + (pred_len, input_size) = us.shape + + f_uu = np.zeros((pred_len, state_size, input_size, input_size)) + + return f_uu \ No newline at end of file diff --git a/README.md b/README.md index e3c0767..52b72db 100644 --- a/README.md +++ b/README.md @@ -39,8 +39,11 @@ Following algorithms are implemented in PythonLinearNonlinearControl - Ref: Chua, K., Calandra, R., McAllister, R., & Levine, S. (2018). Deep reinforcement learning in a handful of trials using probabilistic dynamics models. In Advances in Neural Information Processing Systems (pp. 4754-4765) - [script](PythonLinearNonlinearControl/controllers/random.py) - [Iterative LQR (iLQR)](https://ieeexplore.ieee.org/document/6386025) - - Ref: Tassa, Y., Erez, T., & Todorov, E. (2012, October). Synthesis and stabilization of complex behaviors through online trajectory optimization. In 2012 IEEE/RSJ International Conference on Intelligent Robots and Systems (pp. 4906-4913). IEEE. and [Study Wolf](https://github.com/studywolf/control) - - [script (Coming soon)]() + - Ref: Tassa, Y., Erez, T., & Todorov, E. (2012, October). Synthesis and stabilization of complex behaviors through online trajectory optimization. In 2012 IEEE/RSJ International Conference on Intelligent Robots and Systems (pp. 4906-4913). IEEE. and [Study Wolf](https://github.com/studywolf/control), https://github.com/anassinator/ilqr + - [script](PythonLinearNonlinearControl/controllers/ilqr.py) +- [Dynamic Differential Programing (DDP)](https://ieeexplore.ieee.org/document/6386025) + - Ref: Tassa, Y., Erez, T., & Todorov, E. (2012, October). Synthesis and stabilization of complex behaviors through online trajectory optimization. In 2012 IEEE/RSJ International Conference on Intelligent Robots and Systems (pp. 4906-4913). IEEE. and [Study Wolf](https://github.com/studywolf/control), https://github.com/anassinator/ilqr + - [script](PythonLinearNonlinearControl/controllers/ddp.py) - [Unconstrained Nonlinear Model Predictive Control (NMPC)](https://www.sciencedirect.com/science/article/pii/S0005109897000058) - Ref: Ohtsuka, T., & Fujii, H. A. (1997). Real-time optimization algorithm for nonlinear receding-horizon control. Automatica, 33(6), 1147-1154. - [script (Coming soon)]() @@ -93,7 +96,7 @@ pip install -e . You can run the experiments as follows: ``` -python scripts/simple_run.py --model first-order_lag --controller CEM +python scripts/simple_run.py --env first-order_lag --controller CEM ``` **figures and animations are saved in the ./result folder.** diff --git a/scripts/simple_run.py b/scripts/simple_run.py index 0fd47dc..0796266 100644 --- a/scripts/simple_run.py +++ b/scripts/simple_run.py @@ -40,7 +40,7 @@ def run(args): def main(): parser = argparse.ArgumentParser() - parser.add_argument("--controller_type", type=str, default="CEM") + parser.add_argument("--controller_type", type=str, default="DDP") parser.add_argument("--planner_type", type=str, default="const") parser.add_argument("--env", type=str, default="TwoWheeledConst") parser.add_argument("--result_dir", type=str, default="./result") diff --git a/tests/env/__init__.py b/tests/env/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/env/test_first_orger_lag.py b/tests/env/test_first_orger_lag.py new file mode 100644 index 0000000..916e0d6 --- /dev/null +++ b/tests/env/test_first_orger_lag.py @@ -0,0 +1,43 @@ +import pytest +import numpy as np + +from PythonLinearNonlinearControl.envs.first_order_lag import FirstOrderLagEnv + +class TestFirstOrderLagEnv(): + + def test_step(self): + env = FirstOrderLagEnv() + + curr_x = np.ones(4) + + env.reset(init_x=curr_x) + + u = np.ones(2) * 0.1 + + next_x, _, _, _ = env.step(u) + + dx = np.dot(env.A, curr_x[:, np.newaxis]) + du = np.dot(env.B, u[:, np.newaxis]) + + expected = (dx + du).flatten() + + assert next_x == pytest.approx(expected, abs=1e-5) + + def test_bound_step(self): + env = FirstOrderLagEnv() + + curr_x = np.ones(4) + + env.reset(init_x=curr_x) + + u = np.ones(2) * 1e5 + + next_x, _, _, _ = env.step(u) + + dx = np.dot(env.A, curr_x[:, np.newaxis]) + du = np.dot(env.B, + np.array(env.config["input_upper_bound"])[:, np.newaxis]) + + expected = (dx + du).flatten() + + assert next_x == pytest.approx(expected, abs=1e-5) \ No newline at end of file diff --git a/tests/env/test_two_wheeled.py b/tests/env/test_two_wheeled.py new file mode 100644 index 0000000..603c778 --- /dev/null +++ b/tests/env/test_two_wheeled.py @@ -0,0 +1,50 @@ +import pytest +import numpy as np + +from PythonLinearNonlinearControl.envs.two_wheeled import TwoWheeledConstEnv + +class TestTwoWheeledEnv(): + """ + """ + def test_step(self): + env = TwoWheeledConstEnv() + + curr_x = np.ones(3) + curr_x[-1] = np.pi / 6. + + env.reset(init_x=curr_x) + + u = np.ones(2) + + next_x, _, _, _ = env.step(u) + + pos_x = np.cos(curr_x[-1]) * u[0] * env.config["dt"] + curr_x[0] + pos_y = np.sin(curr_x[-1]) * u[0] * env.config["dt"] + curr_x[1] + + expected = np.array([pos_x, pos_y,\ + curr_x[-1] + u[1] * env.config["dt"]]) + + assert next_x == pytest.approx(expected) + + def test_bound_step(self): + env = TwoWheeledConstEnv() + + curr_x = np.ones(3) + curr_x[-1] = np.pi / 6. + + env.reset(init_x=curr_x) + + u = np.ones(2) * 1e3 + + next_x, _, _, _ = env.step(u) + + pos_x = np.cos(curr_x[-1]) * env.config["input_upper_bound"][0] \ + * env.config["dt"] + curr_x[0] + pos_y = np.sin(curr_x[-1]) * env.config["input_upper_bound"][0] \ + * env.config["dt"] + curr_x[1] + + expected = np.array([pos_x, pos_y,\ + curr_x[-1] + env.config["input_upper_bound"][1] \ + * env.config["dt"]]) + + assert next_x == pytest.approx(expected) \ No newline at end of file