Download the Jupyter Notebook for this lab in the following link.
Download the data for the lab in the following link.
Download the pdf assignment for the lab in the following link.
Lab 2: Signals in Time: Audio Processing and Classification¶
Audio is mathematically modeled as a function $x(t)$ in which $t$ represents time and $x(t)$ is an electric signal that is generated by transforming air pressure waves with a microphone. The same pressure waves can be reconstructed from the electrical signal using a speaker. In this lab we want to use what we learned in Lab 1 to process audio. In particular, we want to clean up an audio signal (Labs 2A and 2B) and we want to recognize spoken words (Lab 2C).
Lab 2C: Classification¶
0. Environment setup¶
# To run this notebook on a local computer, go back to the Jupyter home page and click the upload
# button. A dialog box will appear. Locate the file data_lab_2A.pt and upload.
#
# To run on Google Colab, upload the data_lab_2A.pt file to a folder in your Google Drive and
# uncomment the code lines below (the ones that start with a single "#"). The code assumes the
# data_lab_2A.pt file is in a folder labeled "ese2000/Lab2A." Change the name to the proper folder if needed.
# import os
# from google.colab import drive
## Mount the drive. It prompts for an authorization.
# drive.mount('/content/drive')
## Specify the directory where the data is located. Change "ese2000/Lab2A" to your own folder name.
# folder_path = '/content/drive/My Drive/ese2000/Lab2A'
## Change the directory to this folder to perform operations within it
# os.chdir(folder_path)
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import sys
import matplotlib.pyplot as plt
import IPython.display as ipd
import os
import numpy as np
from IPython.display import Audio,display
print('\n Done \n')
# Computations are getting intensive. The amount of samples that we are processing, the size of each
# individual sample and the complexity of the learning parameterization are all much larger than in
# Lab 1. The first line below sets up the notebook to use a GPU if it is available. The second line
# loads the data into the processing device. The GPU if available, the CPU otherwise. Loading the
# data creates a tensor dataA
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device("cpu")
# Use this line to run on MPS (MacOS Silicon)
#device = torch.device("mps")
print(f"Using device: {device}")
Done Using device: mps
1. Classification Losses¶
A classification is a partition of reality. Given realizations $x$ of some object we ascribe to each of them a unique label $y_c$ that marks them as members of class $c$. In a classification task we want to train an artificial intelligence that when presented with an input $x$ predicts a class label $\hat{y}$ that matches the label $y_c$ ascribed by the real world.
To model classification as empirical risk minimization (ERM) we just need to define a proper loss. The easiest is to define a hit loss that takes the value $\ell(y, \hat{y}) = 1$ when $\hat{y} \neq y$ and the value $\ell(y, \hat{y}) = 0$ when $\hat{y} = y$. To write this loss formally we define the indicator function $I ( y = y_c )$ which takes the values
$$\tag{3} I ( y = y_c ) = \begin{cases} 1 & \text{when } y = y_c, \\ 0 & \text{when } y \neq y_c. \end{cases} $$
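The hit loss can be illustrated with a few lines of PyTorch. This is a minimal sketch; the function name `hit_loss` and the label values are made up for illustration.

```python
import torch

def hit_loss(y, y_hat):
    """Return 1.0 where the predicted label differs from the true label, 0.0 otherwise."""
    return (y != y_hat).float()

# Hypothetical true and predicted labels for five samples drawn from three classes.
y = torch.tensor([0, 1, 2, 1, 0])
y_hat = torch.tensor([0, 2, 2, 1, 1])

losses = hit_loss(y, y_hat)     # Per-sample hit loss
average_loss = losses.mean()    # Empirical risk under the hit loss
print(losses, average_loss)
```

Averaging the hit loss over a dataset gives the fraction of misclassified samples.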
1.2 Cross Entropy Loss¶
The loss (3) is a reasonable choice but it is not differentiable with respect to the score variable $\hat{s}$. This is a drawback for implementing gradient descent. To solve this problem we define the soft maximum function
$$ \text{softmax}(\hat{s}) = \log \sum_{d=1}^{C} e^{\hat{s}_d}. $$
The soft maximum is a differentiable approximation of the maximum in the sense that $\text{softmax}(\hat{s}) \approx \max_d \hat{s}_d$. We can therefore replace the maximum $\max_d \hat{s}_d$ in \eqref{eqn_max_loss} by the soft maximum function to define the loss
$$ \ell(y, \hat{s}) = \sum_{c=1}^{C} I [ y = y_c ] \times \Big[ \log \sum_{d=1}^{C} e^{\hat{s}_d} - \hat{s}_c \Big]. $$
This is the cross entropy loss. Its use in classification tasks is standard.
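As a sanity check, the expression above can be compared with PyTorch's built-in cross entropy. The sketch below assumes a hypothetical score vector and true class; `cross_entropy_from_scores` is an illustrative helper, not part of the lab code.

```python
import torch
import torch.nn.functional as F

def cross_entropy_from_scores(s_hat, c):
    """Soft maximum of the scores minus the score of the true class c."""
    return torch.logsumexp(s_hat, dim=-1) - s_hat[c]

# Hypothetical scores for C = 3 classes and a true class index.
s_hat = torch.tensor([2.0, -1.0, 0.5])
c = 0

manual = cross_entropy_from_scores(s_hat, c)
# F.cross_entropy expects a batch of scores and a batch of integer labels.
builtin = F.cross_entropy(s_hat.unsqueeze(0), torch.tensor([c]))

# The soft maximum upper bounds the maximum and stays close to it.
soft_max = torch.logsumexp(s_hat, dim=-1)
hard_max = s_hat.max()
print(manual, builtin, soft_max, hard_max)
```

The two loss values should agree up to floating point error, which suggests that `nn.CrossEntropyLoss` can be fed raw scores directly.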
2. Readout Layers¶
In Lab 1B we processed audio with a convolutional neural network (CNN). Our CNN was such that if the input signal contains $N$ entries, the output signal contains $N$ entries as well. This is unsuitable for a classification task in which we want the output to be a vector of scores $s$ containing as many entries as classes.
To sort this out we add a readout layer to match the dimensionalities of the input and output. A readout layer is just a fully connected layer at the output. Instead of a convolutional filter, the last layer is the fully connected layer,
$$ \begin{equation} \tag{1} \Phi(x; \mathcal{H}) ~=~ x_L ~=~ \sigma \Big(\, A_L x_{L-1} \,\Big) . \end{equation} $$
If we have input signals $x=x_0$ of dimension $N$ and we have $C$ classes, the matrix $A_L$ is of dimension $C \times N$. This results in an output $\Phi(x; \mathcal{H}) = x_L$ containing $C$ entries that we can equate to the predicted score $s$.
In (1) we consider a CNN with layers that have a single channel. In practice, we have already seen that we have to use CNNs with multiple channels. This is not a problem but it requires that we reshape the matrix $x_{L-1}$ to turn it into a vector. We do that by stacking the columns of $x_{L-1}$ on top of each other,
$$ \tag{2} \text{vec}(x_{L-1}) = \Big[\, x_{L-1}^1;\, x_{L-1}^2;\, \ldots;\, x_{L-1}^{F_{L-1}} \,\Big] . $$
This operation has no conceptual significance. It is just a reshaping of data. We need to do this to write the readout layer as the matrix multiplication in (1) with $x_{L-1}$ replaced by $\text{vec}(x_{L-1})$.
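The reshaping and the readout multiplication can be sketched in a few lines. The dimensions and the random readout matrix below are illustrative assumptions. Note that PyTorch stores multi-feature signals as (features, length), so each row holds one feature.

```python
import torch

# Hypothetical sizes: signals of length N with F_last features and C classes.
N, F_last, C = 6, 2, 3

# Row f of x_last is the feature x^f of the last convolutional layer.
x_last = torch.arange(N * F_last, dtype=torch.float32).reshape(F_last, N)

# Stack the features one after another: [x^1; x^2].
vec_x = torch.flatten(x_last)

# Readout matrix with C rows and N * F_last columns (random values for illustration).
A = torch.randn(C, N * F_last)
scores = torch.relu(A @ vec_x)
print(vec_x.shape, scores.shape)
```

The output has one entry per class regardless of how long the input signal is.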
Task 1¶
Modify the CNN code of Lab 2B to endow it with a method that implements a single convolutional layer. Add another method to implement a readout layer. Use these two methods to modify the CNN class to implement a CNN with readout.
This class receives as initialization parameters the number of layers $L$ and the dimensions $N$ and $C$ of the input features $x$ and the output scores $s$. To specify the $L-1$ convolutional layers we also accept the vectors $[K_1, \ldots, K_{L-1}]$ and $[F_0, F_1, \ldots, F_{L-1}]$ containing the number of taps $K_\ell$ of the filters used at each layer and the number of features $F_\ell$ at the output of each layer.
The forward method of this class takes a matrix $x$ of dimension $N\times F_0$ as an input and returns the output $\Phi(x; \mathcal{H})$ of a CNN with $L-1$ convolutional layers and a readout layer. The dimension of this output vector is $C$.
Use ReLU nonlinearities in all layers.
Note: We will use PyTorch’s implementation of 1-D convolutions: nn.Conv1d. This class takes the same arguments and performs the same operation as the 1-D convolution function we created in lab 1B. However, PyTorch’s implementation is faster and more memory-efficient since it uses a low-level programming language. You can read more about this implementation in the following link: https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html.
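Before building the full class it may help to check how `nn.Conv1d` handles dimensions. The sketch below uses the first-layer sizes of this lab (1 input feature, 8 output features, 80 taps) and a random batch as a stand-in for audio.

```python
import torch
import torch.nn as nn

# One convolutional layer: 1 input feature, 8 output features, 80 taps.
# padding='same' keeps the output as long as the input.
conv = nn.Conv1d(in_channels=1, out_channels=8, kernel_size=80, padding='same')

# Conv1d expects inputs of shape (batch, features, samples).
x = torch.randn(4, 1, 8000)
z = conv(x)

# Filter coefficients (8 * 1 * 80) plus one bias per output feature (8).
n_params = sum(p.numel() for p in conv.parameters())
print(z.shape, n_params)
```

The output keeps the 8000 time samples of the input, which is why a readout layer is needed to reduce the output to one score per class.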
class ReadoutLayer(nn.Module):
    """
    This class implements a readout layer that will be used in our
    convolutional neural network below.
    """
    def __init__(self, input_length, classes):
        """
        Initialize the ReadoutLayer. It is essentially a fully connected layer.
        Args:
            input_length (int): The length of the input vector.
            classes (int): The number of output classes.
        """
        super().__init__()
        self.filter = torch.nn.Parameter(torch.empty((classes, input_length)))
        # This line initializes the weights of the filter to random values.
        nn.init.xavier_normal_(self.filter)

    def forward(self, x):
        """
        Forward method for processing input through the readout layer. It is simply a matrix multiplication
        of the layer's input by the filter weights.
        """
        return torch.matmul(x, self.filter.T)
class CNN(nn.Module):
    """
    This class implements a convolutional neural network with two convolutional layers and a readout layer.
    """
    def __init__(self,
                 n_input=1,
                 n_output=3,
                 n_channels=[8, 8],
                 kernel_sizes=[80, 3]
                 ):
        """
        Init method. This is where attributes are initialized.
        Args:
            n_input (int): Number of input features (1 in this case, because our audio is mono).
            n_output (int): Number of output classes (3 in this case, because we have 3 digits).
            n_channels (list[int]): The number of output features for each convolutional layer.
            kernel_sizes (list[int]): List of kernel sizes for each convolutional layer.
        """
        super().__init__()
        # Initialize the first convolutional layer.
        # PyTorch's Conv1d takes the number of input features, the number of output features, and the kernel size (number of taps).
        # The padding 'same' is used to keep the same dimensions as the input.
        self.conv1 = nn.Conv1d(n_input,
                               n_channels[0],
                               kernel_size=kernel_sizes[0],
                               padding='same')
        # Initialize the second convolutional layer.
        # The number of input channels to this layer is the number of output features of the first layer.
        self.conv2 = nn.Conv1d(n_channels[0],
                               n_channels[1],
                               kernel_size=kernel_sizes[1],
                               padding='same')
        # Initialize the readout layer. The input to the readout layer is all of the outputs
        # of the last convolutional layer, for every output feature.
        # So its size is n_channels[1] * sample_rate. This relies on the global variable sample_rate
        # (set when the data is loaded) because each audio signal contains sample_rate samples.
        readout_input_size = n_channels[1]*sample_rate
        self.readout = ReadoutLayer(readout_input_size, n_output)

    def forward(self, x0):
        # First layer takes x0 as input (passed when calling the function) and gives x1 as output
        x1 = self.conv1(x0)
        x1 = F.relu(x1)
        # Second layer takes x1 as input and gives x2 as output
        x2 = self.conv2(x1)
        x2 = F.relu(x2)
        # After the last convolutional layer, we flatten the tensor to prepare for the readout layer.
        x3 = torch.flatten(x2, start_dim=1)
        x3 = self.readout(x3)
        return x3
# This is the convolution function that we implemented in Lab 2A
# WE WILL NOT USE this function. Instead, we will use Pytorch's implementation of convolution (Conv1D)
# which is faster and more memory-efficient.
# We leave our old implementation here for you to remember that this is the operation under the hood.
def convolution_1D(signal, conv_filter):
    """DO NOT USE."""
    taps = conv_filter.shape[0]
    out_features = conv_filter.shape[1]
    N = signal.shape[-1]
    z = torch.zeros((out_features, N))
    for k in range(taps):
        z = z + torch.matmul(conv_filter[k] , signal)
        signal = signal.roll(1,)
        signal[:,0] = 0
    return z
2.1 Classification of Time Signals¶
The dimension of the output of a CNN with a readout layer can be chosen to have dimension $C$ equal to the number of classes. We can then train a CNN with readout to classify time signals by solving the empirical risk minimization problem
$$ \begin{equation} \tag{11} \mathcal{H}^* ~=~ \operatorname*{argmin}_\mathcal{H} ~ \frac{1}{Q} \, \sum_{q=1}^{Q} \, \ell \Big( y_q, \mathbf{\Phi}(\mathbf{x}_q; \mathcal{H}) \Big) ~. \end{equation} $$
In (11), the tensor $\mathcal{H}$ contains the convolutional filters in Layers $\ell=1$ through $\ell=L-1$ as well as the matrix $\mathbf{A}_L$ that defines the readout layer. The function $\ell ( y_q, \mathbf{\Phi}(\mathbf{x}_q; \mathcal{H}) )$ is the cross entropy loss between the available class label $y_q$ and the vector of scores $\mathbf{s}_q = \mathbf{\Phi}(\mathbf{x}_q; \mathcal{H})$.
When this CNN is deployed to make operational classifications, the output is a vector of scores $\mathbf{s} = \mathbf{\Phi}(\mathbf{x}; \mathcal{H})$. Entry $s_c$ in this vector is the likelihood we assign to class $c$. This likelihood can be converted to a hard class decision by selecting the class with the largest score
$$ \begin{equation}\tag{12} c(\mathbf{x}) ~=~ \operatorname*{argmax}_d ~ \Big [ \mathbf{\Phi}(\mathbf{x}; \mathcal{H}) \Big]_d ~. \end{equation} $$
When using CNNs in classification problems we train them to minimize cross entropy losses. However, it is customary to evaluate their performances by counting correct classifications. In particular, if we have a test set with $\tilde{Q}$ entries we evaluate the CNN with the error rate,
$$ \begin{equation}\tag{13} e(\mathcal{H}) ~=~ \frac{1}{\tilde{Q}} \, \sum_{q=1}^{\tilde{Q}} \, \mathbb{I} \Big( y_q \neq c(\mathbf{x}_q) \Big) ~, \end{equation} $$
where $c(\mathbf{x}_q)$ is the hard class decision in (12). This is the fraction of erroneous classifications in the test set.
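The error rate can be computed from a batch of scores in two steps: take the class with the largest score, then count mismatches. A minimal sketch with made-up scores and labels (`error_rate` is an illustrative helper):

```python
import torch

def error_rate(scores, labels):
    """Fraction of samples whose highest-scoring class differs from the label."""
    predictions = scores.argmax(dim=-1)   # Hard class decisions
    return (predictions != labels).float().mean()

# Hypothetical scores for four test samples and three classes.
scores = torch.tensor([[2.0, 0.1, -1.0],
                       [0.3, 0.2, 0.9],
                       [0.0, 1.5, 1.4],
                       [1.0, 0.9, 0.8]])
labels = torch.tensor([0, 2, 2, 1])

e = error_rate(scores, labels)
print(e)
```

Here the first two samples are classified correctly and the last two are not, so the error rate is one half. The accuracy is one minus the error rate.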
Observe that we can also convert scores $\mathbf{s}$ to probabilities using (6). This is important when we want to gauge the confidence of a classification. In (12) we assign classes to the highest score. Thus, we predict with the same confidence whether the second highest score is close to the highest score or not. Reporting probabilities distinguishes these two situations by saying that the probability of the most likely class is, say, $90\%$ or $60\%$.
We must say that since we are training with a cross entropy loss this is the interpretation of the output that is most reasonable. The use of (12) is nevertheless more common.
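The difference between hard decisions and probabilities can be seen with two hypothetical score vectors. This sketch assumes that the conversion to probabilities is the usual normalized exponential, $e^{s_c} / \sum_d e^{s_d}$, which is what `torch.softmax` computes. Note that `torch.softmax` is this normalization and not the soft maximum function defined in Section 1.2.

```python
import torch

# Two hypothetical score vectors with the same highest-scoring class.
confident = torch.tensor([4.0, 0.5, 0.0])   # Clear winner
ambiguous = torch.tensor([1.0, 0.8, 0.0])   # Runner-up is close

# Normalized exponentials: nonnegative entries that sum to one.
p_confident = torch.softmax(confident, dim=-1)
p_ambiguous = torch.softmax(ambiguous, dim=-1)

print(confident.argmax().item(), p_confident)
print(ambiguous.argmax().item(), p_ambiguous)
```

Both vectors yield the same hard decision, but the probability assigned to that class is much higher in the first case than in the second.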
Task 2¶
Load the data. This data contains audio samples in which people are speaking the digits 0, 1 and 2. Use the class of Task 1 to train a CNN with a readout layer to classify the audio signals into the different possible spoken digits. The CNN should have the following parameters: 2 convolutional layers, 8 channels per layer, kernel sizes (number of taps) of 80 and 3 respectively, ReLU nonlinearities, and learning rate 0.03.
Evaluate the training cross entropy loss and the test accuracy, that is, the fraction of correct classifications in the test set.
Loading and exploring the data¶
# These four lines load the training and testing data.
# Both training and testing data contain audio signals and their corresponding labels.
train_audios = torch.load('train_audios_3.pt',map_location=device)
train_labels = torch.load('train_labels_3.pt',map_location=device)
test_audios = torch.load('test_audios_3.pt',map_location=device)
test_labels = torch.load('test_labels_3.pt',map_location=device)
# The sample rate for the audio signals is 8000 Hz.
sample_rate = 8000
# Take one audio sample and its corresponding label from the training data to visualize.
sample, label = train_audios[0], train_labels[0]
# Print dimensions and plot the audio signal
print(f"Shape of waveform: {sample.size()}")
print(f"Sample rate of waveform: {sample_rate}")
print(f"\nDimension of audio sample: {sample.shape}")
print(f" This is a vector with {sample.shape[1]} rows and {sample.shape[0]} column(s).")
print(f"Label for this sample is {label}")
print(f" This is a scalar value representing the digit spoken.")
print("\n")
plt.plot(sample.squeeze().cpu().numpy())
# Play the audio signal.
display(Audio(sample.cpu(), rate=sample_rate))
/var/folders/02/rwff5ttj1r9_5hnh3k8rm7tr0000gq/T/ipykernel_52489/1347946355.py:3: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature. train_audios = torch.load('train_audios_3.pt',map_location=device)
Shape of waveform: torch.Size([1, 8000])
Sample rate of waveform: 8000

Dimension of audio sample: torch.Size([1, 8000])
 This is a vector with 8000 rows and 1 column(s).
Label for this sample is 1
 This is a scalar value representing the digit spoken.
Evaluate function¶
# We define a function to evaluate the loss. This is an auxiliary function. We do not need it to
# implement or train the neural network. We use it for evaluation after training and/or at some
# intermediate training checkpoints. This is just for visualization.
def evaluate(test_dataloader, estimator):
    """
    Evaluate the performance of the estimator on the given dataloader.
    Args:
        test_dataloader (torch.utils.data.DataLoader): Data on which to evaluate performance.
        estimator (torch.nn.Module): Learning parameterization to evaluate.
    Returns:
        torch.Tensor: Accuracy of the estimator evaluated on the dataloader (scalar tensor).
    """
    correct = 0 # Initialize counter for correct predictions
    with torch.no_grad(): # Disable gradient computations (not needed for evaluation)
        for data, target in test_dataloader: # Sweep over all batches
            data = data.to(device) # Move the data tensor to the device (e.g. from CPU memory to GPU memory)
            target = target.to(device) # Move the target tensor to the device (e.g. from CPU memory to GPU memory)
            output = estimator(data) # Get model predictions for the batch
            # Select the most likely index (class) from the output tensor
            pred = output.argmax(dim=-1)
            # Count the number of correct predictions in the batch
            correct += torch.sum(pred == target)
    # Calculate overall accuracy: fraction of correct predictions
    test_accuracy = correct / len(test_dataloader.dataset)
    return test_accuracy
Training the CNN¶
# Instantiate the CNN model. We create a CNN object and move it to the device.
# Similar to how we instantiate a ConvolutionalFilter object in the first code,
# we now instantiate an object from the CNN class.
estimator = CNN(n_input=1,
n_output=3,
n_channels=[8, 8],
kernel_sizes=[80, 3]).to(device)
# Set the parameters of stochastic gradient descent (SGD). These include the learning rate,
# batch size, and number of epochs. These numbers are different from those used in the
# convolutional filter example.
lr = 0.03
batch_size = 64
n_epochs = 20
optimizer = optim.SGD(estimator.parameters(), lr=lr)
# Specify the loss function. For the CNN, we use cross-entropy loss since this is a classification task.
loss = nn.CrossEntropyLoss()
# The train and test dataloaders handle randomized batches in the training set and non-shuffled
# batches in the test set. We keep the same structure as before, where we use these dataloaders
# to handle loading data into memory.
train_loader = DataLoader(
TensorDataset(train_audios, train_labels),
batch_size=batch_size,
shuffle=True,
)
test_loader = DataLoader(
TensorDataset(test_audios, test_labels),
batch_size=batch_size,
shuffle=False,
)
# Initialize null structures for storing the evolution of the training loss and test accuracy
# at the end of each epoch. This is not needed for training, just for displaying results.
losses = []
test_acc = []
print('\n')
# Begin the training loop. This is similar to the stochastic gradient descent (SGD) implementation
# used for the convolutional filter. In the inner loop, we sweep over batches of size batch_size.
# In the outer loop, we sweep over epochs. One epoch is a complete pass through the shuffled dataset.
#
# We follow the three steps required to run SGD: (i) Load the data, (ii) Evaluate Gradients,
# (iii) Take a gradient descent step. The only difference is the estimator, which is now a CNN.
# Interval, in batches, between printed training statistics. The value used in the original
# notebook was lost, so 20 is an assumed placeholder. Change it as needed.
log_interval = 20
for epoch in range(n_epochs): # Iterate over n_epochs epochs
    for batch_idx, (x_batch, y_batch) in enumerate(train_loader): # Iterate over all batches in the dataset
        # (Step i) Load the data. These commands send the data to the GPU memory.
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        # (Step ii) Compute the gradients. Use automatic differentiation.
        estimator.zero_grad() # Reset the gradients to zero
        y_hat = estimator(x_batch).squeeze() # Forward pass through the CNN and squeeze the output.
        cross_entropy_value = loss(y_hat, y_batch.type(torch.LongTensor).to(device)) # Compute the loss.
        cross_entropy_value.backward() # Compute gradients by moving backwards to the gradient reset.
        # (Step iii) Update parameters by taking an SGD (or other optimizer) step.
        optimizer.step()
        # Print training stats at specified intervals to track progress.
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch+1} \tLoss: {cross_entropy_value.item():.3f}")
        # Record the loss at each iteration for visualization.
        losses.append(cross_entropy_value.item())
    # End of batch loop.
    # Evaluate the performance of the CNN on the test set at the end of each epoch.
    test_accuracy = evaluate(test_loader, estimator)
    test_acc.append(test_accuracy.cpu())
    # Print the test accuracy at the end of each epoch to track performance.
    print(f'Epoch {epoch+1} / {n_epochs}: Test Accuracy: {test_accuracy*100:.2f}%')
# End of epoch loop.
print('\n\nDone\n')
# Plot the training loss versus the number of iterations.
plt.plot(losses)
plt.title("training loss")
Train Epoch: 1 Loss: 1.103 Train Epoch: 1 Loss: 1.091 Train Epoch: 1 Loss: 1.088 Epoch 1 / 20: Test Accuracy: 38.84%
Train Epoch: 2 Loss: 1.095 Train Epoch: 2 Loss: 1.066 Train Epoch: 2 Loss: 1.096 Epoch 2 / 20: Test Accuracy: 39.48%
Train Epoch: 3 Loss: 1.124 Train Epoch: 3 Loss: 1.065 Train Epoch: 3 Loss: 1.038 Epoch 3 / 20: Test Accuracy: 52.94%
Train Epoch: 4 Loss: 0.950 Train Epoch: 4 Loss: 0.989 Train Epoch: 4 Loss: 1.010 Epoch 4 / 20: Test Accuracy: 49.07%
Train Epoch: 5 Loss: 0.897 Train Epoch: 5 Loss: 1.026 Train Epoch: 5 Loss: 0.878 Epoch 5 / 20: Test Accuracy: 62.53%
Train Epoch: 6 Loss: 0.843 Train Epoch: 6 Loss: 1.015 Train Epoch: 6 Loss: 0.966 Epoch 6 / 20: Test Accuracy: 57.78%
Train Epoch: 7 Loss: 0.869 Train Epoch: 7 Loss: 0.812 Train Epoch: 7 Loss: 0.888 Epoch 7 / 20: Test Accuracy: 68.82%
Train Epoch: 8 Loss: 0.683 Train Epoch: 8 Loss: 0.657 Train Epoch: 8 Loss: 0.742 Epoch 8 / 20: Test Accuracy: 36.83%
Train Epoch: 9 Loss: 1.061 Train Epoch: 9 Loss: 1.074 Train Epoch: 9 Loss: 1.071 Epoch 9 / 20: Test Accuracy: 44.40%
Train Epoch: 10 Loss: 0.919 Train Epoch: 10 Loss: 0.955 Train Epoch: 10 Loss: 0.886 Epoch 10 / 20: Test Accuracy: 57.29%
Train Epoch: 11 Loss: 0.877 Train Epoch: 11 Loss: 0.788 Train Epoch: 11 Loss: 0.845 Epoch 11 / 20: Test Accuracy: 58.98%
Train Epoch: 12 Loss: 0.808 Train Epoch: 12 Loss: 0.768 Train Epoch: 12 Loss: 0.602 Epoch 12 / 20: Test Accuracy: 61.24%
Train Epoch: 13 Loss: 0.813 Train Epoch: 13 Loss: 0.706 Train Epoch: 13 Loss: 0.611 Epoch 13 / 20: Test Accuracy: 65.51%
Train Epoch: 14 Loss: 0.545 Train Epoch: 14 Loss: 0.925 Train Epoch: 14 Loss: 0.827 Epoch 14 / 20: Test Accuracy: 61.24%
Train Epoch: 15 Loss: 0.795 Train Epoch: 15 Loss: 0.739 Train Epoch: 15 Loss: 0.764 Epoch 15 / 20: Test Accuracy: 56.97%
Train Epoch: 16 Loss: 0.646 Train Epoch: 16 Loss: 0.544 Train Epoch: 16 Loss: 0.523 Epoch 16 / 20: Test Accuracy: 66.40%
Train Epoch: 17 Loss: 0.674 Train Epoch: 17 Loss: 0.634 Train Epoch: 17 Loss: 0.648 Epoch 17 / 20: Test Accuracy: 59.15%
Train Epoch: 18 Loss: 1.066 Train Epoch: 18 Loss: 0.704 Train Epoch: 18 Loss: 0.572 Epoch 18 / 20: Test Accuracy: 55.44%
Train Epoch: 19 Loss: 0.675 Train Epoch: 19 Loss: 0.483 Train Epoch: 19 Loss: 0.597 Epoch 19 / 20: Test Accuracy: 65.27%
Train Epoch: 20 Loss: 0.534 Train Epoch: 20 Loss: 0.532 Train Epoch: 20 Loss: 0.540 Epoch 20 / 20: Test Accuracy: 66.96%
Done
plt.plot(test_acc, label = 'test acc')
plt.legend()
The CNN used in Task 2 has failed at class prediction. Its test accuracy fluctuates from epoch to epoch and never reaches 70% in 20 epochs. This is not unexpected. We introduced CNNs to overcome the limitations of fully connected neural networks. In adding the readout layer we have rendered the architecture closer to a fully connected neural network than to a CNN. We have lost the locality and equivariance that the CNN inherits from the use of convolutions.
This challenge looks insurmountable, because we have an inherent mismatch between the nature of the input — a time signal — and the nature of the output — a class. At some point we need to abandon the time domain. The solution to this challenge is to reduce the time dimension while retaining the structure of time. This is accomplished by pooling, which we explain in the next section.
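A parameter count makes the dominance of the readout layer concrete. The sketch below rebuilds the two convolutional layers of Task 2 and compares their number of parameters with the number of entries in the readout matrix. The helper `count_parameters` is illustrative.

```python
import torch.nn as nn

n_samples, n_classes = 8000, 3   # Signal length and number of classes in Task 2

# The two convolutional layers of the Task 2 CNN.
conv1 = nn.Conv1d(1, 8, kernel_size=80, padding='same')
conv2 = nn.Conv1d(8, 8, kernel_size=3, padding='same')

def count_parameters(module):
    """Total number of trainable entries in a module."""
    return sum(p.numel() for p in module.parameters())

conv_params = count_parameters(conv1) + count_parameters(conv2)

# The readout matrix has one row per class and one column per entry of the
# flattened output of the last convolutional layer (8 features x 8000 samples).
readout_params = n_classes * 8 * n_samples

print(conv_params, readout_params)
```

The readout matrix holds 192,000 coefficients against 848 in the two convolutional layers combined, so almost all of the trainable parameters sit in the fully connected part of the architecture.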
3. Pooling¶
Pooling operators reduce the dimension of the input signal while retaining the structure of time. They do so by introducing a reduction factor $\Delta$, dividing the time line in windows of width $\Delta$, and extracting information from every window.
The simplest form of pooling is sampling. Let $w=[w_0; w_1;\ldots;w_{N-1}]$ be a signal with $N$ components. We define the sampled signal as the signal $x=[x_0; x_1;\ldots;x_{N_s-1}]$ in which the component $x_m$ is given by $$ \tag{14} x_m ~=~ w_{m\Delta} ~. $$ Thus, the sampled signal $x$ copies one out of $\Delta$ entries of $w$ and ignores the rest. The rationale for ignoring entries is that the values of entries in a window, say $w_{m\Delta}$ and $w_{m\Delta+1}$, are not very different and we lose little information by discarding some of them.
In general, computing some aggregate summary of the entries in a pooling window is more effective than plain sampling. Average pooling replaces the entries in each pooling window by their average,
$$ \tag{15} x_m ~=~ \frac{1}{\Delta} \Big( \, w_{m\Delta} + w_{m\Delta+1} + \ldots + w_{m\Delta+(\Delta-1)} \,\Big) ~. $$
Another common approach to summarizing entries in a pooling window is to compute the maximum value, $$ \tag{16} x_m ~=~ \max \Big( \, w_{m\Delta} ,\, w_{m\Delta+1} ,\, \ldots ,\, w_{m\Delta+(\Delta-1)} \,\Big) ~. $$ This is called max pooling. Max pooling is more common than average pooling and both are more common than sampling. In any case, pooling is effective when the signal $w$ does not change much within the pooling window. When this is the case, sampling, average pooling, and max pooling all produce similar summary signals $x$.
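The three pooling operators can be compared on a short signal. This is a minimal sketch with a made-up signal and $\Delta = 4$. PyTorch's pooling functions expect inputs of shape (batch, features, samples), hence the reshaping.

```python
import torch
import torch.nn.functional as F

delta = 4
# Hypothetical signal with N = 8 entries, i.e., two pooling windows.
w = torch.tensor([1.0, 2.0, 3.0, 2.0, 0.0, -1.0, 0.0, 1.0])

# Sampling: keep one out of every delta entries.
sampled = w[::delta]

# Average and max pooling over non-overlapping windows of width delta.
# The stride defaults to the kernel size, so windows do not overlap.
averaged = F.avg_pool1d(w.view(1, 1, -1), kernel_size=delta).flatten()
maxed = F.max_pool1d(w.view(1, 1, -1), kernel_size=delta).flatten()

print(sampled, averaged, maxed)
```

All three outputs have two entries, one per window. They differ here because this short signal varies considerably within each window.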
The pooling operations in (14)-(16) apply to individual vectors. In CNNs we want to pool multiple features. This is done by pooling each feature individually. Given the matrix feature $W=[w^1; w^2;\ldots;w^{F}]$ the pooled matrix feature $X=[x^1; x^2;\ldots;x^{F}]$ contains the same number of features $F$ and is such that each of the individual vector features $x^f$ is pooled separately. Thus, Component $m$ of Feature $f$ is given by $$ \tag{17} x_m^f ~=~ \text{pool} \Big( \, w^f_{m\Delta} ,\, w^f_{m\Delta+1} ,\, \ldots ,\, w^f_{m\Delta+(\Delta-1)} \,\Big) ~, $$ where the operation $\text{pool}(\cdot)$ stands in for either sampling, average pooling or max pooling.
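PyTorch's pooling functions already act on each feature separately. The sketch below uses a made-up matrix with two features stored as rows, following PyTorch's (features, samples) layout, and checks that pooling the matrix matches pooling each row on its own.

```python
import torch
import torch.nn.functional as F

delta = 2
# Hypothetical matrix feature with 2 features (rows) and 4 samples each.
W = torch.tensor([[1.0, 5.0, 2.0, 0.0],
                  [3.0, -1.0, 4.0, 6.0]])

# Add a batch dimension, pool, and remove the batch dimension.
X = F.max_pool1d(W.unsqueeze(0), kernel_size=delta).squeeze(0)

# Pool each feature on its own for comparison.
rows = [F.max_pool1d(W[f].view(1, 1, -1), kernel_size=delta).flatten()
        for f in range(W.shape[0])]
X_rowwise = torch.stack(rows)

print(X)
```

The number of features is unchanged and only the time dimension shrinks, from 4 to 2 entries per feature.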
3.1 Locality and Equivariance¶
Pooling operators retain the locality and equivariance of time. The pooled signal $\mathbf{x}$ is also a time signal except that its components are more spaced out. If we recall the definition of the sampling time in Lab 2A, the signal $\mathbf{w}$ has entries spaced by the sampling time $T_s$ and the signal $\mathbf{x}$ has entries spaced by $\Delta T_s$. This is important because it means that the pooled signal $\mathbf{x}$ can be processed with a convolutional filter. In a layered architecture, this fact implies that signals can be pooled at Layer $\ell-1$ and processed with a convolutional filter at Layer $\ell$ (Section 4).
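One way to see that pooling respects the structure of time is to shift the input by one full pooling window and check that the pooled signal shifts by one entry. The sketch below uses a random signal and a circular shift (`torch.roll`) so that no entries fall off the ends. This is an illustration for shifts that are multiples of $\Delta$; other shifts do not map exactly onto shifts of the pooled signal.

```python
import torch
import torch.nn.functional as F

delta, N = 4, 16
torch.manual_seed(0)
w = torch.randn(N)   # Hypothetical signal with N a multiple of delta

def pool(v):
    """Max pooling of a 1-D signal over non-overlapping windows of width delta."""
    return F.max_pool1d(v.view(1, 1, -1), kernel_size=delta).flatten()

x = pool(w)

# Shift the input by one full window and pool again.
x_from_shifted = pool(torch.roll(w, shifts=delta))

# The pooled signal is the original pooled signal shifted by one entry.
print(torch.equal(x_from_shifted, torch.roll(x, shifts=1)))
```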
Although somewhat obvious it is worth remarking that the summary signal contains a smaller number of entries. If $N$ is a multiple of $\Delta$, the number of entries of $\mathbf{x}$ is $N_s = N / \Delta$. Otherwise, the number of entries of $\mathbf{x}$ is given by the integer division $N_s = N \div \Delta + 1$.
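The two cases can be checked by sampling lists of different lengths. The helper `pooled_length` below is a hypothetical name used only for this illustration.

```python
def pooled_length(n, delta):
    """Number of entries kept when sampling one out of every delta entries."""
    if n % delta == 0:
        return n // delta
    return n // delta + 1

delta = 4
w_exact = list(range(12))    # N = 12 is a multiple of delta
w_ragged = list(range(10))   # N = 10 is not a multiple of delta

sampled_exact = w_exact[::delta]     # Keeps indices 0, 4, 8
sampled_ragged = w_ragged[::delta]   # Keeps indices 0, 4, 8

print(len(sampled_exact), pooled_length(12, delta))
print(len(sampled_ragged), pooled_length(10, delta))
```

When using PyTorch's pooling functions, note that `F.max_pool1d` drops a trailing partial window by default, which yields $N \div \Delta$ entries instead. Passing `ceil_mode=True` keeps the partial window.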
This is important in a classification task because it is almost always the case that the number of classes $C$ is much smaller than the number of entries in the input signal $\mathbf{X}$. Thus, the use of pooling at intermediate layers ends up reducing the complexity of the readout layer. This is a eureka moment. Fully connected neural networks do not work for large dimensional signals but they do work for low dimensional signals. We can then use pooling to progressively reduce the dimension of the input signal. All the while we retain the structure of time and the ability to process signals with convolutional layers. Once we reach a point at which signals are of sufficiently low dimension we can train an effective readout layer. This is the CNN architecture used in classification tasks that we introduce in the following section.
4. CNNs for Classification Tasks¶
Combining pooling and readout yields the CNN architecture that is used for classification tasks. In this architecture we have $L-1$ convolutional layers and one readout layer. Each of the convolutional layers involves a convolutional filter, a pointwise nonlinearity, and a pooling operation,
$$ \tag{18} \begin{equation} \mathbf{Z}_\ell = \sum_{k=0}^{K_\ell-1} \mathcal{S}^k \mathbf{X}_{\ell-1} \mathbf{H}_{\ell k}, \qquad \mathbf{W}_\ell = \sigma \Big(\, \mathbf{Z}_\ell \, \Big), \qquad \mathbf{X}_\ell = \text{pool} \Big(\, \mathbf{W}_\ell \, \Big) . \qquad \end{equation} $$
The multiple feature layer input $\mathbf{X}_{\ell-1}$ is processed with a MIMO convolutional filter with coefficients $\mathbf{H}_{\ell k}$. The filter output $\mathbf{Z}_\ell$ is processed with the pointwise nonlinearity $\sigma(\cdot)$. The output of this operation is a matrix of features $\mathbf{W}_\ell$ whose dimension is reduced by the pooling operator $\text{pool}(\cdot)$ to yield the Layer $\ell$ output $\mathbf{X}_\ell$.
The input $\mathbf{X}_{\ell-1}$ is of dimension $N_{\ell-1} \times F_{\ell-1}$. The output $\mathbf{X}_\ell$ is of dimension $N_{\ell} \times F_{\ell}$. The change in the length of the features from $N_{\ell-1}$ to $N_{\ell}$ is determined by the pooling operator $\text{pool}(\cdot)$. The change in the number of features from $F_{\ell-1}$ to $F_{\ell}$ is determined by the coefficients of the MIMO filter: each coefficient matrix $\mathbf{H}_{\ell k}$ is of dimension $F_{\ell-1} \times F_{\ell}$.
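The dimension bookkeeping of one convolutional layer can be sketched with PyTorch built-ins. The sizes below ($N_{\ell-1}=64$, $F_{\ell-1}=2$, $F_\ell=5$, $K_\ell=3$, pooling window 4) are arbitrary illustrative values, and `nn.Conv1d` with `padding='same'` and `F.max_pool1d` are stand-ins for the filter and pooling operators in (18). Note that PyTorch stores signals as (batch, features, length), so each $N \times F$ matrix appears transposed, and that `nn.Conv1d` adds a bias term by default.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

# Illustrative sizes (assumptions, not values from the lab)
N_prev, F_prev, F_out, K, pool = 64, 2, 5, 3, 4

conv = nn.Conv1d(F_prev, F_out, kernel_size=K, padding='same')

x_prev = torch.randn(1, F_prev, N_prev)   # X_{l-1}, stored as (batch, F, N)
z = conv(x_prev)                          # Z_l: filter output, length unchanged
w = F.relu(z)                             # W_l: pointwise nonlinearity
x = F.max_pool1d(w, kernel_size=pool)     # X_l: length reduced by pooling

print(z.shape, w.shape, x.shape)
```

The weight tensor `conv.weight` has shape `(F_out, F_prev, K)`. It holds one coefficient matrix per filter tap, stored in a different index order than $\mathbf{H}_{\ell k}$.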
We follow the $L-1$ convolutional layers with a readout layer,
$$ \tag{19} \begin{equation} \mathbf{Z}_{L} = \mathbf{A}_{L} \text{vec} \Big(\,\mathbf{X}_{L-1} \,\Big), \qquad \mathbf{x}_{L} = \sigma \Big(\, \mathbf{Z}_{L} \, \Big) . \end{equation} $$
The input to Layer $L$ is $\text{vec}(\mathbf{X}_{L-1})$. This is the output of Layer $L-1$ rearranged in vector form. This input is multiplied by the matrix $\mathbf{A}_{L}$ to produce the intermediate feature $\mathbf{Z}_{L}$. This is then passed through the pointwise nonlinearity $\sigma(\cdot)$ to produce the layer’s output $\mathbf{x}_{L}$.
The input vector $\text{vec}(\mathbf{X}_{L-1})$ has $N_{L-1} F_{L-1}$ entries, the product of the feature length and the number of features. The output vector has $N_L$ entries. This implies that the matrix $\mathbf{A}_{L}$ has $N_{L-1} F_{L-1}$ columns and $N_L$ rows.
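The shape of the readout matrix can be checked with a short sketch. Here `nn.Linear` without bias plays the role of $\mathbf{A}_L$. This is an assumption made for illustration and is not necessarily how the lab's `ReadoutLayer` is written. The sizes are arbitrary.

```python
import torch
import torch.nn as nn

N_prev, F_prev, N_L = 16, 5, 3            # illustrative sizes (assumptions)

x_prev = torch.randn(1, F_prev, N_prev)   # X_{L-1} as (batch, F, N)
v = torch.flatten(x_prev, start_dim=1)    # vec(X_{L-1}), length N_prev * F_prev

readout = nn.Linear(N_prev * F_prev, N_L, bias=False)
z = readout(v)                            # Z_L = A_L vec(X_{L-1})
x_L = torch.relu(z)                       # pointwise nonlinearity

print(readout.weight.shape)               # rows = N_L, columns = N_prev * F_prev
print(x_L.shape)
```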
The output of Layer $L$ is also the output of the CNN. We write this for reference as
$$ \tag{20} \begin{equation} \mathbf{\Phi} (\mathbf{x}; \mathcal{H}) = \mathbf{x}_L, \end{equation} $$
where the tensor $\mathcal{H}$ contains the coefficients of the convolutional filters used in Layers 1 through $L-1$ and the coefficients $\mathbf{A}_{L}$ of the fully connected layer. The output $\mathbf{\Phi} (\mathbf{x}; \mathcal{H})$ can be interpreted as a vector of classification scores and used in the classification empirical risk minimization problem shown in (11).
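To make the interpretation as classification scores concrete, the sketch below turns a score vector into a predicted class and a cross-entropy loss. The scores and the label are made-up numbers, and `F.cross_entropy` is used on the assumption that the risk in (11) is the usual softmax cross-entropy.

```python
import torch
import torch.nn.functional as F

scores = torch.tensor([[0.2, 1.5, -0.3]])  # made-up CNN output for C = 3 classes
label = torch.tensor([1])                  # made-up true class index

predicted = scores.argmax(dim=1)           # class with the largest score
loss = F.cross_entropy(scores, label)      # softmax cross-entropy

# The same loss written out: negative log of the softmax probability of the label
manual = -torch.log_softmax(scores, dim=1)[0, label[0]]

print(predicted.item(), loss.item(), manual.item())
```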
A CNN with pooling and readout resembles a CNN with readout only. Both include $L-1$ convolutional layers followed by a fully connected readout layer. The difference is that in the CNN with pooling the convolutional layers process signals of progressively lower dimension. Thus, the readout matrix $\mathbf{A}_{L}$ is of lower dimension too. This is expected to avoid the failure we observed in Task 2.
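A back-of-the-envelope count shows how much pooling shrinks the readout. The numbers below assume an input of $N_0 = 8000$ samples, which is an assumption made for illustration (the lab's actual length is set by `sample_rate`), together with $F = 8$ features, two pooling windows of size 8, and $C = 3$ classes.

```python
N0, F_last, C = 8000, 8, 3     # N0 is an assumed input length
windows = [8, 8]               # pooling window at each convolutional layer

# Without pooling the last convolutional layer still has N0 entries per feature
cols_no_pool = N0 * F_last

# With pooling the length is divided by each window in turn
N_last = N0
for w in windows:
    N_last //= w
cols_pool = N_last * F_last

print(C * cols_no_pool)        # readout coefficients without pooling
print(C * cols_pool)           # readout coefficients with pooling
```

Under these assumptions the readout matrix is smaller by a factor of 64, the product of the two pooling windows.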
Task 3¶
Modify the CNN to incorporate pooling. This can be done by modifying the method that implements convolutional layers to incorporate the pooling operation.
As in any CNN the initialization parameters include the number of layers $L$ along with vectors $[K_1, \ldots, K_{L-1}]$ and $[F_0, F_1, \ldots, F_{L-1}]$. These vectors contain the number of taps $K_\ell$ of the filters used at each layer and the number of features $F_\ell$ at the output of each layer.
Since we are incorporating pooling, the initialization parameters must also include the vector $[N_0, N_1, \ldots, N_{L}]$ containing the dimension $N_\ell$ of the features at the output of each layer. Notice that $N_0$ matches the dimension of the input signal and $N_L=C$ matches the number of classes.
The forward method of this class takes a matrix $x$ of dimension $N_0\times F_0$ as an input and returns the output $\Phi(x; \mathcal{H})$ of a CNN with $L-1$ convolutional layers and a readout layer. The dimension of this output is $N_L \times 1$.
Use relu nonlinearities in all layers. Use max pooling in all convolutional layers where pooling is implemented.
Naive pooling implementations¶
Here are two naive implementations of pooling. Both compute the correct result, so an implementation that looks like these is fine, but they are slow because they loop over the pooling windows. To speed up training, use the vectorized implementation of max pooling in the next section.
def average_pooling_naive(signal = torch.rand(32, 2, 80), poolsize = 4):
    # Number of complete windows; leftover samples at the end are dropped
    output_length = signal.shape[-1] // poolsize
    pooled_signal = torch.empty((signal.shape[0], signal.shape[1], output_length), device=signal.device)
    for i in range(output_length):
        start = i*poolsize
        end = start + poolsize
        # torch.mean with a dim argument returns the tensor of means directly
        pooled_signal[:,:,i] = torch.mean(signal[:,:,start:end], dim=2)
    return pooled_signal

def max_pooling_naive(signal = torch.rand(32, 2, 80), poolsize = 4):
    # Number of complete windows; leftover samples at the end are dropped
    output_length = signal.shape[-1] // poolsize
    pooled_signal = torch.empty((signal.shape[0], signal.shape[1], output_length), device=signal.device)
    for i in range(output_length):
        start = i*poolsize
        end = start + poolsize
        # torch.max with a dim argument returns (values, indices); keep the values
        pooled_signal[:,:,i] = torch.max(signal[:,:,start:end], dim=2).values
    return pooled_signal
Implementing Max Pooling and CNN with Pooling¶
Now we will implement a vectorized MaxPool1D layer, and a CNN with pooling layers.
class MaxPool1D(nn.Module):
    """
    Implements max pooling in a vectorized, faster fashion.
    """
    def __init__(self, pool_size):
        """
        Initialize the MaxPool1D layer.
        Args:
            pool_size (int): The size of the pooling window.
        """
        super().__init__()
        self.poolsize = pool_size

    def forward(self, signal):
        """
        Forward pass of the max pooling operation.
        Args:
            signal (torch.Tensor): Input tensor of shape (batch_size, channels, length).
        Returns:
            torch.Tensor: Pooled tensor of shape (batch_size, channels, length // pool_size).
        """
        # Calculate the output length after pooling
        output_length = signal.shape[-1] // self.poolsize
        # Drop trailing samples that do not fill a complete window so the reshape is valid
        signal = signal[..., :output_length * self.poolsize]
        # Reshape the signal to group elements for pooling
        # (batch,channels,length) -> (batch,channels,output_length,poolsize)
        # The signal stays on the device it already lives on.
        signal = signal.reshape((signal.shape[0], signal.shape[1], output_length, self.poolsize))
        # Perform max pooling across the last dimension, effectively
        # taking the maximum value in each pooling window
        # Resulting shape: (batch,channels,output_length)
        pooled_signal = torch.max(signal, dim=-1).values
        return pooled_signal
class CNNWithPooling(nn.Module):
    """
    This class is the same as the CNN in Task 2, but with pooling layers added.
    """
    def __init__(self,
                 n_input=1,
                 n_output=3,
                 n_channels=[8, 8],
                 kernel_sizes=[80, 3],
                 pooling_windows=[8, 8]
                 ):
        """
        Init method. This is where attributes are initialized.
        Args:
            n_input (int): Number of input features (1 in this case, because our audio is mono).
            n_output (int): Number of output classes (3 in this case, because we have 3 digits).
            n_channels (list[int]): The number of output features for each convolutional layer.
            kernel_sizes (list[int]): List of kernel sizes for each convolutional layer.
            pooling_windows (list[int]): List of pooling window sizes for each pooling layer.
        """
        super().__init__()
        # Calculate the dimension of the flattened output after convolutions and pooling.
        # This uses the global sample_rate as the input length, so each input is assumed
        # to have sample_rate samples. Each pooling layer divides the length by its window.
        pooled_length = int(sample_rate) // pooling_windows[0] // pooling_windows[1]
        emb_dim = n_channels[1] * pooled_length
        # Initialize the first convolutional layer
        self.conv1 = nn.Conv1d(n_input,
                               n_channels[0],
                               kernel_size=kernel_sizes[0],
                               padding='same')
        # Initialize the first pooling layer
        self.pool1 = MaxPool1D(pooling_windows[0])
        # Initialize the second convolutional layer
        self.conv2 = nn.Conv1d(n_channels[0],
                               n_channels[1],
                               kernel_size=kernel_sizes[1],
                               padding='same')
        # Initialize the second pooling layer
        self.pool2 = MaxPool1D(pooling_windows[1])
        # Initialize the readout layer
        self.readout = ReadoutLayer(emb_dim, n_output)

    def forward(self, x0):
        """
        Forward method for processing input through the CNN with pooling.
        Args:
            x0 (torch.Tensor): Input tensor of shape (batch_size, n_input, length).
        Returns:
            torch.Tensor: Output tensor after passing through the network.
        """
        # First convolutional layer followed by ReLU and pooling
        x1 = self.conv1(x0)
        x1 = F.relu(x1)
        x1 = self.pool1(x1)
        # Second convolutional layer followed by ReLU and pooling
        x2 = self.conv2(x1)
        x2 = F.relu(x2)
        x2 = self.pool2(x2)
        # Flatten the tensor for the readout layer
        x3 = torch.flatten(x2, start_dim=1)
        # Readout layer
        x3 = self.readout(x3)
        return x3
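As a sanity check on the reshape trick, the sketch below compares a reshape-based max pooling against PyTorch's built-in `F.max_pool1d`. The helper `max_pool_reshape` is a standalone stand-in written for this check, not the lab's `MaxPool1D` class, and it assumes non-overlapping windows.

```python
import torch
import torch.nn.functional as F


def max_pool_reshape(signal: torch.Tensor, poolsize: int) -> torch.Tensor:
    """Non-overlapping max pooling via reshape (hypothetical helper)."""
    batch, channels, length = signal.shape
    out_len = length // poolsize
    # Drop trailing samples that do not fill a whole window
    trimmed = signal[..., : out_len * poolsize]
    windows = trimmed.reshape(batch, channels, out_len, poolsize)
    return windows.max(dim=-1).values


x = torch.rand(4, 2, 82)                       # length is not a multiple of 4
ours = max_pool_reshape(x, 4)
builtin = F.max_pool1d(x, kernel_size=4)       # stride defaults to kernel_size

print(ours.shape, torch.equal(ours, builtin))
```

Both versions discard the two leftover samples at the end of the signal, so the outputs have the same length and the same entries.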
Task 4¶
Load the same data used in Task 2. Use the class of Task 3 to train a CNN with a readout layer to classify the audio signals into the different possible spoken digits. The CNN should have the following parameters: 2 layers, number of channels per layer: 8, kernel size (number of taps) at each layer: 80 and 3 respectively, learning rate: 0.05, pooling layers: max pooling.
Evaluate the training cross-entropy loss and the test accuracy, measured as the fraction of test signals that are classified correctly.
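Test accuracy can be computed from the score vectors by counting how often the largest score matches the label. The function below is a minimal sketch. Its name and signature are ours, and it is not the `evaluate` helper used in the training code.

```python
import torch


def accuracy_from_scores(scores: torch.Tensor, labels: torch.Tensor) -> float:
    """Fraction of rows whose largest score is at the label index (hypothetical helper)."""
    predictions = scores.argmax(dim=1)
    return (predictions == labels).float().mean().item()


scores = torch.tensor([[2.0, 0.1, 0.3],
                       [0.2, 0.1, 1.7],
                       [0.5, 0.9, 0.4],
                       [1.1, 0.3, 0.2]])     # made-up scores for 4 signals
labels = torch.tensor([0, 2, 0, 0])          # made-up labels

acc = accuracy_from_scores(scores, labels)
print(f"accuracy: {acc:.2f}, error rate: {1 - acc:.2f}")
```

The error rate, the fraction of incorrect classifications, is one minus the accuracy.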
Training CNN with Pooling¶
# Instantiate the CNN model. We create a CNN object and move it to the device.
# Similar to how we instantiate a ConvolutionalFilter object in the first code,
# we now instantiate an object from the CNN class.
estimator = CNNWithPooling(n_input=1,
                           n_output=3,
                           n_channels=[8, 8],
                           kernel_sizes=[80, 3],
                           pooling_windows=[8, 8]).to(device)

# Set the parameters of stochastic gradient descent (SGD). These include the learning rate,
# batch size, and number of epochs. Here we increased the learning rate to 0.03 and the number of epochs to 20.
lr = 0.03
batch_size = 64
n_epochs = 20
optimizer = optim.SGD(estimator.parameters(), lr=lr)

# Specify the loss function. For the CNN, we use cross-entropy loss since this is a classification task.
loss = nn.CrossEntropyLoss()

# The train and test dataloaders handle randomized batches in the training set and non-shuffled
# batches in the test set. We keep the same structure as before, where we use these dataloaders
# to handle loading data into memory.
train_loader = DataLoader(
    TensorDataset(train_audios, train_labels),
    batch_size=batch_size,
    shuffle=True,
)
test_loader = DataLoader(
    TensorDataset(test_audios, test_labels),
    batch_size=batch_size,
    shuffle=False,
)

# Initialize null structures for storing the evolution of the training loss and test accuracy
# at the end of each epoch. This is not needed for training, just for displaying results.
losses = []
test_acc = []

# Print the training loss once every log_interval batches.
# The value 10 is an assumption; change it to print more or less often.
log_interval = 10
print('\n')

# Begin the training loop. This is similar to the stochastic gradient descent (SGD) implementation
# used for the convolutional filter. In the inner loop, we sweep over batches of size batch_size.
# In the outer loop, we sweep over epochs. One epoch is a complete pass through the shuffled dataset.
#
# We follow the three steps required to run SGD: (i) Load the data, (ii) Evaluate Gradients,
# (iii) Take a gradient descent step. The only difference is the estimator, which is now a CNN.
for epoch in range(n_epochs): # Iterate over n_epochs epochs
    for batch_idx, (x_batch, y_batch) in enumerate(train_loader): # Iterate over all batches in the dataset
        # (Step i) Load the data. These commands send the data to the GPU memory.
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        # (Step ii) Compute the gradients. Use automatic differentiation.
        estimator.zero_grad() # Reset the gradients to zero
        y_hat = estimator(x_batch).squeeze() # Forward pass through the CNN and squeeze the output.
        cross_entropy_value = loss(y_hat, y_batch.long()) # Compute the loss. Labels must be integer class indices.
        cross_entropy_value.backward() # Backpropagate to compute the gradients of the loss.

        # (Step iii) Update parameters by taking an SGD (or other optimizer) step.
        optimizer.step()

        # Print training stats at specified intervals to track progress.
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch+1} \tLoss: {cross_entropy_value.item():.3f}")

        # Record the loss at each iteration for visualization.
        losses.append(cross_entropy_value.item())
    # End of batch loop.

    # Evaluate the performance of the CNN on the test set at the end of each epoch.
    test_accuracy = evaluate(test_loader, estimator)
    test_acc.append(test_accuracy.cpu())

    # Print the test accuracy at the end of each epoch to track performance.
    print(f'Epoch {epoch+1} / {n_epochs}: Test Accuracy: {test_accuracy*100:.2f}%')
# End of epoch loop.
print('\n\nDone\n')

# Plot the training loss versus the number of iterations.
plt.plot(losses)
plt.title("training loss")
Train Epoch: 1 Loss: 1.095
Train Epoch: 1 Loss: 1.095
Train Epoch: 1 Loss: 1.097
Epoch 1 / 20: Test Accuracy: 40.21%
Train Epoch: 2 Loss: 1.094
Train Epoch: 2 Loss: 1.088
Train Epoch: 2 Loss: 1.105
Epoch 2 / 20: Test Accuracy: 42.39%
Train Epoch: 3 Loss: 1.104
Train Epoch: 3 Loss: 1.073
Train Epoch: 3 Loss: 1.103
Epoch 3 / 20: Test Accuracy: 44.00%
Train Epoch: 4 Loss: 1.074
Train Epoch: 4 Loss: 1.092
Train Epoch: 4 Loss: 1.003
Epoch 4 / 20: Test Accuracy: 43.35%
Train Epoch: 5 Loss: 1.048
Train Epoch: 5 Loss: 1.071
Train Epoch: 5 Loss: 1.025
Epoch 5 / 20: Test Accuracy: 53.51%
Train Epoch: 6 Loss: 1.039
Train Epoch: 6 Loss: 1.031
Train Epoch: 6 Loss: 0.857
Epoch 6 / 20: Test Accuracy: 59.63%
Train Epoch: 7 Loss: 0.926
Train Epoch: 7 Loss: 0.838
Train Epoch: 7 Loss: 0.760
Epoch 7 / 20: Test Accuracy: 62.77%
Train Epoch: 8 Loss: 0.816
Train Epoch: 8 Loss: 0.768
Train Epoch: 8 Loss: 0.609
Epoch 8 / 20: Test Accuracy: 67.53%
Train Epoch: 9 Loss: 0.754
Train Epoch: 9 Loss: 0.641
Train Epoch: 9 Loss: 0.866
Epoch 9 / 20: Test Accuracy: 73.89%
Train Epoch: 10 Loss: 0.620
Train Epoch: 10 Loss: 0.417
Train Epoch: 10 Loss: 0.648
Epoch 10 / 20: Test Accuracy: 68.73%
Train Epoch: 11 Loss: 0.659
Train Epoch: 11 Loss: 0.946
Train Epoch: 11 Loss: 0.632
Epoch 11 / 20: Test Accuracy: 75.99%
Train Epoch: 12 Loss: 0.390
Train Epoch: 12 Loss: 0.585
Train Epoch: 12 Loss: 0.672
Epoch 12 / 20: Test Accuracy: 78.65%
Train Epoch: 13 Loss: 0.923
Train Epoch: 13 Loss: 0.525
Train Epoch: 13 Loss: 0.742
Epoch 13 / 20: Test Accuracy: 74.05%
Train Epoch: 14 Loss: 0.594
Train Epoch: 14 Loss: 0.436
Train Epoch: 14 Loss: 0.620
Epoch 14 / 20: Test Accuracy: 76.23%
Train Epoch: 15 Loss: 0.485
Train Epoch: 15 Loss: 0.589
Train Epoch: 15 Loss: 0.594
Epoch 15 / 20: Test Accuracy: 68.98%
Train Epoch: 16 Loss: 0.763
Train Epoch: 16 Loss: 0.470
Train Epoch: 16 Loss: 0.459
Epoch 16 / 20: Test Accuracy: 76.55%
Train Epoch: 17 Loss: 0.677
Train Epoch: 17 Loss: 0.416
Train Epoch: 17 Loss: 0.604
Epoch 17 / 20: Test Accuracy: 73.81%
Train Epoch: 18 Loss: 0.677
Train Epoch: 18 Loss: 0.499
Train Epoch: 18 Loss: 0.701
Epoch 18 / 20: Test Accuracy: 74.05%
Train Epoch: 19 Loss: 0.454
Train Epoch: 19 Loss: 0.536
Train Epoch: 19 Loss: 0.424
Epoch 19 / 20: Test Accuracy: 79.45%
Train Epoch: 20 Loss: 0.508
Train Epoch: 20 Loss: 0.694
Train Epoch: 20 Loss: 0.806
Epoch 20 / 20: Test Accuracy: 81.06%
Done
plt.plot(test_acc, label = 'test acc')
plt.legend()