Download the files for this lab from the following links:
After downloading the lab, uncompress it and move it to the ese2000-dynamical-systems folder.
Lab 8B: Closed Loop Control¶
1 Model Predictive Control (MPC)¶
In Lab 8A we tried to control a dynamical system with model based optimization. This strategy failed because of the mismatch between the model and the true system.
In model based optimization we find control actions $\hat{u}(t)$ that are optimal for the model of the true dynamical system. This is represented by the bottom part of the diagram where we show that the controller is optimizing with respect to model trajectories $\hat{x}(t)$. Once we find actions that are optimal for the model, we turn around and implement them in the true system. This is shown in the top part of the diagram where the control input of the true dynamical system is $u(t) = \hat{u}^*(t)$.
If we initialize the model and the true system with the same initial conditions, the trajectory $\hat{x}(t)$ of the model and the trajectory $x(t)$ of the true system are initially close. The first few control actions $\hat{u}^*(t)$ are therefore good control actions for the true system. Eventually, however, the trajectory $\hat{x}(t)$ of the model and the trajectory $ x(t)$ of the true system deviate. At this point actions $\hat{u}^*(t)$ that are chosen because they are optimal for the model are far from good for the true system.
This explanation suggests a solution. What we need to do is reinitialize the model every so often. This is called model predictive control (MPC).
In MPC we define a window of length $W$ and solve the optimization problem,
\begin{align} \hat{u}^*(t:t+W) ~=~ & \text{argmin}_{\hat{u}(s)} ~ \frac{1}{W} \sum_{~s=t+1~}^{~t+W~} \ell\, \Big( \, x_{R}(s) , \, \hat{x}(s) \, \Big) , \nonumber \\ & \text{ with } \quad ~ \hat{x}(s+1) = A \hat{x}(s) + B \hat{u}(s), \phantom{\Big(}\nonumber \\ & \phantom{\text{with}} ~ \hat{x}(t)= x(t) .\phantom{\Big(} \end{align}This is the same model based optimization problem we encountered in Lab 8A, except for two modifications: (i) The initial condition is $\hat{x}(t)= x(t)$. (ii) The merit of the trajectory is evaluated between times $s=t+1$ and $s=t+W$. These are not significant differences. Modification (ii) is just a practical consideration to reduce the complexity of the optimization problem. This is achieved by reducing the time horizon from $T$ to $W$. Modification (i) is just a shift in time: we initialize the model with the state at time $t$ instead of initializing the model with the state at time $0$.
The significant difference is that we now extract the control action $\hat{u}^*(t)$ and execute this action in the true dynamical system. The remaining actions $\hat{u}^*(t+1:t+W)$ that are also part of the solution are discarded. We then proceed to advance time to $t+1$ and solve the model based optimization with initial condition $\hat{x}(t+1)= x(t+1)$.
The MPC controller is what we call a closed loop control system. We observe the state $ x(t)$ at time $t$ and use this information to select the control action $ u(t)=\hat{u}^*(t)$. The effect of this control action is to move the true system into state $ x(t+1)$. The terminology of closing the loop refers to the fact that we now observe the state $ x(t+1)$ and use this observation to select the control action $ u(t+1)=\hat{u}^*(t+1)$. This is different from the open loop control strategy in which we observe the initial condition $ x(0)= x_0$ but never again observe the state of the true system.
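To make the receding-horizon idea concrete, here is a minimal sketch of the MPC loop. The functions solve_model_based_problem and true_system_step are hypothetical placeholders (implemented as dummies so the sketch runs); they stand in for the Lab 8A optimizer and the car simulator used below.
# Minimal sketch of the receding-horizon (MPC) loop; the two helper functions are
# hypothetical placeholders, not the lab's actual optimizer or simulator.
import numpy as np

def solve_model_based_problem(x, W):
    # Placeholder: would solve the W-step model based optimization initialized at x
    return np.zeros((W, 2))

def true_system_step(x, u):
    # Placeholder: would apply action u to the true system and return the next state
    return x

x, W, T = np.zeros(4), 10, 100
for t in range(T):
    u_star = solve_model_based_problem(x, W)  # reinitialize the model at x(t) and optimize
    x = true_system_step(x, u_star[0])        # execute only the first action; discard the rest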
# This is the same as in Lab 8A
!git clone https://github.com/Damowerko/ese2000-dynamical-systems.git
import sys
sys.path.append('./ese2000-dynamical-systems/')
from pathlib import Path
import matplotlib.style
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import torch
from torch.utils.data import TensorDataset, DataLoader
torch.set_float32_matmul_precision("medium")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from ese2000_dynamical.config import Config
from ese2000_dynamical.track import load_track, Track
from ese2000_dynamical.simulator import Simulator
# matplotlib settings
matplotlib.style.use("seaborn-v0_8-colorblind")
plt.rcParams["figure.dpi"] = 150
data_path = Path("./ese2000-dynamical-systems/data")
figure_path = Path("figures")
Load data and functions from Lab 8A¶
# Select an example expert trajectory for the sake of plotting
expert_idx = 5
# Load the dataset and the track
t = np.arange(0, Config.duration, Config.time_step)
x_experts = np.load(data_path / "states.npy")
u_experts = np.load(data_path / "inputs.npy")
track = load_track(data_path / "track.npz")
# Load the pre-trained model from Lab 8A
# We've given you a parameterization but you can also save the model from your previous lab and load it here.
A_learnt = np.load("A.npy")
B_learnt = np.load("B.npy")
def plot_vs_expert(x, x_label: str, x_expert, track: Track):
"""
Plot a given trajectory and expert trajectory on the same plot.
Args:
x: The trajectory to plot.
x_label: The label for the trajectory.
x_expert: The expert trajectory.
track: The track to plot.
"""
plt.figure()
track.plot()
plt.grid(True)
plt.xlabel("x (m)")
plt.ylabel("y (m)")
plt.plot(x_expert[:, 0], x_expert[:, 1], "--", label="Expert")
plt.plot(x[:, 0], x[:, 1], "-", label=x_label)
plt.legend(loc="upper right", framealpha=1.0)
def rollout(A, B, u, x0):
"""
Integrate the system dynamics to predict the trajectory.
Args:
A: (4, 4) The state transition matrix.
B: (4, 2) The input matrix.
u: (T, 2) The control inputs.
x0: (4,) The initial state.
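    Returns:
        (T, 4) tensor with the predicted states for steps 1 through T.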
"""
x = torch.zeros((len(u) + 1, A.shape[0]), device=A.device)
x[0] = x0
for i in range(len(u)):
x[i + 1] = x[i] @ A.T + u[i] @ B.T
return x[1:]
Task 1¶
Leverage the model based optimization code of Lab 8A to run MPC control on the car simulator. Implementing MPC requires that you implement the following steps at all times $t$,
(S1) Observe the current state $ x(t)$ of the true system.
(S2) Initialize the model system with $\hat{x}(t)= x(t)$.
(S3) Solve the MPC problem. The outcome is a sequence $\hat{u}^*(t:t+W)$.
(S4) Implement control action $ u(t)=\hat{u}^*(t)$ in the true system.
Step (S3) is where we compute the control action $ u(t)=\hat{u}^*(t)$. We actually compute a sequence of control actions $\hat{u}^*(t:t+W)$ but we discard all entries of the sequence except the first. Step (S4) implements the control action in the true system. Steps (S1) and (S2) close the control loop by observing the current state of the true system and feeding it as an input to the model system.
Plot the reference trajectory $ x_{R}(t)$ and the trajectory $ x(t)$ generated by implementation of the MPC controller. Slow down the plot to create a movie of the controlled car following the reference trajectory.
Evaluate and plot the loss between the generated trajectory and the reference trajectory.
def calculate_reference(
state: npt.NDArray[np.floating],
x_expert: npt.NDArray[np.floating],
horizon: int,
):
"""
Calculate the reference trajectory based on the expert trajectory and time horizon.
Args:
state: (4,) array with current state (x, y, vx, vy)
x_expert: (T, 4) array with expert trajectory
horizon: int with the time horizon
Returns:
x_ref: (horizon, 4) array with reference trajectory
"""
# Find the nearest point in the expert trajectory based on current position
nearest_idx = np.argmin(np.linalg.norm(x_expert[:, :2] - state[:2], axis=1))
# Get the windowed reference trajectory based on the nearest point
    # Wrap the indices around the lap so that multi-lap rollouts stay within the expert trajectory
    ref_idx = np.arange(nearest_idx + 1, nearest_idx + horizon + 1) % len(x_expert)
    return x_expert[ref_idx]
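# Illustrative sanity check (assumes x_experts and expert_idx defined above): the
# reference window has shape (horizon, 4) and starts at the expert point just ahead
# of the queried state.
_x_ref_check = calculate_reference(x_experts[expert_idx][0], x_experts[expert_idx], horizon=10)
assert _x_ref_check.shape == (10, 4)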
def mpc_controller(
state: npt.NDArray[np.floating],
state_matrix: npt.NDArray[np.floating],
input_matrix: npt.NDArray[np.floating],
x_ref: npt.NDArray[np.floating],
):
"""MPC controller.
Args:
        state: (4,) array with current state
        state_matrix: (4, 4) array with state matrix A
        input_matrix: (4, 2) array with input matrix B
        x_ref: (horizon, 4) array with reference trajectory
    Returns:
        u: (horizon, 2) array with the optimized control inputs over the window
        loss: average loss over the optimization iterations
"""
# We learnt the A and B matrices in Lab 8A
# Now we keep them fixed and learn actions, u
    A = torch.from_numpy(state_matrix).float().to(device)
    B = torch.from_numpy(input_matrix).float().to(device)
    x_ref_tensor = torch.tensor(x_ref, device=device).float()
    # Set the model's initial state to the observed state of the true system
    x0 = torch.tensor(state, device=device).float()
# Initialize u to zero
# Notice that u is going to be learned
# u is also sized to match the input matrix and the horizon length
u = (
torch.zeros(len(x_ref), input_matrix.shape[1], device=device)
.float()
.requires_grad_(True)
)
# Use Adam optimizer to learn u
optimizer = torch.optim.Adam([u], lr=1.0)
loss_mpc = 0
for _ in range(50):
optimizer.zero_grad()
# Rollout the system (predict the trajectory)
x = rollout(A, B, u, x0)
# Quadratic loss
loss = (x - x_ref_tensor).pow(2).mean()
# Backpropagation
loss.backward()
# Loss for logging
loss_mpc += loss.item()
# Update u
optimizer.step()
# Return the control input and the loss
# Notice that we detach u from the computational graph
return u.detach().cpu().numpy(), loss_mpc/50
# Simulate the trajectory
simulator = Simulator()
x_expert = x_experts[expert_idx]
# Define parameters and initialize tensors
n_laps = 3
t = np.arange(0, Config.duration * n_laps, Config.time_step)
x_mpc = np.zeros((len(t), A_learnt.shape[0]))
# use the same starting point as the expert
x_mpc[0] = x_expert[0]
loss_evolution = []
for i in range(len(t) - 1):
# Compute reference
x_ref = calculate_reference(x_mpc[i], x_expert, horizon=10)
# Find best action u
u_mpc, loss = mpc_controller(x_mpc[i], A_learnt, B_learnt, x_ref)
loss_evolution.append(loss)
# Simulate next state
x_mpc[i + 1] = simulator.step(x_mpc[i], u_mpc[0])
plot_vs_expert(x_mpc, "MPC", x_expert, track)
# Plot the loss evolution
plt.figure()
plt.plot(loss_evolution)
plt.xlabel("Step")
plt.ylabel("MSE")
plt.title(f"MPC Loss Along the Trajectory (Laps = {n_laps})")
plt.grid(True)
plt.show()
from matplotlib.animation import FuncAnimation
from IPython.core.display import HTML
# Get the color palette
prop_cycle = plt.rcParams["axes.prop_cycle"]
colors = prop_cycle.by_key()["color"]
# For visualization purposes, we will plot the last 10 steps
trail_length = 10
# Plot the track and the expert trajectory
plt.figure()
track.plot(plt.gca())
plt.xlabel("x (m)")
plt.ylabel("y (m)")
plt.plot(x_expert[:, 0], x_expert[:, 1], "--", label="Expert")
trail = plt.plot([], [], "-", c=colors[1], label="MPC")[0]
current_position = plt.plot([], [], "o", c=colors[1])[0]
plt.legend(loc="upper right", framealpha=1.0)
# Animate our model's trajectory
def animate(i: int):
trail_idx = slice(max(i - trail_length, 0), i + 1)
trail.set_data(x_mpc[trail_idx, 0], x_mpc[trail_idx, 1])
current_position.set_data([x_mpc[i, 0]], [x_mpc[i, 1]])
return trail, current_position
# HTML5 video
ani = FuncAnimation(
plt.gcf(), animate, frames=len(t), interval=int(1000 * Config.time_step), blit=True
)
display(HTML(ani.to_html5_video()))
plt.close()
The trajectory generated by the MPC controller follows the reference trajectory with small loss. This is because closing the loop precludes the accumulation of errors.
It is interesting to think of the model part of the controller as a simulation of the true dynamical system. We use this simulation to evaluate the merit of different control actions as a way of deciding which control action to execute. This is a very human way of thinking. We use models to simulate the effects of different courses of action. We then execute the action that our simulations deem best. Because simulations and reality are in the end different, we observe the outcome of our actions and repeat the simulation to select a new course of action.
2 Imitation Learning¶
The MPC controller has a high computational cost because it requires solving an optimization problem at every time step. This is a challenge because the time between steps can be short. In the car driving scenario we are considering, the sampling time is $T_s = 50\,\text{ms}$.
We can avoid this computation cost using learning. One approach is to consider the MPC solutions and train a learning parametrization that maps the initialization $\hat{x}(t)= x(t)$ into the control action $ u(t)=\hat{u}^*(t)$. We do that by introducing a learning parametrization $\pi( x(t);\theta)$ and defining the optimization problem,
\begin{align} \theta^* ~=~ & \text{argmin}_{\theta} ~ \frac{1}{T} \sum_{t=1}^{T} \ell\, \Big( \, \hat{u}^*(t),\, \pi\big( x(t);\theta\big) \, \Big) . \end{align}In this problem $ x(t)$ is the initialization of the MPC controller and $\hat{u}^*(t)$ is the optimal control action produced as the corresponding solution of the optimization problem.
We say that this is an imitation problem because we are training a policy $\pi( x(t);\theta)$ that imitates the actions of the MPC controller. The advantage of training this imitation policy is that we can train it offline, as we always do. At execution time we just evaluate the policy $\pi( x(t);\theta^*)$ and execute the corresponding control action.
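To see how cheap execution becomes, note that with the linear parameterization used later in this lab the action is a single matrix-vector product. The values below are illustrative placeholders, not learned quantities.
# Illustrative cost comparison: a linear imitation policy is one matrix-vector product
# per step, versus an iterative optimization for MPC. Theta_example is a random placeholder.
Theta_example = np.random.randn(2, 4)          # maps a 4-dimensional state error to a 2-dimensional action
x_now, x_ref_now = np.random.randn(4), np.random.randn(4)
u_now = Theta_example @ (x_now - x_ref_now)    # policy evaluation: no optimization at execution time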
A second possibility is to consider the observed states $ x_q(t)$ of the $Q$ expert trajectories and solve
\begin{align} \theta^* ~=~ & \text{argmin}_{\theta} ~ \frac{1}{Q} \sum_{q=1}^{Q} ~ \frac{1}{T} \sum_{t=1}^{T} \ell\, \Big( \, \hat{u}_q^*(t),\, \pi\big( x_q(t);\theta\big) \, \Big), \end{align}in which $\hat{u}_q^*(t)$ is the solution of the MPC control optimization problem when the initial condition is $\hat{x}(t)= x_q(t)$.
A third possibility is to dither the expert trajectories. That is, for each state $ x_q(t)$ we generate $D$ states $\tilde{x}_{qd}(t)$ by adding a random perturbation $ w_{qd}(t)$,
\begin{align} \tilde{x}_{qd}(t) = x_q(t) + w_{qd}(t). \end{align}We then reformulate by considering all trajectories, all points in time, and all dithered states. This leads to the optimization problem,
\begin{align} \theta^* ~=~ & \text{argmin}_{\theta} ~ \frac{1}{Q} \sum_{q=1}^{Q} ~ \frac{1}{T} \sum_{t=1}^{T} ~ \frac{1}{D} \sum_{d=1}^{D} \ell\, \Big( \, \hat{u}_q^*(t),\, \pi\big(\tilde{x}_{qd}(t); \theta\big) \, \Big), \end{align}in which $\hat{u}_q^*(t)$ is reinterpreted as the solution of the MPC control optimization problem when the initial condition is $ x(t)=\tilde{x}_{qd}(t)$. Adding dithering to the training set improves the robustness of the learned controller.
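As a concrete illustration of the dithering step, the snippet below generates $D$ perturbed copies of an expert trajectory. The trajectory is a zero placeholder and $D=3$ is arbitrary; the noise standard deviation of 0.3 (variance 0.09) matches the value suggested in Task 2.
# Illustrative dithering: D perturbed copies of a (T, 4) expert trajectory.
D = 3
rng_example = np.random.default_rng(0)
x_q_example = np.zeros((200, 4))  # placeholder expert trajectory
x_dithered = x_q_example[None] + rng_example.normal(0.0, 0.3, size=(D, *x_q_example.shape))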
The three optimization problems have the same objective but take averages over different datasets. Thus, their relative merits depend on which set of input states is more representative of the states that are to be expected at execution time. The possible mismatch between the states that we choose for optimizing $\theta$ and the states that are actually seen at execution time is a significant challenge when learning to imitate control policies of dynamical systems.
Task 2¶
Learn a policy that imitates the MPC controller. You can choose any parametrization you like, but to keep matters simple we recommend that you use a linear parameterization,
\begin{align} \pi\big( x(t);\theta\big) = \Theta ( x(t)- x_{R}(t)). \end{align}You can also use any loss you want. If you use a linear parameterization, a quadratic loss is a good idea. Train the imitation policy. Consider $D=1$ dithered state per expert state $ x_q(t)$ and use white Gaussian noise $ w_{qd}(t)$ with mean $\mathbb{E}[ w_{qd}(t)]=0$ and variance $\mathbb{E}[ w^2_{qd}(t)]=0.09$.
Evaluate the control policy $\pi( x(t);\theta^*)$ on the car simulator. Plot the reference trajectory $ x_R(t)$ and the trajectory $ x(t)$ generated by the imitation policy. Evaluate and plot the loss between the generated trajectory and the reference trajectory.
# First, we need to generate a dataset for training a new model that imitates the MPC controller we just implemented.
# Remember, we're doing this because the MPC controller is computationally expensive and we want to learn a faster model.
rng = np.random.default_rng()
std_noise = 0.3
# Generate the normal noise with mean 0 and standard deviation 0.3
means = np.zeros_like(x_expert)
noise = rng.normal(means, scale=std_noise)
# Generate perturbed initial states (Dithering)
x0_imitation = x_expert + noise
# Initialize reference and action tensors
x_ref_imitation = np.zeros_like(x_expert)
u_imitation = np.zeros((len(x_expert), B_learnt.shape[1]))
for i in range(len(x0_imitation)):
# Compute reference
x_ref = calculate_reference(x0_imitation[i], x_expert, horizon=10)
# Compute MPC action
u_mpc, _ = mpc_controller(x0_imitation[i], A_learnt, B_learnt, x_ref)
# Save imitation learning point
x_ref_imitation[i] = x_ref[0]
u_imitation[i] = u_mpc[0]
# PyTorch tensors for training
error_train = (
torch.from_numpy(x0_imitation - x_ref_imitation).to(device).float().reshape(-1, 4)
)
u_train = torch.from_numpy(u_imitation).to(device).float().reshape(-1, 2)
imitation_dataset = TensorDataset(error_train, u_train)
# Let's train a model to imitate the MPC controller. We will use a simple linear model for this task.
# The inputs are the error between the current state and the reference state.
# The outputs are the control inputs to our car, u.
# Training parameters
lr = 0.1
n_epochs = 100
batch_size = 32
# Initialize theta (our parameterization)
# Notice, we are going to learn theta
theta_tensor = torch.randn(2, 4, device=device, requires_grad=True)
optimizer = torch.optim.Adam([theta_tensor], lr=lr)
# Create the dataloader
imitation_loader = DataLoader(imitation_dataset, batch_size=batch_size, shuffle=True)
epoch_loss = np.zeros(n_epochs)
print_every = 10
# Full passes over the dataset
for e in range(n_epochs):
for error, u in imitation_loader:
optimizer.zero_grad()
# Predict action u
u_hat = error @ theta_tensor.T
# Evaluate quadratic loss
loss = (u - u_hat).pow(2).mean()
# Compute gradients
loss.backward()
# Update parameters
optimizer.step()
epoch_loss[e] += loss.item()
# Print progress
    if e % print_every == 0:
        print(f" Epoch: {e} \tLoss: {epoch_loss[e] / len(imitation_loader):.2f}")
epoch_loss /= len(imitation_loader)
Epoch: 0 	Loss: 139.66
Epoch: 10 	Loss: 53.04
Epoch: 20 	Loss: 32.82
Epoch: 30 	Loss: 25.95
Epoch: 40 	Loss: 25.56
Epoch: 50 	Loss: 24.72
Epoch: 60 	Loss: 24.12
Epoch: 70 	Loss: 27.60
Epoch: 80 	Loss: 23.97
Epoch: 90 	Loss: 26.14
plt.plot(epoch_loss)
plt.xlabel("Epoch")
plt.ylabel("MSE")
plt.title("Imitation Learning Loss Evolution")
plt.grid()
# Simulate the trajectory
n_laps = 1
simulator = Simulator()
t = np.arange(0, Config.duration * n_laps, Config.time_step)
x_imitation = np.zeros((len(t), A_learnt.shape[0]))
# Use the same starting point as the expert
x_imitation[0] = x_expert[0]
# Detaching our model from the computational graph
theta = theta_tensor.detach().cpu().numpy()
error = np.zeros(len(t) - 1)
with torch.no_grad():
for i in range(len(t) - 1):
x_ref = calculate_reference(x_imitation[i], x_expert, horizon=1)[0]
diff = x_imitation[i] - x_ref
error[i] = (diff[:2]**2).mean() + (diff[2:]**2).mean()
u_imitation = (diff) @ theta.T
x_imitation[i + 1] = simulator.step(x_imitation[i], u_imitation)
plot_vs_expert(x_imitation, "Imitation", x_expert, track)
# Plot the error evolution
plt.figure()
plt.plot(error)
plt.xlabel("Step")
plt.ylabel("MSE")
plt.title("Error Along the Trajectory")
plt.grid(True)
plt.show()
3 Model Predictive Control Learning¶
A second alternative for learning a control policy is to incorporate the parameterization directly into the MPC formulation of Section 1. To do that, we consider a fixed parameter $\theta$ and the policy $\pi(\mathbf{x};\theta)$ that this parameter generates. If we execute this policy on the system’s model we incur the cost
\begin{align} \ell_{\text{MPC}}\big(\mathbf{x}(t); \theta\big) = & \frac{1}{W} \sum_{s=t+1}^{t+W} \ell\, \big( \, \mathbf{x}_{R}(s) , \, \hat{\mathbf{x}}(s) \, \big) , \\ & \text{with} ~ \hat{\mathbf{x}}(s+1) = \mathbf{A} \hat{\mathbf{x}}(s) + \mathbf{B} \mathbf{u}(s), \\ & \phantom{\text{with}} ~ \hat{\mathbf{x}}(t) = \mathbf{x}(t) , \\ & \phantom{\text{with}} ~ \mathbf{u}(s) = \pi\big(\hat{\mathbf{x}}(s);\theta\big). \end{align}As has been true throughout this chapter, this is not the true cost incurred by the policy $\pi(\mathbf{x};\theta)$ in the actual system. To evaluate that cost we would need to propagate actions through the system function $f(x(t),u(t))$, which is impossible because we do not know this function. The first merit of this loss function is that it can be evaluated. Its second merit is that if model and reality are not that different, a policy that works well in the model works well in the true system.
The loss function defined above is different from loss functions that we have encountered before because it evaluates the merit of the policy $\pi(\mathbf{x};\theta)$ over a trajectory of $W$ time units. A strange loss function it may be, but a loss function nonetheless. We can then use it to propose the learning problem,
\begin{align} \theta^* ~=~ \arg\min_\theta \frac{1}{T} \sum_{t=1}^T ~ \ell_{\text{MPC}}\big(\mathbf{x}(t); \theta\big). \end{align}As in Section 2, the states over which we average do not have to be the states $\mathbf{x}(t)$ visited by the system. We can train by minimizing the MPC loss over any set of states. In particular, if we use expert trajectories to define the optimal parameter $\theta^*$ then we have,
\begin{align} \theta^* ~=~ & \arg\min_{\theta} ~ \frac{1}{Q} \sum_{q=1}^{Q} ~ \frac{1}{T} \sum_{t=1}^{T} \ell_{\text{MPC}}\big(\mathbf{x}_q(t); \theta\big). \end{align}Observe that this formulation is akin to our imitation formulation based on expert trajectories, except that instead of searching for a parameter $\theta$ that imitates the MPC policy $\hat{\mathbf{u}}_q^*(t)$, we search for a parameter $\theta$ that minimizes the MPC cost.
As the reader may be expecting, the use of dithered trajectories is also justified here,
\begin{align} \theta^* ~=~ & \arg\min_{\theta} ~ \frac{1}{Q} \sum_{q=1}^{Q} ~ \frac{1}{T} \sum_{t=1}^{T} ~ \frac{1}{D} \sum_{d=1}^{D} \ell_{\text{MPC}}\big(\tilde{\mathbf{x}}_{qd}(t); \theta\big). \end{align}This formulation is akin to the dithered formulation of Section 2, but we seek to minimize the MPC cost instead of imitating the MPC policy $\hat{\mathbf{u}}_q^*(t)$. As in imitation learning, adding dithering to the training set improves the robustness of the learned controller.
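What makes these problems trainable is that the MPC loss is differentiable with respect to $\theta$: the model rollout is a composition of matrix multiplications, so gradients flow from the loss through the predicted trajectory into the policy parameters. Below is a minimal sketch with random placeholder matrices (not the learned model); the MPCLoss function implemented in Task 3 applies the same idea batched over a dataset of dithered states.
# Minimal sketch: backpropagating an MPC-style loss through a W-step model rollout into theta.
# A_demo, B_demo, x_ref_demo and the initial state are random placeholders.
W_demo = 10
theta_demo = torch.randn(2, 4, requires_grad=True)
A_demo, B_demo = torch.eye(4), 0.1 * torch.randn(4, 2)
x_hat_demo = torch.randn(4)
x_ref_demo = torch.randn(W_demo, 4)
loss_demo = 0.0
for s in range(W_demo):
    u_demo = theta_demo @ (x_hat_demo - x_ref_demo[s])  # policy applied at every step of the window
    x_hat_demo = A_demo @ x_hat_demo + B_demo @ u_demo  # model rollout
    loss_demo = loss_demo + (x_hat_demo - x_ref_demo[s]).pow(2).mean()
(loss_demo / W_demo).backward()  # gradients with respect to theta_demo are now in theta_demo.grad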
Task 3¶
Learn a policy that minimizes the MPC loss defined above. You can choose any parametrization you like, but to keep matters simple we recommend that you use a linear parameterization as in Task 2. You can train the policy by solving any of the three learning problems above. Whichever you use, explain your reasons for choosing it.
Evaluate the control policy $\pi(\mathbf{x}(t);\theta^*)$ on the car simulator. Plot the reference trajectory $\mathbf{x}_R(t)$ and the trajectory $\mathbf{x}(t)$ generated by the learned policy. Slow down the plot to create a movie of the controlled car following the reference trajectory.
Evaluate and plot the loss between the generated trajectory and the reference trajectory.
from torch.utils.data import TensorDataset, DataLoader
def GenerateDataset(x_expert: npt.NDArray[np.floating], W: int, std_noise: float = 0.3):
"""
    Generate a dataset of dithered initial states and reference windows.
    Args:
        x_expert: (T, 4) array with expert trajectory
        W: int with the time horizon
        std_noise: float with the standard deviation of the dithering noise
    Returns:
        x0_imitation_tensor: (T, 4) tensor with dithered initial states
        x_ref_imitation_tensor: (T, W, 4) tensor with the corresponding reference windows
    """
    # Generate dithered states around the expert trajectory. We will train a policy that
    # minimizes the MPC loss starting from these states, avoiding the online cost of running MPC.
rng = np.random.default_rng()
# Generate the normal noise with mean 0 and standard deviation 0.3
means = np.zeros_like(x_expert)
noise = rng.normal(means, scale=std_noise)
# Generate perturbed initial states (Dithering)
x0_imitation = x_expert + noise
# Initialize reference and action tensors
x_ref_imitation = np.zeros((len(x0_imitation), W, 4))
for i in range(len(x0_imitation)):
# Compute reference
x_ref = calculate_reference(x0_imitation[i], x_expert, horizon=W)
# Save imitation learning point
x_ref_imitation[i] = x_ref
x0_imitation_tensor = torch.from_numpy(x0_imitation).to(device).float().reshape(-1, 4)
x_ref_imitation_tensor = torch.from_numpy(x_ref_imitation).to(device).float().reshape(-1, W, 4)
return x0_imitation_tensor, x_ref_imitation_tensor
# Empty tensors (on the same device as the generated data) which we can concatenate to
W = 10
x0_imitation = torch.empty(0, 4, device=device)
x_ref_imitation = torch.empty(0, W, 4, device=device)
for xprt in x_experts:
x0, x_ref = GenerateDataset(xprt, W=W, std_noise=0.95)
x0_imitation = torch.cat((x0_imitation, x0), dim=0)
x_ref_imitation = torch.cat((x_ref_imitation, x_ref), dim=0)
task3_dataset = TensorDataset(x0_imitation, x_ref_imitation)
def MPCLoss(x_ref, x_0, A, B, theta, W=10):
"""
Compute the MPC loss.
Args:
        x_ref: (batch, W, 4) reference trajectories.
        x_0: (batch, 4) initial states.
        A: The state matrix.
        B: The input matrix.
        theta: The policy parameterization.
        W: The time horizon.
    Returns:
        loss: The MPC loss averaged over the window.
"""
loss_mpc = 0
x_hat = torch.zeros_like(x_ref)
x_hat[:,0] = x_0
for s in range(W-1):
loss_mpc += (x_hat[:,s] - x_ref[:,s]).pow(2).mean()
u_hat = (x_hat[:,s] - x_ref[:,s]) @ theta.T
x_hat[:,s + 1] = x_hat[:,s] @ A.T + u_hat @ B.T
return loss_mpc / W
# Training parameters
lr = 0.1
n_epochs = 100
batch_size = 32
# Initialize theta (our parameterization)
theta_task3 = torch.randn(2, 4, device=device, requires_grad=True)
optimizer = torch.optim.Adam([theta_task3], lr=lr)
# Create the dataloader
imitation_loader = DataLoader(task3_dataset, batch_size=batch_size, shuffle=True)
epoch_loss = np.zeros(n_epochs)
print_every = 10
# Learned model matrices, cast to float32 to match the data tensors
A = torch.from_numpy(A_learnt).float().to(device)
B = torch.from_numpy(B_learnt).float().to(device)
# Full passes over the dataset
for e in range(n_epochs):
for x0_imi, x_ref_imi in imitation_loader:
optimizer.zero_grad()
loss = MPCLoss(x_ref_imi, x0_imi, A, B, theta_task3, W=W)
# Compute gradients
loss.backward()
# Update parameters
optimizer.step()
epoch_loss[e] += loss.item()
# Print progress
    if e % print_every == 0:
        print(f" Epoch: {e} \tLoss: {epoch_loss[e] / len(imitation_loader):.2f}")
epoch_loss /= len(imitation_loader)
Epoch: 0 	Loss: 3.55
Epoch: 10 	Loss: 0.68
Epoch: 20 	Loss: 0.58
Epoch: 30 	Loss: 0.55
Epoch: 40 	Loss: 0.54
Epoch: 50 	Loss: 0.53
Epoch: 60 	Loss: 0.53
Epoch: 70 	Loss: 0.53
Epoch: 80 	Loss: 0.53
Epoch: 90 	Loss: 0.53
# Simulate the trajectory
n_laps = 1
simulator = Simulator()
t = np.arange(0, Config.duration * n_laps, Config.time_step)
x_imitation = np.zeros((len(t), A_learnt.shape[0]))
# Use the same starting point as the expert
x_imitation[0] = x_expert[0]
# Detaching our model from the computational graph
theta = theta_task3.detach().cpu().numpy()
error = np.zeros(len(t) - 1)
with torch.no_grad():
for i in range(len(t) - 1):
x_ref = calculate_reference(x_imitation[i], x_expert, horizon=1)[0]
diff = x_imitation[i] - x_ref
error[i] = (diff[:2]**2).mean() + (diff[2:]**2).mean()
u_imitation = (diff) @ theta.T
x_imitation[i + 1] = simulator.step(x_imitation[i], u_imitation)
plot_vs_expert(x_imitation, "Imitation", x_expert, track)
# Plot the loss evolution
plt.plot(epoch_loss)
plt.xlabel("Epoch")
plt.ylabel("MPC Loss")
plt.title("MPC Learning Loss Evolution")
plt.grid()
# Plot the trajectory error
plt.figure()
plt.plot(error)
plt.xlabel("Step")
plt.ylabel("MSE")
plt.title("Error Along the Trajectory")
plt.grid(True)
plt.show()