序列模型是指一类特别设计来处理序列数据的神经网络模型。序列数据指的是数据中的每个元素都有先后顺序,比如时间序列数据(股票价格、天气变化等)、自然语言文本(句子中的单词顺序)、语音信号等。
1 统计工具
前面介绍了卷积神经网络架构,但是在处理序列数据时,需要新的神经网络架构,下面以股票价格为例:
我们用 x t x_{t} xt表示价格,其中 t t t表示时间步(time step),也就是在时间步 t t t时观察到的价格 x t x_{t} xt,我们通过下列公式来表示我们预测第 t t t日的价格:
x t ∼ P ( x t ∣ x t − 1 , … , x 1 ) . x_t \sim P(x_t \mid x_{t-1}, \ldots, x_1). xt∼P(xt∣xt−1,…,x1).
即,在已知 1 1 1 到 t − 1 t-1 t−1 的价格,求第 t t t 天的价格的概率分布。
1.1 自回归模型
为了实现这个预测,可以使用自回归模型:假设当前值 y t y_{t} yt 与过去的值 y t − 1 , y t − 2 , . . . y t − p y_{t-1} , y_{t-2} , ...y_{t-p} yt−1,yt−2,...yt−p 之间存在线性关系,一般形式为 :
其中:
大致分为两种策略:
①自回归模型: 假设在现实情况下相当长的序列 x t − 1 , … , x 1 x_{t-1}, \ldots, x_1 xt−1,…,x1可能是没价值的,因此我们只需要满足某个长度为 τ \tau τ的时间跨度, 即使用观测序列 x t − 1 , … , x t − τ x_{t-1}, \ldots, x_{t-\tau} xt−1,…,xt−τ。也就是说过长的历史序列可能并不必要,因此只需要关注较短的一段历史数据即可。因为只考虑观测值本身,所以叫自回归模型
②隐变量自回归模型: 即保留一些对过去观测的总结 h t h_{t} ht,这个“总结”是无法直观解释的,它是模型自助捕捉的内部关系依赖,然后同时更新预测值 x ^ t \hat{x}_t x^t和 h t h_t ht,即变为下列式子: x ^ t = P ( x t ∣ h t ) 和 h t = g ( h t − 1 , x t − 1 ) \hat{x}_t = P(x_t \mid h_{t}) 和h_t = g(h_{t-1}, x_{t-1}) x^t=P(xt∣ht)和ht=g(ht−1,xt−1)由于 h t h_{t} ht h t h_{t} ht从未被观测到,这类模型也被称为隐变量自回归模型,这里做出一个假设,即序列本身的动力学(数据随时间演变的方式)不会改变,意味着我们可以用过去的数据来推断未来的趋势,因为我们假定基本的动态规则是一致的。因此,整个序列的概率值可以表示为一系列条件概率的乘积:
P ( x 1 , … , x T ) = ∏ t = 1 T P ( x t ∣ x t − 1 , … , x 1 ) . P(x_1, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_{t-1}, \ldots, x_1). P(x1,…,xT)=t=1∏TP(xt∣xt−1,…,x1).
注意,如果我们处理的是离散的对象(如单词), 而不是连续的数字,则上述的考虑仍然有效。我们需要使用分类器而不是回归模型来估计
1.2 马尔可夫模型
马尔可夫条件: 在自回归模型中,如果 t t t 时刻的数值,只与 x t − 1 , … , x t − τ x_{t-1}, \ldots, x_{t-\tau} xt−1,…,xt−τ 有关,而不是整个过去的序列,则称其满足马尔可夫条件。
如果 τ = 1 \tau = 1 τ=1 ,则得到了一个一阶马尔可夫模型, P ( x ) P(x) P(x)由如下公式表示:
P ( x 1 , … , x T ) = ∏ t = 1 T P ( x t ∣ x t − 1 ) 当 P ( x 1 ∣ x 0 ) = P ( x 1 ) . P(x_1, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_{t-1}) \text{ 当 } P(x_1 \mid x_0) = P(x_1). P(x1,…,xT)=t=1∏TP(xt∣xt−1) 当 P(x1∣x0)=P(x1).
若当假设 x t x_t xt 仅是离散值时,可以使用动态规划可以沿着马尔可夫链精确地计算结果。
2 训练、预测
下面我们将用一个正弦函数和一些噪声生成1000个序列数据,并使用自回归模型进行训练和预测
2.1 生成数据
import torch
from torch import nn
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import TensorDataset, DataLoaderT=1000
time=torch.arange(1,T+1,dtype=torch.float32)
x=torch.sin(0.01*time)+torch.normal(0,0.2,(T,))
# 绘制折线图
plt.plot(time, x)
plt.xlabel('Time')
plt.ylabel('Value')
plt.title('Time Series Data')
plt.show()
运行结果
2.2 构造数据集
我们是准备用 y t = F ( X t ) y_t=F(X_t) yt=F(Xt),其中 X t = [ x t − τ , … , x t − 1 ] X_t= [x_{t-\tau}, \ldots, x_{t-1}] Xt=[xt−τ,…,xt−1],我们这里假设 τ = 4 \tau=4 τ=4,即用前四个数据来预测下一个数据,但是这样的话,前 4 4 4 个数据就没有历史样本去描述了,一般的做法是直接舍弃,或者用零序列去填充。
这里我们用600个数据进行训练,剩余的用于预测。
构建数据集时,使用滑动窗口去构建:
# 构造数据集
tau=4# 初始化特征矩阵,因为前四个值就是当前值的特征
features = torch.zeros((T - tau, tau))
for i in range(T - tau): # 用滑动窗口进行构建features[i,:]=x[i:tau+i]
print('features:',features.shape)
print(features[:5])labels = x[tau:].reshape((-1, 1))
print('labels:',labels.shape)
print(labels[:5])batch_size = 16
n = 600 # 只有前600个样本用于训练
dataset = TensorDataset(features[:n], labels[:n])
train_iter = DataLoader(dataset, batch_size=batch_size, shuffle=False)
运行结果
2.3 构造模型进行训练
# 构造模型
def init_weights(m):if type(m)==nn.Linear:nn.init.xavier_uniform_(m.weight)def net():net=nn.Sequential(nn.Linear(4,10),nn.ReLU(),nn.Linear(10,1))net.apply(init_weights)return net# 评估模型在给定数据集上的损失
def evaluate_loss(net, data_iter, loss):"""评估模型在给定数据集上的损失"""net.eval() # 设置模型为评估模式total_loss = 0.0with torch.no_grad(): # 不计算梯度for X, y in data_iter:y_hat = net(X)l = loss(y_hat, y)total_loss += l.sum().item() # 计算总损失net.train() # 恢复模型为训练模式return total_loss / len(data_iter.dataset)loss=nn.MSELoss(reduction='none')
lr=0.01
net=net()
optimzer=torch.optim.Adam(net.parameters(),lr)
loss_sum=[]
num_epoch=20
def train(net,num_epoch,train_iter,loss,optimzer,loss_sum):for epoch in range(num_epoch):for x,y in train_iter:optimzer.zero_grad()l=loss(net(x),y)l.sum().backward()optimzer.step()temp=evaluate_loss(net,train_iter,loss)loss_sum.append(temp)print("epoch ",epoch+1,": loss:",temp)train(net,num_epoch,train_iter,loss,optimzer,loss_sum)# 绘制折线图
plt.plot(range(num_epoch), loss_sum)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.show()
运行结果
2.4 预测
# 使用模型进行预测
def predict(net, data_iter):net.eval() # 设置模型为评估模式predictions = []with torch.no_grad(): # 不计算梯度for X, y in data_iter:y_hat = net(X)predictions.extend(y_hat.numpy())net.train() # 恢复模型为训练模式return predictions# 获取测试集的预测结果
predictions = predict(net, test_iter)# 绘制预测结果与真实值的对比图
true_values = labels[n:].numpy()
plt.plot(true_values, label='True Values')
plt.plot(predictions, label='Predictions')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
运行结果
2.5 多步预测
# 多步预测
def multistep_predict(net, data_iter, steps):net.eval() multistep_predictions = []with torch.no_grad(): for X, y in data_iter:current_features = X.clone()for _ in range(steps):'''在每一步中,模型用 current_features 作为输入,并预测出 y_hat。然后将 y_hat 拼接到 current_features 的末尾,同时移除 current_features 的第一个时间步,保持输入长度不变。这样,y_hat 成为下一步的输入'''y_hat = net(current_features)current_features = torch.cat([current_features[:, 1:], y_hat], dim=1)multistep_predictions.extend(y_hat.numpy())net.train() return multistep_predictions# 获取测试集的不同步数的多步预测结果
steps = [4, 16, 32]
multistep_predictions = {step: multistep_predict(net, test_iter, step) for step in steps}# 绘制结果
plt.figure(figsize=(12, 6)) # 设置图像的宽度为12英寸,高度为6英寸
plt.plot(true_values, label='True Values')
plt.plot(ones_predictions, label='1-step Predictions')
for step, preds in multistep_predictions.items():plt.plot(preds, label=f'{step}-step Predictions')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
上述的多步预测是迭代预测法,即用自己预测数据再去预测下一个数据,另一种方法是seq2seq,后面在介绍,迭代预测法如下图所示: