Add runge kutta
This commit is contained in:
parent
969fee7e73
commit
f49ed382a4
|
@ -41,4 +41,60 @@ def fit_angle_in_range(angles, min_angle=-np.pi, max_angle=np.pi):
|
||||||
output += min_angle
|
output += min_angle
|
||||||
|
|
||||||
output = np.minimum(max_angle, np.maximum(min_angle, output))
|
output = np.minimum(max_angle, np.maximum(min_angle, output))
|
||||||
return output.reshape(output_shape)
|
return output.reshape(output_shape)
|
||||||
|
|
||||||
|
def update_state_with_Runge_Kutta(state, u, functions, dt=0.01):
|
||||||
|
""" update state in Runge Kutta methods
|
||||||
|
Args:
|
||||||
|
state (array-like): state of system
|
||||||
|
u (array-like): input of system
|
||||||
|
functions (list): update function of each state,
|
||||||
|
each function will be called like func(*state, *u)
|
||||||
|
We expect that this function returns differential of each state
|
||||||
|
dt (float): float in seconds
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
next_state (np.array): next state of system
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
sample of function is as follows:
|
||||||
|
|
||||||
|
def func_x(self, x_1, x_2, u):
|
||||||
|
x_dot = (1. - x_1**2 - x_2**2) * x_2 - x_1 + u
|
||||||
|
return x_dot
|
||||||
|
|
||||||
|
Note that the function return x_dot.
|
||||||
|
"""
|
||||||
|
state_size = len(state)
|
||||||
|
assert state_size == len(functions), \
|
||||||
|
"Invalid functions length, You need to give the state size functions"
|
||||||
|
|
||||||
|
k0 = np.zeros(state_size)
|
||||||
|
k1 = np.zeros(state_size)
|
||||||
|
k2 = np.zeros(state_size)
|
||||||
|
k3 = np.zeros(state_size)
|
||||||
|
|
||||||
|
inputs = np.concatenate([state, u])
|
||||||
|
|
||||||
|
for i, func in enumerate(functions):
|
||||||
|
k0[i] = dt * func(*inputs)
|
||||||
|
|
||||||
|
add_state = state + k0 / 2.
|
||||||
|
inputs = np.concatenate([add_state, u])
|
||||||
|
|
||||||
|
for i, func in enumerate(functions):
|
||||||
|
k1[i] = dt * func(*inputs)
|
||||||
|
|
||||||
|
add_state = state + k1 / 2.
|
||||||
|
inputs = np.concatenate([add_state, u])
|
||||||
|
|
||||||
|
for i, func in enumerate(functions):
|
||||||
|
k2[i] = dt * func(*inputs)
|
||||||
|
|
||||||
|
add_state = state + k2
|
||||||
|
inputs = np.concatenate([add_state, u])
|
||||||
|
|
||||||
|
for i, func in enumerate(functions):
|
||||||
|
k3[i] = dt * func(*inputs)
|
||||||
|
|
||||||
|
return (k0 + 2. * k1 + 2. * k2 + k3) / 6.
|
|
@ -0,0 +1,98 @@
|
||||||
|
import numpy as np
|
||||||
|
import scipy
|
||||||
|
from scipy import integrate
|
||||||
|
from .env import Env
|
||||||
|
from ..common.utils import update_state_with_Runge_Kutta
|
||||||
|
|
||||||
|
class NonlinearSampleEnv(Env):
|
||||||
|
""" Nonlinear Sample Env
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
self.config = {"state_size" : 2,\
|
||||||
|
"input_size" : 1,\
|
||||||
|
"dt" : 0.01,\
|
||||||
|
"max_step" : 250,\
|
||||||
|
"input_lower_bound": [-0.5],\
|
||||||
|
"input_upper_bound": [0.5],
|
||||||
|
}
|
||||||
|
|
||||||
|
super(NonlinearSampleEnv, self).__init__(self.config)
|
||||||
|
|
||||||
|
def reset(self, init_x=np.array([2., 0.])):
|
||||||
|
""" reset state
|
||||||
|
Returns:
|
||||||
|
init_x (numpy.ndarray): initial state, shape(state_size, )
|
||||||
|
info (dict): information
|
||||||
|
"""
|
||||||
|
self.step_count = 0
|
||||||
|
|
||||||
|
self.curr_x = np.zeros(self.config["state_size"])
|
||||||
|
|
||||||
|
if init_x is not None:
|
||||||
|
self.curr_x = init_x
|
||||||
|
|
||||||
|
# goal
|
||||||
|
self.g_x = np.array([0., 0.])
|
||||||
|
|
||||||
|
# clear memory
|
||||||
|
self.history_x = []
|
||||||
|
self.history_g_x = []
|
||||||
|
|
||||||
|
return self.curr_x, {"goal_state": self.g_x}
|
||||||
|
|
||||||
|
def step(self, u):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
u (numpy.ndarray) : input, shape(input_size, )
|
||||||
|
Returns:
|
||||||
|
next_x (numpy.ndarray): next state, shape(state_size, )
|
||||||
|
cost (float): costs
|
||||||
|
done (bool): end the simulation or not
|
||||||
|
info (dict): information
|
||||||
|
"""
|
||||||
|
# clip action
|
||||||
|
u = np.clip(u,
|
||||||
|
self.config["input_lower_bound"],
|
||||||
|
self.config["input_upper_bound"])
|
||||||
|
|
||||||
|
funtions = [self._func_x_1, self._func_x_2]
|
||||||
|
|
||||||
|
next_x = update_state_with_Runge_Kutta(self._curr_x, u,
|
||||||
|
functions, self.config["dt"])
|
||||||
|
|
||||||
|
# cost
|
||||||
|
cost = 0
|
||||||
|
cost = np.sum(u**2)
|
||||||
|
cost += np.sum((self.curr_x - self.g_x)**2)
|
||||||
|
|
||||||
|
# save history
|
||||||
|
self.history_x.append(next_x.flatten())
|
||||||
|
self.history_g_x.append(self.g_x.flatten())
|
||||||
|
|
||||||
|
# update
|
||||||
|
self.curr_x = next_x.flatten()
|
||||||
|
# update costs
|
||||||
|
self.step_count += 1
|
||||||
|
|
||||||
|
return next_x.flatten(), cost, \
|
||||||
|
self.step_count > self.config["max_step"], \
|
||||||
|
{"goal_state" : self.g_x}
|
||||||
|
|
||||||
|
def _func_x_1(self, x_1, x_2, u):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
x_dot = x_2
|
||||||
|
return x_dot
|
||||||
|
|
||||||
|
def _func_x_2(self, x_1, x_2, u):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
x_dot = (1. - x_1**2 - x_2**2) * x_2 - x_1 + u
|
||||||
|
return x_dot
|
||||||
|
|
||||||
|
def plot_func(self, to_plot, i=None, history_x=None, history_g_x=None):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
raise ValueError("NonlinearSampleEnv does not have animation")
|
Loading…
Reference in New Issue