657 lines
19 KiB
Python
657 lines
19 KiB
Python
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import math
|
|
import copy
|
|
|
|
class SampleSystem():
|
|
"""SampleSystem, this is the simulator
|
|
Attributes
|
|
-----------
|
|
x_1 : float
|
|
system state 1
|
|
x_2 : float
|
|
system state 2
|
|
history_x_1 : list
|
|
time history of system state 1 (x_1)
|
|
history_x_2 : list
|
|
time history of system state 2 (x_2)
|
|
"""
|
|
def __init__(self, init_x_1=0., init_x_2=0.):
|
|
"""
|
|
Palameters
|
|
-----------
|
|
init_x_1 : float, optional
|
|
initial value of x_1, default is 0.
|
|
init_x_2 : float, optional
|
|
initial value of x_2, default is 0.
|
|
"""
|
|
self.x_1 = init_x_1
|
|
self.x_2 = init_x_2
|
|
self.history_x_1 = [init_x_1]
|
|
self.history_x_2 = [init_x_2]
|
|
|
|
def update_state(self, u, dt=0.01):
|
|
"""
|
|
Palameters
|
|
------------
|
|
u : float
|
|
input of system in some cases this means the reference
|
|
dt : float in seconds, optional
|
|
sampling time of simulation, default is 0.01 [s]
|
|
"""
|
|
# for theta 1, theta 1 dot, theta 2, theta 2 dot
|
|
k0 = [0.0 for _ in range(2)]
|
|
k1 = [0.0 for _ in range(2)]
|
|
k2 = [0.0 for _ in range(2)]
|
|
k3 = [0.0 for _ in range(2)]
|
|
|
|
functions = [self._func_x_1, self._func_x_2]
|
|
|
|
# solve Runge-Kutta
|
|
for i, func in enumerate(functions):
|
|
k0[i] = dt * func(self.x_1, self.x_2, u)
|
|
|
|
for i, func in enumerate(functions):
|
|
k1[i] = dt * func(self.x_1 + k0[0]/2., self.x_2 + k0[1]/2., u)
|
|
|
|
for i, func in enumerate(functions):
|
|
k2[i] = dt * func(self.x_1 + k1[0]/2., self.x_2 + k1[1]/2., u)
|
|
|
|
for i, func in enumerate(functions):
|
|
k3[i] = dt * func(self.x_1 + k2[0], self.x_2 + k2[1], u)
|
|
|
|
self.x_1 += (k0[0] + 2. * k1[0] + 2. * k2[0] + k3[0]) / 6.
|
|
self.x_2 += (k0[1] + 2. * k1[1] + 2. * k2[1] + k3[1]) / 6.
|
|
|
|
# save
|
|
self.history_x_1.append(copy.deepcopy(self.x_1))
|
|
self.history_x_2.append(copy.deepcopy(self.x_2))
|
|
|
|
def _func_x_1(self, y_1, y_2, u):
|
|
"""
|
|
Parameters
|
|
------------
|
|
y_1 : float
|
|
y_2 : float
|
|
u : float
|
|
system input
|
|
"""
|
|
y_dot = y_2
|
|
return y_dot
|
|
|
|
def _func_x_2(self, y_1, y_2, u):
|
|
"""
|
|
Parameters
|
|
------------
|
|
y_1 : float
|
|
y_2 : float
|
|
u : float
|
|
system input
|
|
"""
|
|
y_dot = (1. - y_1**2 - y_2**2) * y_2 - y_1 + u
|
|
return y_dot
|
|
|
|
|
|
class NMPCSimulatorSystem():
|
|
"""SimulatorSystem for nmpc, this is the simulator of nmpc
|
|
the reason why I seperate the real simulator and nmpc's simulator is sometimes the modeling error, disturbance can include in real simulator
|
|
Attributes
|
|
-----------
|
|
|
|
"""
|
|
def __init__(self):
|
|
"""
|
|
Parameters
|
|
-----------
|
|
None
|
|
"""
|
|
pass
|
|
|
|
def calc_predict_and_adjoint_state(self, x_1, x_2, us, N, dt):
|
|
"""main
|
|
Parameters
|
|
------------
|
|
x_1 : float
|
|
current state
|
|
x_2 : float
|
|
current state
|
|
us : list of float
|
|
estimated optimal input Us for N steps
|
|
N : int
|
|
predict step
|
|
dt : float
|
|
sampling time
|
|
|
|
Returns
|
|
--------
|
|
x_1s : list of float
|
|
predicted x_1s for N steps
|
|
x_2s : list of float
|
|
predicted x_2s for N steps
|
|
lam_1s : list of float
|
|
adjoint state of x_1s, lam_1s for N steps
|
|
lam_2s : list of float
|
|
adjoint state of x_2s, lam_2s for N steps
|
|
"""
|
|
|
|
x_1s, x_2s = self._calc_predict_states(x_1, x_2, us, N, dt) # by usin state equation
|
|
lam_1s, lam_2s = self._calc_adjoint_states(x_1s, x_2s, us, N, dt) # by using adjoint equation
|
|
|
|
return x_1s, x_2s, lam_1s, lam_2s
|
|
|
|
def _calc_predict_states(self, x_1, x_2, us, N, dt):
|
|
"""
|
|
Parameters
|
|
------------
|
|
x_1 : float
|
|
current state
|
|
x_2 : float
|
|
current state
|
|
us : list of float
|
|
estimated optimal input Us for N steps
|
|
N : int
|
|
predict step
|
|
dt : float
|
|
sampling time
|
|
|
|
Returns
|
|
--------
|
|
x_1s : list of float
|
|
predicted x_1s for N steps
|
|
x_2s : list of float
|
|
predicted x_2s for N steps
|
|
"""
|
|
# initial state
|
|
x_1s = np.zeros(N+1)
|
|
x_2s = np.zeros(N+1)
|
|
|
|
# input initial state
|
|
x_1s[0] = x_1
|
|
x_2s[0] = x_2
|
|
|
|
for i in range(N):
|
|
temp_x_1, temp_x_2 = self._predict_state_with_oylar(x_1s[i], x_2s[i], us[i], dt)
|
|
x_1s[i+1] = temp_x_1
|
|
x_2s[i+1] = temp_x_2
|
|
|
|
return x_1s, x_2s
|
|
|
|
def _calc_adjoint_states(self, x_1s, x_2s, us, N, dt):
|
|
"""
|
|
Parameters
|
|
------------
|
|
x_1s : list of float
|
|
predicted x_1s for N steps
|
|
x_2s : list of float
|
|
predicted x_2s for N steps
|
|
us : list of float
|
|
estimated optimal input Us for N steps
|
|
N : int
|
|
predict step
|
|
dt : float
|
|
sampling time
|
|
|
|
Returns
|
|
--------
|
|
lam_1s : list of float
|
|
adjoint state of x_1s, lam_1s for N steps
|
|
lam_2s : list of float
|
|
adjoint state of x_2s, lam_2s for N steps
|
|
"""
|
|
# final state
|
|
# final_state_func
|
|
lam_1s = np.zeros(N)
|
|
lam_2s = np.zeros(N)
|
|
|
|
# input final state
|
|
lam_1s[-1] = x_1s[-1]
|
|
lam_2s[-1] = x_2s[-1]
|
|
|
|
for i in range(N-1, 0, -1):
|
|
temp_lam_1, temp_lam_2 = self._adjoint_state_with_oylar(x_1s[i], x_2s[i], lam_1s[i] ,lam_2s[i], us[i], dt)
|
|
lam_1s[i-1] = temp_lam_1
|
|
lam_2s[i-1] = temp_lam_2
|
|
|
|
return lam_1s, lam_2s
|
|
|
|
def final_state_func(self):
|
|
"""this func usually need
|
|
"""
|
|
pass
|
|
|
|
def _predict_state_with_oylar(self, x_1, x_2, u, dt):
|
|
"""in this case this function is the same as simulator
|
|
Parameters
|
|
------------
|
|
x_1 : float
|
|
system state
|
|
x_2 : float
|
|
system state
|
|
u : float
|
|
system input
|
|
dt : float in seconds
|
|
sampling time
|
|
Returns
|
|
--------
|
|
next_x_1 : float
|
|
next state, x_1 calculated by using state equation
|
|
next_x_2 : float
|
|
next state, x_2 calculated by using state equation
|
|
"""
|
|
k0 = [0. for _ in range(2)]
|
|
|
|
functions = [self.func_x_1, self.func_x_2]
|
|
|
|
for i, func in enumerate(functions):
|
|
k0[i] = dt * func(x_1, x_2, u)
|
|
|
|
next_x_1 = x_1 + k0[0]
|
|
next_x_2 = x_2 + k0[1]
|
|
|
|
return next_x_1, next_x_2
|
|
|
|
def func_x_1(self, y_1, y_2, u):
|
|
"""calculating \dot{x_1}
|
|
Parameters
|
|
------------
|
|
y_1 : float
|
|
means x_1
|
|
y_2 : float
|
|
means x_2
|
|
u : float
|
|
means system input
|
|
Returns
|
|
---------
|
|
y_dot : float
|
|
means \dot{x_1}
|
|
"""
|
|
y_dot = y_2
|
|
return y_dot
|
|
|
|
def func_x_2(self, y_1, y_2, u):
|
|
"""calculating \dot{x_2}
|
|
Parameters
|
|
------------
|
|
y_1 : float
|
|
means x_1
|
|
y_2 : float
|
|
means x_2
|
|
u : float
|
|
means system input
|
|
Returns
|
|
---------
|
|
y_dot : float
|
|
means \dot{x_2}
|
|
"""
|
|
y_dot = (1. - y_1**2 - y_2**2) * y_2 - y_1 + u
|
|
return y_dot
|
|
|
|
def _adjoint_state_with_oylar(self, x_1, x_2, lam_1, lam_2, u, dt):
|
|
"""
|
|
Parameters
|
|
------------
|
|
x_1 : float
|
|
system state
|
|
x_2 : float
|
|
system state
|
|
lam_1 : float
|
|
adjoint state
|
|
lam_2 : float
|
|
adjoint state
|
|
u : float
|
|
system input
|
|
dt : float in seconds
|
|
sampling time
|
|
Returns
|
|
--------
|
|
pre_lam_1 : float
|
|
pre, 1 step before lam_1 calculated by using adjoint equation
|
|
pre_lam_2 : float
|
|
pre, 1 step before lam_2 calculated by using adjoint equation
|
|
"""
|
|
k0 = [0. for _ in range(2)]
|
|
|
|
functions = [self._func_lam_1, self._func_lam_2]
|
|
|
|
for i, func in enumerate(functions):
|
|
k0[i] = dt * func(x_1, x_2, lam_1, lam_2, u)
|
|
|
|
pre_lam_1 = lam_1 + k0[0]
|
|
pre_lam_2 = lam_2 + k0[1]
|
|
|
|
return pre_lam_1, pre_lam_2
|
|
|
|
def _func_lam_1(self, y_1, y_2, y_3, y_4, u):
|
|
"""calculating -\dot{lam_1}
|
|
Parameters
|
|
------------
|
|
y_1 : float
|
|
means x_1
|
|
y_2 : float
|
|
means x_2
|
|
y_3 : float
|
|
means lam_1
|
|
y_4 : float
|
|
means lam_2
|
|
u : float
|
|
means system input
|
|
Returns
|
|
---------
|
|
y_dot : float
|
|
means -\dot{lam_1}
|
|
"""
|
|
y_dot = y_1 - (2. * y_1 * y_2 + 1.) * y_4
|
|
return y_dot
|
|
|
|
def _func_lam_2(self, y_1, y_2, y_3, y_4, u):
|
|
"""calculating -\dot{lam_2}
|
|
Parameters
|
|
------------
|
|
y_1 : float
|
|
means x_1
|
|
y_2 : float
|
|
means x_2
|
|
y_3 : float
|
|
means lam_1
|
|
y_4 : float
|
|
means lam_2
|
|
u : float
|
|
means system input
|
|
Returns
|
|
---------
|
|
y_dot : float
|
|
means -\dot{lam_2}
|
|
"""
|
|
y_dot = y_2 + y_3 + (-3. * (y_2**2) - y_1**2 + 1. ) * y_4
|
|
return y_dot
|
|
|
|
|
|
def calc_numerical_gradient(forward_prop, grad_f, all_us, shape, input_size):
|
|
"""
|
|
Parameters
|
|
------------
|
|
forward_prop : function
|
|
forward prop
|
|
grad_f : function
|
|
gradient function
|
|
all_us : numpy.ndarray, shape(pred_len, input_size*3)
|
|
all inputs including with dummy input
|
|
shape : tuple
|
|
shape of Jacobian
|
|
input_size : int
|
|
input size of system
|
|
|
|
Returns
|
|
---------
|
|
grad : numpy.ndarray, shape is the same as shape
|
|
results of numercial gradient of the input
|
|
|
|
References
|
|
-----------
|
|
- oreilly japan 0 から作るdeeplearning
|
|
https://github.com/oreilly-japan/deep-learning-from-scratch/blob/master/common/gradient.py
|
|
"""
|
|
h = 1e-3 # 0.01
|
|
grad = np.zeros(shape)
|
|
grad_idx = 0
|
|
|
|
it = np.nditer(all_us, flags=['multi_index'], op_flags=['readwrite'])
|
|
while not it.finished:
|
|
# get index
|
|
idx = it.multi_index
|
|
# save and return
|
|
tmp_val = all_us[idx]
|
|
|
|
# 差分を取る
|
|
# 上側の差分
|
|
all_us[idx] = float(tmp_val) + h
|
|
us = all_us[:, :input_size]
|
|
x_1s, x_2s, lam_1s, lam_2s = forward_prop(us) # forward
|
|
fuh1 = grad_f(x_1s, x_2s, lam_1s, lam_2s, all_us)
|
|
|
|
# 下側の差分
|
|
all_us[idx] = float(tmp_val) - h
|
|
us = all_us[:, :input_size]
|
|
x_1s, x_2s, lam_1s, lam_2s = forward_prop(us) # forward
|
|
fuh2 = grad_f(x_1s, x_2s, lam_1s, lam_2s, all_us)
|
|
|
|
grad[:, grad_idx] = ((fuh1 - fuh2) / (2.*h)).flatten() # to flat でgradに代入できるように
|
|
|
|
all_us[idx] = tmp_val
|
|
it.iternext()
|
|
grad_idx += 1
|
|
|
|
return np.array(grad)
|
|
|
|
class NMPCControllerWithNewton():
|
|
"""
|
|
Attributes
|
|
------------
|
|
N : int
|
|
predicte step, discritize value
|
|
threshold : float
|
|
newton's threshold value
|
|
NUM_INPUT : int
|
|
system input length, this should include dummy u and constraint variables
|
|
MAX_ITERATION : int
|
|
decide by the solved matrix size
|
|
simulator : NMPCSimulatorSystem class
|
|
us : list of float
|
|
estimated optimal system input
|
|
dummy_us : list of float
|
|
estimated dummy input
|
|
raws : list of float
|
|
estimated constraint variable
|
|
history_u : list of float
|
|
time history of actual system input
|
|
history_dummy_u : list of float
|
|
time history of actual dummy u
|
|
history_raw : list of float
|
|
time history of actual raw
|
|
history_f : list of float
|
|
time history of error of optimal
|
|
"""
|
|
def __init__(self):
|
|
"""
|
|
Parameters
|
|
-----------
|
|
None
|
|
"""
|
|
# parameters
|
|
self.N = 10 # time step
|
|
self.threshold = 0.0001 # break
|
|
|
|
self.NUM_ALL_INPUT = 3 # u with dummy, and 制約条件に対するrawにも合わせた入力の数
|
|
self.NUM_INPUT = 1 # u with dummy, and 制約条件に対するrawにも合わせた入力の数
|
|
self.Jacobian_size = self.NUM_ALL_INPUT * self.N
|
|
|
|
# newton parameters
|
|
self.MAX_ITERATION = 100
|
|
|
|
# simulator
|
|
self.simulator = NMPCSimulatorSystem()
|
|
|
|
# initial
|
|
self.us = np.zeros((self.N, self.NUM_INPUT))
|
|
self.dummy_us = np.ones((self.N, self.NUM_INPUT)) * 0.25
|
|
self.raws = np.ones((self.N, self.NUM_INPUT)) * 0.01
|
|
|
|
# for fig
|
|
self.history_u = []
|
|
self.history_dummy_u = []
|
|
self.history_raw = []
|
|
self.history_f = []
|
|
|
|
def calc_input(self, x_1, x_2, time):
|
|
"""
|
|
Parameters
|
|
------------
|
|
x_1 : float
|
|
current state
|
|
x_2 : float
|
|
current state
|
|
time : float in seconds
|
|
now time
|
|
Returns
|
|
--------
|
|
us : list of float
|
|
estimated optimal system input
|
|
"""
|
|
# calculating sampling time
|
|
dt = 0.01
|
|
|
|
# concat all us, shape (pred_len, input_size)
|
|
all_us = np.hstack((self.us, self.dummy_us, self.raws))
|
|
|
|
# Newton method
|
|
for i in range(self.MAX_ITERATION):
|
|
# calc all state
|
|
x_1s, x_2s, lam_1s, lam_2s = self.simulator.calc_predict_and_adjoint_state(x_1, x_2, self.us, self.N, dt)
|
|
|
|
# F
|
|
F_hat = self._calc_f(x_1s, x_2s, lam_1s, lam_2s, all_us, self.N, dt)
|
|
|
|
# judge
|
|
if np.linalg.norm(F_hat) < self.threshold:
|
|
# print("break!!")
|
|
break
|
|
|
|
grad_f = lambda x_1s, x_2s, lam_1s, lam_2s, all_us : self._calc_f(x_1s, x_2s, lam_1s, lam_2s, all_us, self.N, dt)
|
|
forward_prop_f = lambda us : self.simulator.calc_predict_and_adjoint_state(x_1, x_2, us, self.N, dt)
|
|
grads = calc_numerical_gradient(forward_prop_f, grad_f, all_us, (self.Jacobian_size, self.Jacobian_size), self.NUM_INPUT)
|
|
|
|
# make jacobian and calc inverse of it
|
|
# grads += np.eye(self.Jacobian_size) * 1e-8
|
|
try:
|
|
all_us = all_us.reshape(-1, 1) - np.dot(np.linalg.inv(grads), F_hat.reshape(-1, 1))
|
|
except np.linalg.LinAlgError:
|
|
print("Warning : singular matrix!!")
|
|
grads += np.eye(self.Jacobian_size) * 1e-10 # add noise
|
|
all_us = all_us.reshape(-1, 1) - np.dot(np.linalg.inv(grads), F_hat.reshape(-1, 1))
|
|
|
|
all_us = all_us.reshape(self.N, self.NUM_ALL_INPUT)
|
|
|
|
# update
|
|
self.us = all_us[:, :self.NUM_INPUT]
|
|
self.dummy_us = all_us[:, self.NUM_INPUT:2*self.NUM_INPUT]
|
|
self.raws = all_us[:, 2*self.NUM_INPUT:]
|
|
|
|
# final insert
|
|
self.us = all_us[:, :self.NUM_INPUT]
|
|
self.dummy_us = all_us[:, self.NUM_INPUT:2*self.NUM_INPUT]
|
|
self.raws = all_us[:, 2*self.NUM_INPUT:]
|
|
|
|
x_1s, x_2s, lam_1s, lam_2s = self.simulator.calc_predict_and_adjoint_state(x_1, x_2, self.us, self.N, dt)
|
|
|
|
F = self._calc_f(x_1s, x_2s, lam_1s, lam_2s, all_us, self.N, dt)
|
|
|
|
# for save
|
|
self.history_f.append(np.linalg.norm(F))
|
|
self.history_u.append(self.us[0])
|
|
self.history_dummy_u.append(self.dummy_us[0])
|
|
self.history_raw.append(self.raws[0])
|
|
|
|
return self.us
|
|
|
|
def _calc_f(self, x_1s, x_2s, lam_1s, lam_2s, all_us, N, dt):
|
|
"""
|
|
Parameters
|
|
------------
|
|
x_1s : list of float
|
|
predicted x_1s for N steps
|
|
x_2s : list of float
|
|
predicted x_2s for N steps
|
|
lam_1s : list of float
|
|
adjoint state of x_1s, lam_1s for N steps
|
|
lam_2s : list of float
|
|
adjoint state of x_2s, lam_2s for N steps
|
|
us : list of float
|
|
estimated optimal system input
|
|
dummy_us : list of float
|
|
estimated dummy input
|
|
raws : list of float
|
|
estimated constraint variable
|
|
N : int
|
|
predict time step
|
|
dt : float
|
|
sampling time of system
|
|
"""
|
|
F = np.zeros((N, self.NUM_INPUT*3))
|
|
|
|
us = all_us[:, :self.NUM_INPUT].flatten()
|
|
dummy_us = all_us[:, self.NUM_INPUT:2*self.NUM_INPUT].flatten()
|
|
raws = all_us[:, 2*self.NUM_INPUT:].flatten()
|
|
|
|
for i in range(N):
|
|
F_u = 0.5 * us[i] + lam_2s[i] + 2. * raws[i] * us[i]
|
|
F_dummy = -0.01 + 2. * raws[i] * dummy_us[i]
|
|
F_raw = us[i]**2 + dummy_us[i]**2 - 0.5**2
|
|
|
|
F[i] = np.array([F_u, F_dummy, F_raw])
|
|
|
|
return np.array(F)
|
|
|
|
def main():
|
|
# simulation time
|
|
dt = 0.01
|
|
iteration_time = 20.
|
|
iteration_num = int(iteration_time/dt)
|
|
|
|
# plant
|
|
plant_system = SampleSystem(init_x_1=2., init_x_2=0.)
|
|
|
|
# controller
|
|
controller = NMPCControllerWithNewton()
|
|
|
|
# for i in range(iteration_num)
|
|
for i in range(1, iteration_num):
|
|
print("iteration = {}".format(i))
|
|
time = float(i) * dt
|
|
x_1 = plant_system.x_1
|
|
x_2 = plant_system.x_2
|
|
# make input
|
|
us = controller.calc_input(x_1, x_2, time)
|
|
# update state
|
|
plant_system.update_state(us[0])
|
|
|
|
# figure
|
|
fig = plt.figure()
|
|
|
|
x_1_fig = fig.add_subplot(321)
|
|
x_2_fig = fig.add_subplot(322)
|
|
u_fig = fig.add_subplot(323)
|
|
dummy_fig = fig.add_subplot(324)
|
|
raw_fig = fig.add_subplot(325)
|
|
f_fig = fig.add_subplot(326)
|
|
|
|
x_1_fig.plot(np.arange(iteration_num)*dt, plant_system.history_x_1)
|
|
x_1_fig.set_xlabel("time [s]")
|
|
x_1_fig.set_ylabel("x_1")
|
|
|
|
x_2_fig.plot(np.arange(iteration_num)*dt, plant_system.history_x_2)
|
|
x_2_fig.set_xlabel("time [s]")
|
|
x_2_fig.set_ylabel("x_2")
|
|
|
|
u_fig.plot(np.arange(iteration_num - 1)*dt, controller.history_u)
|
|
u_fig.set_xlabel("time [s]")
|
|
u_fig.set_ylabel("u")
|
|
|
|
dummy_fig.plot(np.arange(iteration_num - 1)*dt, controller.history_dummy_u)
|
|
dummy_fig.set_xlabel("time [s]")
|
|
dummy_fig.set_ylabel("dummy u")
|
|
|
|
raw_fig.plot(np.arange(iteration_num - 1)*dt, controller.history_raw)
|
|
raw_fig.set_xlabel("time [s]")
|
|
raw_fig.set_ylabel("raw")
|
|
|
|
f_fig.plot(np.arange(iteration_num - 1)*dt, controller.history_f)
|
|
f_fig.set_xlabel("time [s]")
|
|
f_fig.set_ylabel("optimal error")
|
|
|
|
fig.tight_layout()
|
|
|
|
plt.show()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |