From e7263b216a267991bd6b270605cfea9bac97cd50 Mon Sep 17 00:00:00 2001 From: Shunichi09 Date: Thu, 7 Feb 2019 18:08:17 +0900 Subject: [PATCH] modify mpc --- mpc/basic/main_ACC_TEMP.py | 243 +++++++++ mpc/sample/pathplanner.py | 233 +++++++++ mpc/sample/test.py | 614 +++++++++++++++++++++++ mpc/with_disturbance/animation.py | 4 +- mpc/with_disturbance/coordinate_trans.py | 2 +- mpc/with_disturbance/iterative_MPC.py | 295 +++++++++++ mpc/with_disturbance/main_track.py | 281 ++++++++--- mpc/with_disturbance/traj_func.py | 32 ++ 8 files changed, 1620 insertions(+), 84 deletions(-) create mode 100644 mpc/basic/main_ACC_TEMP.py create mode 100644 mpc/sample/pathplanner.py create mode 100644 mpc/sample/test.py create mode 100644 mpc/with_disturbance/iterative_MPC.py create mode 100644 mpc/with_disturbance/traj_func.py diff --git a/mpc/basic/main_ACC_TEMP.py b/mpc/basic/main_ACC_TEMP.py new file mode 100644 index 0000000..9ad7c97 --- /dev/null +++ b/mpc/basic/main_ACC_TEMP.py @@ -0,0 +1,243 @@ +import numpy as np +import matplotlib.pyplot as plt +import math +import copy + +from mpc_func_with_cvxopt import MpcController as MpcController_cvxopt +from animation import AnimDrawer +# from control import matlab + +class TwoWheeledSystem(): + """SampleSystem, this is the simulator + Attributes + ----------- + xs : numpy.ndarray + system states, [x, y, theta] + history_xs : list + time history of state + """ + def __init__(self, init_states=None): + """ + Palameters + ----------- + init_state : float, optional, shape(3, ) + initial state of system default is None + """ + self.xs = np.zeros(3) + + if init_states is not None: + self.xs = copy.deepcopy(init_states) + + self.history_xs = [init_states] + + def update_state(self, us, dt=0.01): + """ + Palameters + ------------ + u : numpy.ndarray + inputs of system in some cases this means the reference + dt : float in seconds, optional + sampling time of simulation, default is 0.01 [s] + """ + # for theta 1, theta 1 dot, theta 2, theta 2 dot + k0 = [0.0 for _ in range(3)] + k1 = [0.0 for _ in range(3)] + k2 = [0.0 for _ in range(3)] + k3 = [0.0 for _ in range(3)] + + functions = [self._func_x_1, self._func_x_2, self._func_x_3] + + # solve Runge-Kutta + for i, func in enumerate(functions): + k0[i] = dt * func(self.xs[0], self.xs[1], self.xs[2], us[0], us[1]) + + for i, func in enumerate(functions): + k1[i] = dt * func(self.xs[0] + k0[0]/2., self.xs[1] + k0[1]/2., self.xs[2] + k0[2]/2., us[0], us[1]) + + for i, func in enumerate(functions): + k2[i] = dt * func(self.xs[0] + k0[0]/2., self.xs[1] + k0[1]/2., self.xs[2] + k0[2]/2., us[0], us[1]) + + for i, func in enumerate(functions): + k3[i] = dt * func(self.xs[0] + k2[0], self.xs[1] + k2[1], self.xs[2] + k2[2], us[0], us[1]) + + self.xs[0] += (k0[0] + 2. * k1[0] + 2. * k2[0] + k3[0]) / 6. + self.xs[1] += (k0[1] + 2. * k1[1] + 2. * k2[1] + k3[1]) / 6. + self.xs[2] += (k0[2] + 2. * k1[2] + 2. * k2[2] + k3[2]) / 6. + + # save + save_states = copy.deepcopy(self.xs) + self.history_xs.append(save_states) + print(self.xs) + + def _func_x_1(self, y_1, y_2, y_3, u_1, u_2): + """ + Parameters + ------------ + y_1 : float + y_2 : float + y_3 : float + u_1 : float + system input + u_2 : float + system input + """ + y_dot = math.cos(y_3) * u_1 + return y_dot + + def _func_x_2(self, y_1, y_2, y_3, u_1, u_2): + """ + Parameters + ------------ + y_1 : float + y_2 : float + y_3 : float + u_1 : float + system input + u_2 : float + system input + """ + y_dot = math.sin(y_3) * u_1 + return y_dot + + def _func_x_3(self, y_1, y_2, y_3, u_1, u_2): + """ + Parameters + ------------ + y_1 : float + y_2 : float + y_3 : float + u_1 : float + system input + u_2 : float + system input + """ + y_dot = u_2 + return y_dot + +def main(): + dt = 0.05 + simulation_time = 10 # in seconds + iteration_num = int(simulation_time / dt) + + # you must be care about this matrix + # these A and B are for continuos system if you want to use discret system matrix please skip this step + # lineared car system + V = 5.0 + Ad = np.array([[1., V * dt], [0., 1.]]) + Bd = np.array([[0.], [1. * dt]]) + + C = np.eye(2) + D = np.zeros((2, 1)) + + # make simulator with coninuous matrix + init_xs_lead = np.array([5., 0., 0.]) + init_xs_follow = np.array([0., 0., 0.]) + lead_car = TwoWheeledSystem(init_states=init_xs_lead) + follow_car = TwoWheeledSystem(init_states=init_xs_follow) + + # create system + # sysc = matlab.ss(A, B, C, D) + # discrete system + # sysd = matlab.c2d(sysc, dt) + + # evaluation function weight + Q = np.diag([1., 1.]) + R = np.diag([5.]) + pre_step = 15 + + # make controller with discreted matrix + # please check the solver, if you want to use the scipy, set the MpcController_scipy + lead_controller = MpcController_cvxopt(Ad, Bd, Q, R, pre_step, + dt_input_upper=np.array([30 * dt]), dt_input_lower=np.array([-30 * dt]), + input_upper=np.array([30.]), input_lower=np.array([-30.])) + + follow_controller = MpcController_cvxopt(Ad, Bd, Q, R, pre_step, + dt_input_upper=np.array([30 * dt]), dt_input_lower=np.array([-30 * dt]), + input_upper=np.array([30.]), input_lower=np.array([-30.])) + + lead_controller.initialize_controller() + follow_controller.initialize_controller() + + # reference + lead_reference = np.array([[0., 0.] for _ in range(pre_step)]).flatten() + + for i in range(iteration_num): + print("simulation time = {0}".format(i)) + # make lead car's move + if i > int(iteration_num / 3): + lead_reference = np.array([[4., 0.] for _ in range(pre_step)]).flatten() + + lead_states = lead_car.xs + lead_opt_u = lead_controller.calc_input(lead_states[1:], lead_reference) + lead_opt_u = np.hstack((np.array([V]), lead_opt_u)) + + # make follow car + follow_reference = np.array([lead_states[1:] for _ in range(pre_step)]).flatten() + follow_states = follow_car.xs + + follow_opt_u = follow_controller.calc_input(follow_states[1:], follow_reference) + follow_opt_u = np.hstack((np.array([V]), follow_opt_u)) + + lead_car.update_state(lead_opt_u, dt=dt) + follow_car.update_state(follow_opt_u, dt=dt) + + # figures and animation + lead_history_states = np.array(lead_car.history_xs) + follow_history_states = np.array(follow_car.history_xs) + + time_history_fig = plt.figure() + x_fig = time_history_fig.add_subplot(311) + y_fig = time_history_fig.add_subplot(312) + theta_fig = time_history_fig.add_subplot(313) + + car_traj_fig = plt.figure() + traj_fig = car_traj_fig.add_subplot(111) + traj_fig.set_aspect('equal') + + x_fig.plot(np.arange(0, simulation_time+0.01, dt), lead_history_states[:, 0], label="lead") + x_fig.plot(np.arange(0, simulation_time+0.01, dt), follow_history_states[:, 0], label="follow") + x_fig.set_xlabel("time [s]") + x_fig.set_ylabel("x") + x_fig.legend() + + y_fig.plot(np.arange(0, simulation_time+0.01, dt), lead_history_states[:, 1], label="lead") + y_fig.plot(np.arange(0, simulation_time+0.01, dt), follow_history_states[:, 1], label="follow") + y_fig.plot(np.arange(0, simulation_time+0.01, dt), [4. for _ in range(iteration_num+1)], linestyle="dashed") + y_fig.set_xlabel("time [s]") + y_fig.set_ylabel("y") + y_fig.legend() + + theta_fig.plot(np.arange(0, simulation_time+0.01, dt), lead_history_states[:, 2], label="lead") + theta_fig.plot(np.arange(0, simulation_time+0.01, dt), follow_history_states[:, 2], label="follow") + theta_fig.plot(np.arange(0, simulation_time+0.01, dt), [0. for _ in range(iteration_num+1)], linestyle="dashed") + theta_fig.set_xlabel("time [s]") + theta_fig.set_ylabel("theta") + theta_fig.legend() + + time_history_fig.tight_layout() + + traj_fig.plot(lead_history_states[:, 0], lead_history_states[:, 1], label="lead") + traj_fig.plot(follow_history_states[:, 0], follow_history_states[:, 1], label="follow") + traj_fig.set_xlabel("x") + traj_fig.set_ylabel("y") + traj_fig.legend() + plt.show() + + lead_history_us = np.array(lead_controller.history_us) + follow_history_us = np.array(follow_controller.history_us) + input_history_fig = plt.figure() + u_1_fig = input_history_fig.add_subplot(111) + + u_1_fig.plot(np.arange(0, simulation_time+0.01, dt), lead_history_us[:, 0], label="lead") + u_1_fig.plot(np.arange(0, simulation_time+0.01, dt), follow_history_us[:, 0], label="follow") + u_1_fig.set_xlabel("time [s]") + u_1_fig.set_ylabel("u_omega") + + input_history_fig.tight_layout() + plt.show() + + animdrawer = AnimDrawer([lead_history_states, follow_history_states]) + animdrawer.draw_anim() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/mpc/sample/pathplanner.py b/mpc/sample/pathplanner.py new file mode 100644 index 0000000..455ddb4 --- /dev/null +++ b/mpc/sample/pathplanner.py @@ -0,0 +1,233 @@ +""" +Cubic spline planner +Author: Atsushi Sakai(@Atsushi_twi) +""" +import math +import numpy as np +import bisect + + +class Spline: + """ + Cubic Spline class + """ + + def __init__(self, x, y): + self.b, self.c, self.d, self.w = [], [], [], [] + + self.x = x + self.y = y + + self.nx = len(x) # dimension of x + h = np.diff(x) + + # calc coefficient c + self.a = [iy for iy in y] + + # calc coefficient c + A = self.__calc_A(h) + B = self.__calc_B(h) + self.c = np.linalg.solve(A, B) + # print(self.c1) + + # calc spline coefficient b and d + for i in range(self.nx - 1): + self.d.append((self.c[i + 1] - self.c[i]) / (3.0 * h[i])) + tb = (self.a[i + 1] - self.a[i]) / h[i] - h[i] * \ + (self.c[i + 1] + 2.0 * self.c[i]) / 3.0 + self.b.append(tb) + + def calc(self, t): + """ + Calc position + if t is outside of the input x, return None + """ + + if t < self.x[0]: + return None + elif t > self.x[-1]: + return None + + i = self.__search_index(t) + dx = t - self.x[i] + result = self.a[i] + self.b[i] * dx + \ + self.c[i] * dx ** 2.0 + self.d[i] * dx ** 3.0 + + return result + + def calcd(self, t): + """ + Calc first derivative + if t is outside of the input x, return None + """ + + if t < self.x[0]: + return None + elif t > self.x[-1]: + return None + + i = self.__search_index(t) + dx = t - self.x[i] + result = self.b[i] + 2.0 * self.c[i] * dx + 3.0 * self.d[i] * dx ** 2.0 + return result + + def calcdd(self, t): + """ + Calc second derivative + """ + + if t < self.x[0]: + return None + elif t > self.x[-1]: + return None + + i = self.__search_index(t) + dx = t - self.x[i] + result = 2.0 * self.c[i] + 6.0 * self.d[i] * dx + return result + + def __search_index(self, x): + """ + search data segment index + """ + return bisect.bisect(self.x, x) - 1 + + def __calc_A(self, h): + """ + calc matrix A for spline coefficient c + """ + A = np.zeros((self.nx, self.nx)) + A[0, 0] = 1.0 + for i in range(self.nx - 1): + if i != (self.nx - 2): + A[i + 1, i + 1] = 2.0 * (h[i] + h[i + 1]) + A[i + 1, i] = h[i] + A[i, i + 1] = h[i] + + A[0, 1] = 0.0 + A[self.nx - 1, self.nx - 2] = 0.0 + A[self.nx - 1, self.nx - 1] = 1.0 + # print(A) + return A + + def __calc_B(self, h): + """ + calc matrix B for spline coefficient c + """ + B = np.zeros(self.nx) + for i in range(self.nx - 2): + B[i + 1] = 3.0 * (self.a[i + 2] - self.a[i + 1]) / \ + h[i + 1] - 3.0 * (self.a[i + 1] - self.a[i]) / h[i] + return B + + +class Spline2D: + """ + 2D Cubic Spline class + """ + + def __init__(self, x, y): + self.s = self.__calc_s(x, y) + self.sx = Spline(self.s, x) + self.sy = Spline(self.s, y) + + def __calc_s(self, x, y): + dx = np.diff(x) + dy = np.diff(y) + self.ds = [math.sqrt(idx ** 2 + idy ** 2) + for (idx, idy) in zip(dx, dy)] + s = [0] + s.extend(np.cumsum(self.ds)) + return s + + def calc_position(self, s): + """ + calc position + """ + x = self.sx.calc(s) + y = self.sy.calc(s) + + return x, y + + def calc_curvature(self, s): + """ + calc curvature + """ + dx = self.sx.calcd(s) + ddx = self.sx.calcdd(s) + dy = self.sy.calcd(s) + ddy = self.sy.calcdd(s) + k = (ddy * dx - ddx * dy) / ((dx ** 2 + dy ** 2)**(3 / 2)) + return k + + def calc_yaw(self, s): + """ + calc yaw + """ + dx = self.sx.calcd(s) + dy = self.sy.calcd(s) + yaw = math.atan2(dy, dx) + return yaw + + +def calc_spline_course(x, y, ds=0.1): + sp = Spline2D(x, y) + s = list(np.arange(0, sp.s[-1], ds)) + + rx, ry, ryaw, rk = [], [], [], [] + for i_s in s: + ix, iy = sp.calc_position(i_s) + rx.append(ix) + ry.append(iy) + ryaw.append(sp.calc_yaw(i_s)) + rk.append(sp.calc_curvature(i_s)) + + return rx, ry, ryaw, rk, s + + +def main(): + print("Spline 2D test") + import matplotlib.pyplot as plt + x = [-2.5, 0.0, 2.5, 5.0, 7.5, 3.0, -1.0] + y = [0.7, -6, 5, 6.5, 0.0, 5.0, -2.0] + ds = 0.1 # [m] distance of each intepolated points + + sp = Spline2D(x, y) + s = np.arange(0, sp.s[-1], ds) + + rx, ry, ryaw, rk = [], [], [], [] + for i_s in s: + ix, iy = sp.calc_position(i_s) + rx.append(ix) + ry.append(iy) + ryaw.append(sp.calc_yaw(i_s)) + rk.append(sp.calc_curvature(i_s)) + + plt.subplots(1) + plt.plot(x, y, "xb", label="input") + plt.plot(rx, ry, "-r", label="spline") + plt.grid(True) + plt.axis("equal") + plt.xlabel("x[m]") + plt.ylabel("y[m]") + plt.legend() + + plt.subplots(1) + plt.plot(s, [np.rad2deg(iyaw) for iyaw in ryaw], "-r", label="yaw") + plt.grid(True) + plt.legend() + plt.xlabel("line length[m]") + plt.ylabel("yaw angle[deg]") + + plt.subplots(1) + plt.plot(s, rk, "-r", label="curvature") + plt.grid(True) + plt.legend() + plt.xlabel("line length[m]") + plt.ylabel("curvature [1/m]") + + plt.show() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/mpc/sample/test.py b/mpc/sample/test.py new file mode 100644 index 0000000..5328002 --- /dev/null +++ b/mpc/sample/test.py @@ -0,0 +1,614 @@ +""" +Path tracking simulation with iterative linear model predictive control for speed and steer control +author: Atsushi Sakai (@Atsushi_twi) +""" +import matplotlib.pyplot as plt +import cvxpy +import math +import numpy as np +import sys + +try: + import pathplanner +except: + raise + + +NX = 4 # x = x, y, v, yaw +NU = 2 # a = [accel, steer] +T = 5 # horizon length + +# mpc parameters +R = np.diag([0.01, 0.01]) # input cost matrix +Rd = np.diag([0.01, 1.0]) # input difference cost matrix +Q = np.diag([1.0, 1.0, 0.5, 0.5]) # state cost matrix +Qf = Q # state final matrix +GOAL_DIS = 1.5 # goal distance +STOP_SPEED = 0.5 / 3.6 # stop speed +MAX_TIME = 500.0 # max simulation time + +# iterative paramter +MAX_ITER = 3 # Max iteration +DU_TH = 0.1 # iteration finish param + +TARGET_SPEED = 10.0 / 3.6 # [m/s] target speed +N_IND_SEARCH = 10 # Search index number + +DT = 0.2 # [s] time tick + +# Vehicle parameters +LENGTH = 4.5 # [m] +WIDTH = 2.0 # [m] +BACKTOWHEEL = 1.0 # [m] +WHEEL_LEN = 0.3 # [m] +WHEEL_WIDTH = 0.2 # [m] +TREAD = 0.7 # [m] +WB = 2.5 # [m] + +MAX_STEER = np.deg2rad(45.0) # maximum steering angle [rad] +MAX_DSTEER = np.deg2rad(30.0) # maximum steering speed [rad/s] +MAX_SPEED = 55.0 / 3.6 # maximum speed [m/s] +MIN_SPEED = -20.0 / 3.6 # minimum speed [m/s] +MAX_ACCEL = 1.0 # maximum accel [m/ss] + +show_animation = True + + +class State: + """ + vehicle state class + """ + + def __init__(self, x=0.0, y=0.0, yaw=0.0, v=0.0): + self.x = x + self.y = y + self.yaw = yaw + self.v = v + self.predelta = None + + +def pi_2_pi(angle): + while(angle > math.pi): + angle = angle - 2.0 * math.pi + + while(angle < -math.pi): + angle = angle + 2.0 * math.pi + + return angle + + +def get_linear_model_matrix(v, phi, delta): + + A = np.zeros((NX, NX)) + A[0, 0] = 1.0 + A[1, 1] = 1.0 + A[2, 2] = 1.0 + A[3, 3] = 1.0 + A[0, 2] = DT * math.cos(phi) + A[0, 3] = - DT * v * math.sin(phi) + A[1, 2] = DT * math.sin(phi) + A[1, 3] = DT * v * math.cos(phi) + A[3, 2] = DT * math.tan(delta) / WB + + B = np.zeros((NX, NU)) + B[2, 0] = DT + B[3, 1] = DT * v / (WB * math.cos(delta) ** 2) + + C = np.zeros(NX) + C[0] = DT * v * math.sin(phi) * phi + C[1] = - DT * v * math.cos(phi) * phi + C[3] = - v * delta / (WB * math.cos(delta) ** 2) + + return A, B, C + + +def plot_car(x, y, yaw, steer=0.0, cabcolor="-r", truckcolor="-k"): # pragma: no cover + + outline = np.array([[-BACKTOWHEEL, (LENGTH - BACKTOWHEEL), (LENGTH - BACKTOWHEEL), -BACKTOWHEEL, -BACKTOWHEEL], + [WIDTH / 2, WIDTH / 2, - WIDTH / 2, -WIDTH / 2, WIDTH / 2]]) + + fr_wheel = np.array([[WHEEL_LEN, -WHEEL_LEN, -WHEEL_LEN, WHEEL_LEN, WHEEL_LEN], + [-WHEEL_WIDTH - TREAD, -WHEEL_WIDTH - TREAD, WHEEL_WIDTH - TREAD, WHEEL_WIDTH - TREAD, -WHEEL_WIDTH - TREAD]]) + + rr_wheel = np.copy(fr_wheel) + + fl_wheel = np.copy(fr_wheel) + fl_wheel[1, :] *= -1 + rl_wheel = np.copy(rr_wheel) + rl_wheel[1, :] *= -1 + + Rot1 = np.array([[math.cos(yaw), math.sin(yaw)], + [-math.sin(yaw), math.cos(yaw)]]) + Rot2 = np.array([[math.cos(steer), math.sin(steer)], + [-math.sin(steer), math.cos(steer)]]) + + fr_wheel = (fr_wheel.T.dot(Rot2)).T + fl_wheel = (fl_wheel.T.dot(Rot2)).T + fr_wheel[0, :] += WB + fl_wheel[0, :] += WB + + fr_wheel = (fr_wheel.T.dot(Rot1)).T + fl_wheel = (fl_wheel.T.dot(Rot1)).T + + outline = (outline.T.dot(Rot1)).T + rr_wheel = (rr_wheel.T.dot(Rot1)).T + rl_wheel = (rl_wheel.T.dot(Rot1)).T + + outline[0, :] += x + outline[1, :] += y + fr_wheel[0, :] += x + fr_wheel[1, :] += y + rr_wheel[0, :] += x + rr_wheel[1, :] += y + fl_wheel[0, :] += x + fl_wheel[1, :] += y + rl_wheel[0, :] += x + rl_wheel[1, :] += y + + plt.plot(np.array(outline[0, :]).flatten(), + np.array(outline[1, :]).flatten(), truckcolor) + plt.plot(np.array(fr_wheel[0, :]).flatten(), + np.array(fr_wheel[1, :]).flatten(), truckcolor) + plt.plot(np.array(rr_wheel[0, :]).flatten(), + np.array(rr_wheel[1, :]).flatten(), truckcolor) + plt.plot(np.array(fl_wheel[0, :]).flatten(), + np.array(fl_wheel[1, :]).flatten(), truckcolor) + plt.plot(np.array(rl_wheel[0, :]).flatten(), + np.array(rl_wheel[1, :]).flatten(), truckcolor) + plt.plot(x, y, "*") + + +def update_state(state, a, delta): + + # input check + if delta >= MAX_STEER: + delta = MAX_STEER + elif delta <= -MAX_STEER: + delta = -MAX_STEER + + state.x = state.x + state.v * math.cos(state.yaw) * DT + state.y = state.y + state.v * math.sin(state.yaw) * DT + state.yaw = state.yaw + state.v / WB * math.tan(delta) * DT + state.v = state.v + a * DT + + if state. v > MAX_SPEED: + state.v = MAX_SPEED + elif state. v < MIN_SPEED: + state.v = MIN_SPEED + + return state + + +def get_nparray_from_matrix(x): + return np.array(x).flatten() + + +def calc_nearest_index(state, cx, cy, cyaw, pind): + + dx = [state.x - icx for icx in cx[pind:(pind + N_IND_SEARCH)]] + dy = [state.y - icy for icy in cy[pind:(pind + N_IND_SEARCH)]] + + d = [idx ** 2 + idy ** 2 for (idx, idy) in zip(dx, dy)] + + mind = min(d) + + ind = d.index(mind) + pind + + mind = math.sqrt(mind) + + dxl = cx[ind] - state.x + dyl = cy[ind] - state.y + + angle = pi_2_pi(cyaw[ind] - math.atan2(dyl, dxl)) + if angle < 0: + mind *= -1 + + return ind, mind + + +def predict_motion(x0, oa, od, xref): + xbar = xref * 0.0 + for i, _ in enumerate(x0): + xbar[i, 0] = x0[i] + + state = State(x=x0[0], y=x0[1], yaw=x0[3], v=x0[2]) + for (ai, di, i) in zip(oa, od, range(1, T + 1)): + state = update_state(state, ai, di) + xbar[0, i] = state.x + xbar[1, i] = state.y + xbar[2, i] = state.v + xbar[3, i] = state.yaw + + return xbar + + +def iterative_linear_mpc_control(xref, x0, dref, oa, od): + """ + MPC contorl with updating operational point iteraitvely + """ + + if oa is None or od is None: + oa = [0.0] * T + od = [0.0] * T + + for i in range(MAX_ITER): + xbar = predict_motion(x0, oa, od, xref) + poa, pod = oa[:], od[:] + oa, od, ox, oy, oyaw, ov = linear_mpc_control(xref, xbar, x0, dref) + du = sum(abs(oa - poa)) + sum(abs(od - pod)) # calc u change value + if du <= DU_TH: + break + else: + print("Iterative is max iter") + + return oa, od, ox, oy, oyaw, ov + + +def linear_mpc_control(xref, xbar, x0, dref): + """ + linear mpc control + xref: reference point + xbar: operational point + x0: initial state + dref: reference steer angle + """ + + x = cvxpy.Variable((NX, T + 1)) + u = cvxpy.Variable((NU, T)) + + cost = 0.0 + constraints = [] + + for t in range(T): + cost += cvxpy.quad_form(u[:, t], R) + + if t != 0: + cost += cvxpy.quad_form(xref[:, t] - x[:, t], Q) + + A, B, C = get_linear_model_matrix( + xbar[2, t], xbar[3, t], dref[0, t]) + constraints += [x[:, t + 1] == A * x[:, t] + B * u[:, t] + C] + + if t < (T - 1): + cost += cvxpy.quad_form(u[:, t + 1] - u[:, t], Rd) + constraints += [cvxpy.abs(u[1, t + 1] - u[1, t]) <= + MAX_DSTEER * DT] + + cost += cvxpy.quad_form(xref[:, T] - x[:, T], Qf) + + constraints += [x[:, 0] == x0] + constraints += [x[2, :] <= MAX_SPEED] + constraints += [x[2, :] >= MIN_SPEED] + constraints += [cvxpy.abs(u[0, :]) <= MAX_ACCEL] + constraints += [cvxpy.abs(u[1, :]) <= MAX_STEER] + + prob = cvxpy.Problem(cvxpy.Minimize(cost), constraints) + prob.solve(solver=cvxpy.ECOS, verbose=False) + + if prob.status == cvxpy.OPTIMAL or prob.status == cvxpy.OPTIMAL_INACCURATE: + ox = get_nparray_from_matrix(x.value[0, :]) + oy = get_nparray_from_matrix(x.value[1, :]) + ov = get_nparray_from_matrix(x.value[2, :]) + oyaw = get_nparray_from_matrix(x.value[3, :]) + oa = get_nparray_from_matrix(u.value[0, :]) + odelta = get_nparray_from_matrix(u.value[1, :]) + + else: + print("Error: Cannot solve mpc..") + oa, odelta, ox, oy, oyaw, ov = None, None, None, None, None, None + + return oa, odelta, ox, oy, oyaw, ov + + +def calc_ref_trajectory(state, cx, cy, cyaw, ck, sp, dl, pind): + xref = np.zeros((NX, T + 1)) + dref = np.zeros((1, T + 1)) + ncourse = len(cx) + + ind, _ = calc_nearest_index(state, cx, cy, cyaw, pind) + + if pind >= ind: + ind = pind + + xref[0, 0] = cx[ind] + xref[1, 0] = cy[ind] + xref[2, 0] = sp[ind] + xref[3, 0] = cyaw[ind] + dref[0, 0] = 0.0 # steer operational point should be 0 + + travel = 0.0 + + for i in range(T + 1): + travel += abs(state.v) * DT + dind = int(round(travel / dl)) + + if (ind + dind) < ncourse: + xref[0, i] = cx[ind + dind] + xref[1, i] = cy[ind + dind] + xref[2, i] = sp[ind + dind] + xref[3, i] = cyaw[ind + dind] + dref[0, i] = 0.0 + else: + xref[0, i] = cx[ncourse - 1] + xref[1, i] = cy[ncourse - 1] + xref[2, i] = sp[ncourse - 1] + xref[3, i] = cyaw[ncourse - 1] + dref[0, i] = 0.0 + + return xref, ind, dref + + +def check_goal(state, goal, tind, nind): + + # check goal + dx = state.x - goal[0] + dy = state.y - goal[1] + d = math.sqrt(dx ** 2 + dy ** 2) + + isgoal = (d <= GOAL_DIS) + + if abs(tind - nind) >= 5: + isgoal = False + + isstop = (abs(state.v) <= STOP_SPEED) + + if isgoal and isstop: + return True + + return False + + +def do_simulation(cx, cy, cyaw, ck, sp, dl, initial_state): + """ + Simulation + cx: course x position list + cy: course y position list + cy: course yaw position list + ck: course curvature list + sp: speed profile + dl: course tick [m] + """ + + goal = [cx[-1], cy[-1]] + + state = initial_state + + # initial yaw compensation + if state.yaw - cyaw[0] >= math.pi: + state.yaw -= math.pi * 2.0 + elif state.yaw - cyaw[0] <= -math.pi: + state.yaw += math.pi * 2.0 + + time = 0.0 + x = [state.x] + y = [state.y] + yaw = [state.yaw] + v = [state.v] + t = [0.0] + d = [0.0] + a = [0.0] + target_ind, _ = calc_nearest_index(state, cx, cy, cyaw, 0) + + odelta, oa = None, None + + cyaw = smooth_yaw(cyaw) + + while MAX_TIME >= time: + xref, target_ind, dref = calc_ref_trajectory( + state, cx, cy, cyaw, ck, sp, dl, target_ind) + + x0 = [state.x, state.y, state.v, state.yaw] # current state + + oa, odelta, ox, oy, oyaw, ov = iterative_linear_mpc_control( + xref, x0, dref, oa, odelta) + + if odelta is not None: + di, ai = odelta[0], oa[0] + + state = update_state(state, ai, di) + time = time + DT + + x.append(state.x) + y.append(state.y) + yaw.append(state.yaw) + v.append(state.v) + t.append(time) + d.append(di) + a.append(ai) + + if check_goal(state, goal, target_ind, len(cx)): + print("Goal") + break + + if show_animation: # pragma: no cover + plt.cla() + if ox is not None: + plt.plot(ox, oy, "xr", label="MPC") + plt.plot(cx, cy, "-r", label="course") + plt.plot(x, y, "ob", label="trajectory") + plt.plot(xref[0, :], xref[1, :], "xk", label="xref") + plt.plot(cx[target_ind], cy[target_ind], "xg", label="target") + plot_car(state.x, state.y, state.yaw, steer=di) + plt.axis("equal") + plt.grid(True) + plt.title("Time[s]:" + str(round(time, 2)) + + ", speed[km/h]:" + str(round(state.v * 3.6, 2))) + plt.pause(0.0001) + + return t, x, y, yaw, v, d, a + + +def calc_speed_profile(cx, cy, cyaw, target_speed): + + speed_profile = [target_speed] * len(cx) + direction = 1.0 # forward + + # Set stop point + for i in range(len(cx) - 1): + dx = cx[i + 1] - cx[i] + dy = cy[i + 1] - cy[i] + + move_direction = math.atan2(dy, dx) + + if dx != 0.0 and dy != 0.0: + dangle = abs(pi_2_pi(move_direction - cyaw[i])) + if dangle >= math.pi / 4.0: + direction = -1.0 + else: + direction = 1.0 + + if direction != 1.0: + speed_profile[i] = - target_speed + else: + speed_profile[i] = target_speed + + speed_profile[-1] = 0.0 + + return speed_profile + + +def smooth_yaw(yaw): + + for i in range(len(yaw) - 1): + dyaw = yaw[i + 1] - yaw[i] + + while dyaw >= math.pi / 2.0: + yaw[i + 1] -= math.pi * 2.0 + dyaw = yaw[i + 1] - yaw[i] + + while dyaw <= -math.pi / 2.0: + yaw[i + 1] += math.pi * 2.0 + dyaw = yaw[i + 1] - yaw[i] + + return yaw + + +def get_straight_course(dl): + ax = [0.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0] + ay = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] + cx, cy, cyaw, ck, s = pathplanner.calc_spline_course( + ax, ay, ds=dl) + + return cx, cy, cyaw, ck + + +def get_straight_course2(dl): + ax = [0.0, -10.0, -20.0, -40.0, -50.0, -60.0, -70.0] + ay = [0.0, -1.0, 1.0, 0.0, -1.0, 1.0, 0.0] + cx, cy, cyaw, ck, s = pathplanner.calc_spline_course( + ax, ay, ds=dl) + + return cx, cy, cyaw, ck + + +def get_straight_course3(dl): + ax = [0.0, -10.0, -20.0, -40.0, -50.0, -60.0, -70.0] + ay = [0.0, -1.0, 1.0, 0.0, -1.0, 1.0, 0.0] + cx, cy, cyaw, ck, s = pathplanner.calc_spline_course( + ax, ay, ds=dl) + + cyaw = [i - math.pi for i in cyaw] + + return cx, cy, cyaw, ck + + +def get_forward_course(dl): + ax = [0.0, 60.0, 125.0, 50.0, 75.0, 30.0, -10.0] + ay = [0.0, 0.0, 50.0, 65.0, 30.0, 50.0, -20.0] + cx, cy, cyaw, ck, s = pathplanner.calc_spline_course( + ax, ay, ds=dl) + + return cx, cy, cyaw, ck + + +def get_switch_back_course(dl): + ax = [0.0, 30.0, 6.0, 20.0, 35.0] + ay = [0.0, 0.0, 20.0, 35.0, 20.0] + cx, cy, cyaw, ck, s = pathplanner.calc_spline_course( + ax, ay, ds=dl) + ax = [35.0, 10.0, 0.0, 0.0] + ay = [20.0, 30.0, 5.0, 0.0] + cx2, cy2, cyaw2, ck2, s2 = pathplanner.calc_spline_course( + ax, ay, ds=dl) + cyaw2 = [i - math.pi for i in cyaw2] + cx.extend(cx2) + cy.extend(cy2) + cyaw.extend(cyaw2) + ck.extend(ck2) + + return cx, cy, cyaw, ck + + +def main(): + print(__file__ + " start!!") + + dl = 1.0 # course tick + # cx, cy, cyaw, ck = get_straight_course(dl) + # cx, cy, cyaw, ck = get_straight_course2(dl) + cx, cy, cyaw, ck = get_straight_course3(dl) + # cx, cy, cyaw, ck = get_forward_course(dl) + # CX, cy, cyaw, ck = get_switch_back_course(dl) + + sp = calc_speed_profile(cx, cy, cyaw, TARGET_SPEED) + + initial_state = State(x=cx[0], y=cy[0], yaw=cyaw[0], v=0.0) + + t, x, y, yaw, v, d, a = do_simulation( + cx, cy, cyaw, ck, sp, dl, initial_state) + + if show_animation: # pragma: no cover + plt.close("all") + plt.subplots() + plt.plot(cx, cy, "-r", label="spline") + plt.plot(x, y, "-g", label="tracking") + plt.grid(True) + plt.axis("equal") + plt.xlabel("x[m]") + plt.ylabel("y[m]") + plt.legend() + + plt.subplots() + plt.plot(t, v, "-r", label="speed") + plt.grid(True) + plt.xlabel("Time [s]") + plt.ylabel("Speed [kmh]") + + plt.show() + + +def main2(): + print(__file__ + " start!!") + + dl = 1.0 # course tick + cx, cy, cyaw, ck = get_straight_course3(dl) + + sp = calc_speed_profile(cx, cy, cyaw, TARGET_SPEED) + + initial_state = State(x=cx[0], y=cy[0], yaw=0.0, v=0.0) + + t, x, y, yaw, v, d, a = do_simulation( + cx, cy, cyaw, ck, sp, dl, initial_state) + + if show_animation: # pragma: no cover + plt.close("all") + plt.subplots() + plt.plot(cx, cy, "-r", label="spline") + plt.plot(x, y, "-g", label="tracking") + plt.grid(True) + plt.axis("equal") + plt.xlabel("x[m]") + plt.ylabel("y[m]") + plt.legend() + + plt.subplots() + plt.plot(t, v, "-r", label="speed") + plt.grid(True) + plt.xlabel("Time [s]") + plt.ylabel("Speed [kmh]") + + plt.show() + + +if __name__ == '__main__': + # main() + main2() \ No newline at end of file diff --git a/mpc/with_disturbance/animation.py b/mpc/with_disturbance/animation.py index 6ece541..beee54c 100755 --- a/mpc/with_disturbance/animation.py +++ b/mpc/with_disturbance/animation.py @@ -153,8 +153,8 @@ class AnimDrawer(): self.axis.set_aspect('equal', adjustable='box') # (2) set the xlim and ylim - self.axis.set_xlim(-5, 50) - self.axis.set_ylim(-2, 5) + self.axis.set_xlim(-2, 50) + self.axis.set_ylim(-10, 10) def _set_img(self): """ initialize the imgs of animation diff --git a/mpc/with_disturbance/coordinate_trans.py b/mpc/with_disturbance/coordinate_trans.py index ea66939..9cfa220 100755 --- a/mpc/with_disturbance/coordinate_trans.py +++ b/mpc/with_disturbance/coordinate_trans.py @@ -49,7 +49,7 @@ def coordinate_transformation_in_position(positions, base_positions): Returns ------- - traslated_positions : numpy.ndarray + traslated_positions : numpy.ndarray, shape(2, N) ''' diff --git a/mpc/with_disturbance/iterative_MPC.py b/mpc/with_disturbance/iterative_MPC.py new file mode 100644 index 0000000..5b2059c --- /dev/null +++ b/mpc/with_disturbance/iterative_MPC.py @@ -0,0 +1,295 @@ +import numpy as np +import matplotlib.pyplot as plt +import math +import copy + +from cvxopt import matrix, solvers + +class IterativeMpcController(): + """ + Attributes + ------------ + A : numpy.ndarray + system matrix + B : numpy.ndarray + input matrix + W_D : numpy.ndarray + distubance matrix in state equation + Q : numpy.ndarray + evaluation function weight for states + Qs : numpy.ndarray + concatenated evaluation function weight for states + R : numpy.ndarray + evaluation function weight for inputs + Rs : numpy.ndarray + concatenated evaluation function weight for inputs + pre_step : int + prediction step + state_size : int + state size of the plant + input_size : int + input size of the plant + dt_input_upper : numpy.ndarray, shape(input_size, ), optional + constraints of input dt, default is None + dt_input_lower : numpy.ndarray, shape(input_size, ), optional + constraints of input dt, default is None + input_upper : numpy.ndarray, shape(input_size, ), optional + constraints of input, default is None + input_lower : numpy.ndarray, shape(input_size, ), optional + constraints of input, default is None + """ + def __init__(self, system_model, Q, R, pre_step, initial_input=None, dt_input_upper=None, dt_input_lower=None, input_upper=None, input_lower=None): + """ + Parameters + ------------ + system_model : SystemModel class + Q : numpy.ndarray + evaluation function weight for states + R : numpy.ndarray + evaluation function weight for inputs + pre_step : int + prediction step + dt_input_upper : numpy.ndarray, shape(input_size, ), optional + constraints of input dt, default is None + dt_input_lower : numpy.ndarray, shape(input_size, ), optional + constraints of input dt, default is None + input_upper : numpy.ndarray, shape(input_size, ), optional + constraints of input, default is None + input_lower : numpy.ndarray, shape(input_size, ), optional + constraints of input, default is None + history_us : list + time history of optimal input us(numpy.ndarray) + """ + self.Ad_s = system_model.Ad_s + self.Bd_s = system_model.Bd_s + self.W_D_s = system_model.W_D_s + self.Q = np.array(Q) + self.R = np.array(R) + self.pre_step = pre_step + + self.Qs = None + self.Rs = None + + self.state_size = self.Ad_s[0].shape[0] + self.input_size = self.Bd_s[0].shape[1] + + self.history_us = [np.zeros(self.input_size)] + + # initial state + if initial_input is not None: + self.history_us = [initial_input] + + # constraints + self.dt_input_lower = dt_input_lower + self.dt_input_upper = dt_input_upper + self.input_upper = input_upper + self.input_lower = input_lower + + # about mpc matrix + self.W = None + self.omega = None + self.F = None + self.f = None + + def initialize_controller(self): + """ + make matrix to calculate optimal control input + """ + A_factorials = [self.Ad_s[0]] + + self.phi_mat = copy.deepcopy(self.Ad_s[0]) + + for i in range(self.pre_step - 1): + temp_mat = np.dot(A_factorials[-1], self.Ad_s[i + 1]) + self.phi_mat = np.vstack((self.phi_mat, temp_mat)) + + A_factorials.append(temp_mat) # after we use this factorials + + print("phi_mat = \n{0}".format(self.phi_mat)) + + self.gamma_mat = copy.deepcopy(self.Bd_s[0]) + gammma_mat_temp = copy.deepcopy(self.Bd_s[0]) + + for i in range(self.pre_step - 1): + temp_1_mat = np.dot(A_factorials[i], self.Bd_s[i + 1]) + gammma_mat_temp = temp_1_mat + gammma_mat_temp + self.gamma_mat = np.vstack((self.gamma_mat, gammma_mat_temp)) + + print("gamma_mat = \n{0}".format(self.gamma_mat)) + + self.theta_mat = copy.deepcopy(self.gamma_mat) + + for i in range(self.pre_step - 1): + temp_mat = np.zeros_like(self.gamma_mat) + temp_mat[int((i + 1)*self.state_size): , :] = self.gamma_mat[:-int((i + 1)*self.state_size) , :] + + self.theta_mat = np.hstack((self.theta_mat, temp_mat)) + + print("theta_mat = \n{0}".format(self.theta_mat)) + + # disturbance + print("A_factorials_mat = \n{0}".format(A_factorials)) + A_factorials_mat = np.array(A_factorials).reshape(-1, self.state_size) + print("A_factorials_mat = \n{0}".format(A_factorials_mat)) + + eye = np.eye(self.state_size) + self.dist_mat = np.vstack((eye, A_factorials_mat[:-self.state_size, :])) + base_mat = copy.deepcopy(self.dist_mat) + + print("base_mat = \n{0}".format(base_mat)) + + for i in range(self.pre_step - 1): + temp_mat = np.zeros_like(A_factorials_mat) + temp_mat[int((i + 1)*self.state_size): , :] = base_mat[:-int((i + 1)*self.state_size) , :] + self.dist_mat = np.hstack((self.dist_mat, temp_mat)) + + print("dist_mat = \n{0}".format(self.dist_mat)) + + W_Ds = copy.deepcopy(self.W_D_s[0]) + + for i in range(self.pre_step - 1): + W_Ds = np.vstack((W_Ds, self.W_D_s[i + 1])) + + self.dist_mat = np.dot(self.dist_mat, W_Ds) + + print("dist_mat = \n{0}".format(self.dist_mat)) + + # evaluation function weight + diag_Qs = np.array([np.diag(self.Q) for _ in range(self.pre_step)]) + diag_Rs = np.array([np.diag(self.R) for _ in range(self.pre_step)]) + + self.Qs = np.diag(diag_Qs.flatten()) + self.Rs = np.diag(diag_Rs.flatten()) + + print("Qs = \n{0}".format(self.Qs)) + print("Rs = \n{0}".format(self.Rs)) + + # constraints + # about dt U + if self.input_lower is not None: + # initialize + self.F = np.zeros((self.input_size * 2, self.pre_step * self.input_size)) + for i in range(self.input_size): + self.F[i * 2: (i + 1) * 2, i] = np.array([1., -1.]) + temp_F = copy.deepcopy(self.F) + + print("F = \n{0}".format(self.F)) + + for i in range(self.pre_step - 1): + temp_F = copy.deepcopy(temp_F) + + for j in range(self.input_size): + temp_F[j * 2: (j + 1) * 2, ((i+1) * self.input_size) + j] = np.array([1., -1.]) + + self.F = np.vstack((self.F, temp_F)) + + self.F1 = self.F[:, :self.input_size] + + temp_f = [] + + for i in range(self.input_size): + temp_f.append(-1 * self.input_upper[i]) + temp_f.append(self.input_lower[i]) + + self.f = np.array([temp_f for _ in range(self.pre_step)]).flatten() + + print("F = \n{0}".format(self.F)) + print("F1 = \n{0}".format(self.F1)) + print("f = \n{0}".format(self.f)) + + # about dt_u + if self.dt_input_lower is not None: + self.W = np.zeros((2, self.pre_step * self.input_size)) + self.W[:, 0] = np.array([1., -1.]) + + for i in range(self.pre_step * self.input_size - 1): + temp_W = np.zeros((2, self.pre_step * self.input_size)) + temp_W[:, i+1] = np.array([1., -1.]) + self.W = np.vstack((self.W, temp_W)) + + temp_omega = [] + + for i in range(self.input_size): + temp_omega.append(self.dt_input_upper[i]) + temp_omega.append(-1. * self.dt_input_lower[i]) + + self.omega = np.array([temp_omega for _ in range(self.pre_step)]).flatten() + + print("W = \n{0}".format(self.W)) + print("omega = \n{0}".format(self.omega)) + + # about state + print("check the matrix!! if you think rite, plese push enter") + # input() + + def calc_input(self, states, references): + """calculate optimal input + Parameters + ----------- + states : numpy.ndarray, shape(state length, ) + current state of system + references : numpy.ndarray, shape(state length * pre_step, ) + reference of the system, you should set this value as reachable goal + + References + ------------ + opt_input : numpy.ndarray, shape(input_length, ) + optimal input + """ + temp_1 = np.dot(self.phi_mat, states.reshape(-1, 1)) + temp_2 = np.dot(self.gamma_mat, self.history_us[-1].reshape(-1, 1)) + + error = references.reshape(-1, 1) - temp_1 - temp_2 - self.dist_mat + + G = 2. * np.dot(self.theta_mat.T, np.dot(self.Qs, error)) + + H = np.dot(self.theta_mat.T, np.dot(self.Qs, self.theta_mat)) + self.Rs + + # constraints + A = [] + b = [] + + if self.W is not None: + A.append(self.W) + b.append(self.omega.reshape(-1, 1)) + + if self.F is not None: + b_F = - np.dot(self.F1, self.history_us[-1].reshape(-1, 1)) - self.f.reshape(-1, 1) + A.append(self.F) + b.append(b_F) + + A = np.array(A).reshape(-1, self.input_size * self.pre_step) + + ub = np.array(b).flatten() + + # make cvxpy problem formulation + P = 2*matrix(H) + q = matrix(-1 * G) + A = matrix(A) + b = matrix(ub) + + # constraint + if self.W is not None or self.F is not None : + opt_result = solvers.qp(P, q, G=A, h=b) + + opt_dt_us = list(opt_result['x']) + + opt_u = opt_dt_us[:self.input_size] + self.history_us[-1] + + # save + self.history_us.append(opt_u) + + return opt_u + + def update_system_model(self, system_model): + """update system model + Parameters + ----------- + system_model : SystemModel class + """ + + self.Ad_s = system_model.Ad_s + self.Bd_s = system_model.Bd_s + self.W_D_s = system_model.W_D_s + + self.initialize_controller() diff --git a/mpc/with_disturbance/main_track.py b/mpc/with_disturbance/main_track.py index 03200e5..48c785d 100644 --- a/mpc/with_disturbance/main_track.py +++ b/mpc/with_disturbance/main_track.py @@ -3,10 +3,12 @@ import matplotlib.pyplot as plt import math import copy -from mpc_func_with_cvxopt import MpcController as MpcController_cvxopt +# from mpc_func_with_cvxopt import MpcController as MpcController_cvxopt +from iterative_MPC import IterativeMpcController from animation import AnimDrawer -from control import matlab +# from control import matlab from coordinate_trans import coordinate_transformation_in_angle, coordinate_transformation_in_position +from traj_func import make_sample_traj class WheeledSystem(): """SampleSystem, this is the simulator @@ -29,6 +31,8 @@ class WheeledSystem(): self.NUM_STATE = 4 self.xs = np.zeros(self.NUM_STATE) + self.tau = 0.01 + self.FRONT_WHEELE_BASE = 1.0 self.REAR_WHEELE_BASE = 1.0 @@ -89,7 +93,9 @@ class WheeledSystem(): u_2 : float system input """ - y_dot = u_1 * math.cos(y_3 + y_4) + # y_dot = u_1 * math.cos(y_3 + y_4) + y_dot = u_1 * math.cos(y_3) + return y_dot def _func_x_2(self, y_1, y_2, y_3, y_4, u_1, u_2): @@ -104,7 +110,9 @@ class WheeledSystem(): u_2 : float system input """ - y_dot = u_1 * math.sin(y_3 + y_4) + # y_dot = u_1 * math.sin(y_3 + y_4) + y_dot = u_1 * math.sin(y_3) + return y_dot def _func_x_3(self, y_1, y_2, y_3, y_4, u_1, u_2): @@ -119,135 +127,245 @@ class WheeledSystem(): u_2 : float system input """ - y_dot = u_1 / self.REAR_WHEELE_BASE * math.sin(y_4) + # y_dot = u_1 / self.REAR_WHEELE_BASE * math.sin(y_4) + y_dot = u_1 * math.tan(y_4) / (self.REAR_WHEELE_BASE + self.FRONT_WHEELE_BASE) + return y_dot def _func_x_4(self, y_1, y_2, y_3, y_4, u_1, u_2): + """Ad, Bd, W_D, Q, R + ParAd, Bd, W_D, Q, R + ---Ad, Bd, W_D, Q, R + y_1 : float + y_2 : float + y_3 : float + u_1 : float + system input + u_2 : float + system input """ - """ - y_dot = math.atan2(self.REAR_WHEELE_BASE * math.tan(u_2) ,self.REAR_WHEELE_BASE + self.FRONT_WHEELE_BASE) + # y_dot = math.atan2(self.REAR_WHEELE_BASE * math.tan(u_2) ,self.REAR_WHEELE_BASE + self.FRONT_WHEELE_BASE) + y_dot = - 1. / self.tau * (y_4 - u_2) return y_dot +class SystemModel(): + """ + Attributes + ----------- + WHEEL_BASE : float + wheel base of the car + Ad_s : list + list of system model matrix Ad + Bd_s : list + list of system model matrix Bd + W_D_s : list + list of system model matrix W_D_s + Q : numpy.ndarray + R : numpy.ndarray + """ + def __init__(self, tau = 0.15, dt = 0.016): + """ + Parameters + ----------- + tau : time constant, optional + dt : sampling time, optional + """ + self.dt = dt + self.tau = tau + self.WHEEL_BASE = 2.2 + + self.Ad_s = [] + self.Bd_s = [] + self.W_D_s = [] + + def calc_predict_sytem_model(self, V, curvatures, predict_step): + """ + calc next predict systemo models + V : float + curvatures : list + this is the next curvature's list + predict_step : int + predict step of MPC + """ + for i in range(predict_step): + delta_r = math.atan2(self.WHEEL_BASE, 1. / curvatures[i]) + + A12 = (V / self.WHEEL_BASE) / math.cos(delta_r) + A22 = (1. - 1. / self.tau * self.dt) + + Ad = np.array([[1., V * self.dt, 0.], + [0., 1., A12 * self.dt], + [0., 0., A22]]) + + Bd = np.array([[0.], [0.], [1. / self.tau]]) * self.dt + + W_D_0 = (V / self.WHEEL_BASE) * delta_r / (math.cos(delta_r)**2) - V * curvatures[i] + + W_D = np.array([[0.], [W_D_0], [0.]]) * self.dt + + self.Ad_s.append(Ad) + self.Bd_s.append(Bd) + self.W_D_s.append(W_D) + + # return self.Ad_s, self.Bd_s, self.W_D_s + +def search_nearest_point(points, base_point): + """ + Parameters + ----------- + points : numpy.ndarray, shape is (2, N) + base_point : numpy.ndarray, shape is (2, 1) + + Returns + ------- + nearest_index : + nearest_point : + """ + distance_mat = np.sqrt(np.sum((points - base_point)**2, axis=0)) + + index_min = np.argmin(distance_mat) + + return index_min, points[:, index_min] + +def calc_curvatures(traj_ref, predict_step, num_skip): + """ + Parameters + ----------- + points : numpy.ndarray, shape is (2, predict_step + num_skip) + predict_step : int + num_skip : int + + Returns + ------- + angles : list + list of angle + curvatures : list + list of curvature + """ + angles = [] + curvatures = [] + + for i in range(predict_step): + + temp = traj_ref[:, i + num_skip] - traj_ref[:, i] + alpha = math.atan2(temp[1], temp[0]) + + angles.append(alpha) + + distance = np.linalg.norm(temp) + R = distance / (2. * math.sin(alpha)) + curvatures.append(1. / R) + + # print("curvatures = {}".format(curvatures)) + + return angles, curvatures + def main(): - dt = 0.016 + # parameters + dt = 0.01 simulation_time = 10 # in seconds + PREDICT_STEP = 15 iteration_num = int(simulation_time / dt) - # you must be care about this matrix - # these A and B are for continuos system if you want to use discret system matrix please skip this step - # lineared car system - WHEEL_BASE = 2.2 - tau = 0.01 - - V = 5.0 # initialize - - - delta_r = 0. - - A12 = (V / WHEEL_BASE) / (math.cos(delta_r)**2) - A22 = (1. - 1. / tau) - - Ad = np.array([[1., V, 0.], - [0., 1., A12], - [0., 0., A22]]) * dt - - Bd = np.array([[0.], [0.], [1. / tau]]) * dt - - W_D_0 = - (V / WHEEL_BASE) * delta_r / (math.cos(delta_r)**2) - - W_D = np.array([[0.], [W_D_0], [0.]]) * dt - # make simulator with coninuous matrix - init_xs_lead = np.array([5., 0., 0. ,0.]) + init_xs_lead = np.array([0., 0., 0. ,0.]) init_xs_follow = np.array([0., 0., 0., 0.]) lead_car = WheeledSystem(init_states=init_xs_lead) follow_car = WheeledSystem(init_states=init_xs_follow) + # make system model + lead_car_system_model = SystemModel() + follow_car_system_model = SystemModel() + + # reference + traj_ref_xs, traj_ref_ys = make_sample_traj(int(simulation_time/dt)) + traj_ref = np.array([traj_ref_xs, traj_ref_ys]) + + # nearest point + index_min, nearest_point = search_nearest_point(traj_ref, lead_car.xs[:2].reshape(2, 1)) + + # get traj's curvature + NUM_SKIP = 5 + angles, curvatures = calc_curvatures(traj_ref[:, index_min:index_min + PREDICT_STEP + NUM_SKIP], PREDICT_STEP, NUM_SKIP) + # evaluation function weight Q = np.diag([1., 1., 1.]) R = np.diag([5.]) - pre_step = 15 + + # System model update + V = 4.0 # in pratical we should calc from the state + lead_car_system_model.calc_predict_sytem_model(V, curvatures, PREDICT_STEP) + follow_car_system_model.calc_predict_sytem_model(V, curvatures, PREDICT_STEP) # make controller with discreted matrix - # please check the solver, if you want to use the scipy, set the MpcController_scipy - lead_controller = MpcController_cvxopt(Ad, Bd, W_D, Q, R, pre_step, + lead_controller = IterativeMpcController(lead_car_system_model, Q, R, PREDICT_STEP, dt_input_upper=np.array([30 * dt]), dt_input_lower=np.array([-30 * dt]), input_upper=np.array([30.]), input_lower=np.array([-30.])) - follow_controller = MpcController_cvxopt(Ad, Bd, W_D, Q, R, pre_step, + follow_controller = IterativeMpcController(follow_car_system_model, Q, R, PREDICT_STEP, dt_input_upper=np.array([30 * dt]), dt_input_lower=np.array([-30 * dt]), input_upper=np.array([30.]), input_lower=np.array([-30.])) + # initialize lead_controller.initialize_controller() follow_controller.initialize_controller() - - # reference - lead_reference = np.array([[0., 0., 0.] for _ in range(pre_step)]).flatten() - ref = np.array([[0.], [0.]]) - + for i in range(iteration_num): print("simulation time = {0}".format(i)) - # make lead car's move - if i > int(iteration_num / 3): - ref = np.array([[0.], [4.]]) - ## lead # world traj lead_states = lead_car.xs + # nearest point + index_min, nearest_point = search_nearest_point(traj_ref, lead_car.xs[:2].reshape(2, 1)) + # end check + if len(traj_ref_ys) <= index_min + 10: + print("break") + break + + # get traj's curvature + NUM_SKIP = 2 + angles, curvatures = calc_curvatures(traj_ref[:, index_min:index_min + PREDICT_STEP + NUM_SKIP], PREDICT_STEP, NUM_SKIP) + + # System model update + V = 4.0 # in pratical we should calc from the state + lead_car_system_model.calc_predict_sytem_model(V, curvatures, PREDICT_STEP) + # transformation - relative_ref = coordinate_transformation_in_position(ref, lead_states[:2]) - relative_ref = coordinate_transformation_in_angle(relative_ref, lead_states[2]) + # car + relative_car_position = coordinate_transformation_in_position(lead_states[:2].reshape(2, 1), nearest_point) + relative_car_position = coordinate_transformation_in_angle(relative_car_position, angles[0]) + + relative_car_angle = lead_states[2] - angles[0] + relative_car_state = np.hstack((relative_car_position[1], relative_car_angle, lead_states[-1])) # make ref - lead_reference = np.array([[ref[1, 0], 0., 0.] for _ in range(pre_step)]).flatten() + lead_reference = np.array([[0., 0., 0.] for i in range(PREDICT_STEP)]).flatten() - alpha = math.atan2(relative_ref[1], relative_ref[0]) - R = np.linalg.norm(relative_ref) / 2 * math.sin(alpha) - - print(R) - input() - - V = 7.0 - delta_r = math.atan2(WHEEL_BASE, R) - - A12 = (V / WHEEL_BASE) / (math.cos(delta_r)**2) - A22 = (1. - 1. / tau) - - Ad = np.array([[1., V, 0.], - [0., 1., A12], - [0., 0., A22]]) * dt - - Bd = np.array([[0.], [0.], [1. / tau]]) * dt - - W_D_0 = - (V / WHEEL_BASE) * delta_r / (math.cos(delta_r)**2) - - W_D = np.array([[0.], [W_D_0], [0.]]) * dt + print("relative car state = {}".format(relative_car_state)) + print("nearest point = {}".format(nearest_point)) + # input() # update system matrix - lead_controller.update_system_model(Ad, Bd, W_D) + lead_controller.update_system_model(lead_car_system_model) + + lead_opt_u = lead_controller.calc_input(relative_car_state, lead_reference) - lead_opt_u = lead_controller.calc_input(np.zeros(3), lead_reference) lead_opt_u = np.hstack((np.array([V]), lead_opt_u)) - - ## follow - # make follow car - follow_reference = np.array([lead_states[1:] for _ in range(pre_step)]).flatten() - follow_states = follow_car.xs - - follow_opt_u = follow_controller.calc_input(follow_states[1:], follow_reference) - follow_opt_u = np.hstack((np.array([V]), follow_opt_u)) + print("opt_u = {}".format(lead_opt_u)) + # input() lead_car.update_state(lead_opt_u, dt=dt) - follow_car.update_state(follow_opt_u, dt=dt) + follow_car.update_state(lead_opt_u, dt=dt) # figures and animation lead_history_states = np.array(lead_car.history_xs) follow_history_states = np.array(follow_car.history_xs) + """ time_history_fig = plt.figure() x_fig = time_history_fig.add_subplot(311) y_fig = time_history_fig.add_subplot(312) @@ -298,6 +416,7 @@ def main(): input_history_fig.tight_layout() plt.show() + """ animdrawer = AnimDrawer([lead_history_states, follow_history_states]) animdrawer.draw_anim() diff --git a/mpc/with_disturbance/traj_func.py b/mpc/with_disturbance/traj_func.py new file mode 100644 index 0000000..ddf6304 --- /dev/null +++ b/mpc/with_disturbance/traj_func.py @@ -0,0 +1,32 @@ +import numpy as np +import matplotlib.pyplot as plt +import math + +def make_sample_traj(NUM, dt=0.01, a=1.): + """ + make sample trajectory + Parameters + ------------ + NUM : int + dt : float + a : float + + Returns + ---------- + traj_xs : list + traj_ys : list + """ + DELAY = 2. + traj_xs = [] + traj_ys = [] + + for i in range(NUM): + traj_xs.append(dt * i) + traj_ys.append(a * math.sin(dt * i / DELAY)) + + + plt.plot(traj_xs, traj_ys) + plt.show() + + return traj_xs, traj_ys +