频率增强通道注意力机制(FECAM)学习总结

本文提出了一种新的频率增强通道注意力机制(FECAM),旨在解决时间序列预测中傅里叶变换因吉布斯现象导致的高频噪声问题。FECAM基于离散余弦变换,能自适应地模拟信道间的频率依赖性,有效避免预测误差。实验显示,该机制可提升多种主流模型的预测性能,如LSTM、Reformer、Informer和Autoformer等。


时间序列预测是一个长期存在的挑战,因为真实世界的信息处于各种场景(例如,能源,天气,交通,经济,地震预警)。然而,一些主流的预测模型预测结果与实际情况大相径庭。我们认为,这是模型缺乏捕获真实世界数据集中丰富包含的频率信息的能力的原因。目前主流的频率信息提取方法都是基于傅里叶变换(FT)的。然而,由于吉布斯现象,FT的使用是有问题的。如果序列两侧的值差异很大,则在两侧周围观察到振荡近似,并且会引入高频噪声。因此,我们提出了一种新的频率增强信道注意力,它基于离散余弦变换自适应地模拟信道之间的频率相互依赖性,这将从本质上避免傅里叶变换过程中有问题的周期引起的高频噪声,这被定义为吉布斯现象。我们证明了该网络在六个真实数据集中的泛化非常有效,并实现了最先进的性能,我们进一步证明了频率增强信道注意力机制模块可以灵活地应用于不同的网络。该模块可以提高现有主流网络的预测能力,只需几行代码,即可在LSTM上降低35.99%的MSE,在Reformer上降低10.01%,在Informer上降低8.71%,在Autoformer上降低8.29%,在Transformer上降低8.06%等,计算成本很小。

这篇论文提出了一种基于离散余弦变换的频率增强通道注意机制(FECAM),通过捕捉时间序列中的频率信息来提升主流深度学习模型的预测能力,有效避免傅里叶变换引入的高频噪声,显著提升了多种时间序列预测任务的性能。

SENET(channel attention)

FECAM(Frequency Enhanced Channel Attention Mechanism)

As a module to enhance the frequency domain modeling capability of transformers and LSTM

 下面对其部分源码进行解读:

SlefAttention_Family.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as Fimport matplotlib.pyplot as pltimport numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
# from masking import TriangularCausalMask, ProbMask#for parameter count
import osclass FullAttention(nn.Module):def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):super(FullAttention, self).__init__()self.scale = scaleself.mask_flag = mask_flagself.output_attention = output_attentionself.dropout = nn.Dropout(attention_dropout)def forward(self, queries, keys, values, attn_mask):B, L, H, E = queries.shape_, S, _, D = values.shapescale = self.scale or 1. / sqrt(E)scores = torch.einsum("blhe,bshe->bhls", queries, keys)if self.mask_flag:if attn_mask is None:attn_mask = TriangularCausalMask(B, L, device=queries.device)scores.masked_fill_(attn_mask.mask, -np.inf)A = self.dropout(torch.softmax(scale * scores, dim=-1))V = torch.einsum("bhls,bshd->blhd", A, values)if self.output_attention:return (V.contiguous(), A)else:return (V.contiguous(), None)class ProbAttention(nn.Module):def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):super(ProbAttention, self).__init__()self.factor = factorself.scale = scaleself.mask_flag = mask_flagself.output_attention = output_attentionself.dropout = nn.Dropout(attention_dropout)def _prob_QK(self, Q, K, sample_k, n_top):  # n_top: c*ln(L_q)# Q [B, H, L, D]B, H, L_K, E = K.shape_, _, L_Q, _ = Q.shape# calculate the sampled Q_KK_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)index_sample = torch.randint(L_K, (L_Q, sample_k))  # real U = U_part(factor*ln(L_k))*L_qK_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()# find the Top_k query with sparisty measurementM = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)M_top = M.topk(n_top, sorted=False)[1]# use the reduced Q to calculate Q_KQ_reduce = Q[torch.arange(B)[:, None, None],torch.arange(H)[None, :, None],M_top, :]  # factor*ln(L_q)Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))  # factor*ln(L_q)*L_kreturn Q_K, M_topdef _get_initial_context(self, V, L_Q):B, H, L_V, D = V.shapeif not self.mask_flag:# V_sum = V.sum(dim=-2)V_sum = V.mean(dim=-2)contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone()else:  # use maskassert (L_Q == L_V)  # requires that L_Q == L_V, i.e. for self-attention onlycontex = V.cumsum(dim=-2)return contexdef _update_context(self, context_in, V, scores, index, L_Q, attn_mask):B, H, L_V, D = V.shapeif self.mask_flag:attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device)scores.masked_fill_(attn_mask.mask, -np.inf)attn = torch.softmax(scores, dim=-1)  # nn.Softmax(dim=-1)(scores)context_in[torch.arange(B)[:, None, None],torch.arange(H)[None, :, None],index, :] = torch.matmul(attn, V).type_as(context_in)if self.output_attention:attns = (torch.ones([B, H, L_V, L_V]) / L_V).type_as(attn).to(attn.device)attns[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], index, :] = attnreturn (context_in, attns)else:return (context_in, None)def forward(self, queries, keys, values, attn_mask):B, L_Q, H, D = queries.shape_, L_K, _, _ = keys.shapequeries = queries.transpose(2, 1)keys = keys.transpose(2, 1)values = values.transpose(2, 1)U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item()  # c*ln(L_k)u = self.factor * np.ceil(np.log(L_Q)).astype('int').item()  # c*ln(L_q)U_part = U_part if U_part < L_K else L_Ku = u if u < L_Q else L_Qscores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u)# add scale factorscale = self.scale or 1. / sqrt(D)if scale is not None:scores_top = scores_top * scale# get the contextcontext = self._get_initial_context(values, L_Q)# update the context with selected top_k queriescontext, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)return context.contiguous(), attnclass AttentionLayer(nn.Module):def __init__(self, attention, d_model, n_heads, d_keys=None,d_values=None):super(AttentionLayer, self).__init__()d_keys = d_keys or (d_model // n_heads)d_values = d_values or (d_model // n_heads)self.inner_attention = attentionself.query_projection = nn.Linear(d_model, d_keys * n_heads)self.key_projection = nn.Linear(d_model, d_keys * n_heads)self.value_projection = nn.Linear(d_model, d_values * n_heads)self.out_projection = nn.Linear(d_values * n_heads, d_model)self.n_heads = n_headsdef forward(self, queries, keys, values, attn_mask):B, L, _ = queries.shape_, S, _ = keys.shapeH = self.n_headsqueries = self.query_projection(queries).view(B, L, H, -1)keys = self.key_projection(keys).view(B, S, H, -1)values = self.value_projection(values).view(B, S, H, -1)out, attn = self.inner_attention(queries,keys,values,attn_mask)out = out.view(B, L, -1)return self.out_projection(out), attn

这个代码实现了一个注意力机制模块,主要包括了两种注意力机制(全局注意力 FullAttention 和 概率注意力 ProbAttention),以及一个常规的 AttentionLayer 用于处理多头注意力。以下是详细的解释:

1. 基础导入

import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
  • torchtorch.nntorch.nn.functional 是 PyTorch 的核心模块,用于定义和操作神经网络结构。
  • matplotlib.pyplotnumpy 是常用的 Python 库,用于绘图和数值计算。
  • TriangularCausalMaskProbMask 用于为注意力机制提供掩码(masking),以确保特定位置的注意力权重被正确计算。

2. FullAttention 类(全局注意力)

class FullAttention(nn.Module):def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):super(FullAttention, self).__init__()self.scale = scaleself.mask_flag = mask_flagself.output_attention = output_attentionself.dropout = nn.Dropout(attention_dropout)
  • FullAttention 实现了全局自注意力机制。它通过 querieskeys 计算注意力权重。
  • mask_flag:决定是否应用掩码。
  • scale:对注意力分数进行缩放。
  • attention_dropout:为防止过拟合,应用 Dropout。

forward 函数:

def forward(self, queries, keys, values, attn_mask):B, L, H, E = queries.shape_, S, _, D = values.shapescale = self.scale or 1. / sqrt(E)scores = torch.einsum("blhe,bshe->bhls", queries, keys)if self.mask_flag:if attn_mask is None:attn_mask = TriangularCausalMask(B, L, device=queries.device)scores.masked_fill_(attn_mask.mask, -np.inf)A = self.dropout(torch.softmax(scale * scores, dim=-1))V = torch.einsum("bhls,bshd->blhd", A, values)return (V.contiguous(), A if self.output_attention else None)
  • torch.einsum:使用爱因斯坦求和约定来进行矩阵乘法,计算 querieskeys 的相似度。
  • mask_flag:当使用掩码时,将填充 -np.inf 来忽略特定位置的注意力分数。
  • softmax:用于将分数转换为概率分布。
  • V:是使用注意力分数和 values 计算出的加权和。

3. ProbAttention 类(概率注意力)

class ProbAttention(nn.Module):def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):super(ProbAttention, self).__init__()self.factor = factorself.scale = scaleself.mask_flag = mask_flagself.output_attention = output_attentionself.dropout = nn.Dropout(attention_dropout)
  • ProbAttention 实现了一种基于稀疏注意力的机制。通过概率的方法减少计算量,特别适用于长序列的处理。
  • factor:控制采样和稀疏度。
  • scaleattention_dropout 类似于 FullAttention 中的作用。

_prob_QK 函数:

def _prob_QK(self, Q, K, sample_k, n_top):K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)index_sample = torch.randint(L_K, (L_Q, sample_k))K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)M_top = M.topk(n_top, sorted=False)[1]Q_reduce = Q[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], M_top, :]Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))return Q_K, M_top
  • 目的:通过抽样计算 QK 的相似度,从而减少全局计算开销。
  • 采样:通过从 K 中随机选择一部分,并与 Q 进行计算,得出最具代表性的 Top_k 查询,减少计算负担。

4. AttentionLayer 类

class AttentionLayer(nn.Module):def __init__(self, attention, d_model, n_heads, d_keys=None, d_values=None):super(AttentionLayer, self).__init__()self.inner_attention = attentionself.query_projection = nn.Linear(d_model, d_keys * n_heads)self.key_projection = nn.Linear(d_model, d_keys * n_heads)self.value_projection = nn.Linear(d_model, d_values * n_heads)self.out_projection = nn.Linear(d_values * n_heads, d_model)self.n_heads = n_heads
  • AttentionLayer 是多头注意力层,将输入的 querieskeysvalues 进行线性变换,然后通过注意力机制处理。
  • d_model:输入特征的维度。
  • n_heads:多头注意力的头数。
  • inner_attention:可以是 FullAttentionProbAttention
forward 函数:
def forward(self, queries, keys, values, attn_mask):B, L, _ = queries.shapeH = self.n_headsqueries = self.query_projection(queries).view(B, L, H, -1)keys = self.key_projection(keys).view(B, S, H, -1)values = self.value_projection(values).view(B, S, H, -1)out, attn = self.inner_attention(queries,keys,values,attn_mask)out = out.view(B, L, -1)return self.out_projection(out), attn
  • 线性变换querieskeysvalues 都通过线性层进行投影,转换为适应多头注意力的形状。
  • 多头注意力:将 querieskeysvalues 分成多个头来计算注意力。
  • 输出投影:最后通过 out_projection 投影到原始维度。

总结

  • 全局注意力FullAttention)计算每对 querieskeys 的相似度,用于标准的 Transformer 模型。
  • 概率注意力ProbAttention)通过稀疏采样减少计算复杂度,特别适用于长序列输入。
  • AttentionLayer 实现了一个多头注意力机制,可以选择全局注意力或概率注意力来处理输入。

这段代码的核心是实现不同的注意力机制,主要用于序列到序列的建模任务,例如时间序列预测或自然语言处理任务。


在 PyTorch 中,forward 函数是每个自定义神经网络模块(nn.Module)的核心方法,它定义了模型的前向传播过程。前向传播是指输入数据如何经过网络层,逐步计算出输出结果的过程。

forward 函数的主要作用:

  1. 定义模型的计算逻辑forward 函数负责具体的计算过程。每当你将输入传入模型时,模型会自动调用 forward 函数来执行从输入到输出的所有计算。

    forward 函数中,你将定义:

    • 数据如何流经每一层(如卷积层、全连接层、注意力机制等)。
    • 应用哪些激活函数、正则化(如 BatchNormDropout)等操作。
    • 最终如何产生输出。
  2. 将输入映射到输出forward 函数的核心任务是将输入数据(张量)传递到各层并生成输出。例如,在卷积神经网络中,输入张量会经过卷积层、池化层、激活函数等模块,最后输出结果。

  3. 自动执行反向传播: 虽然 forward 仅定义前向传播过程,但它与自动微分(autograd)紧密相连。当你调用 loss.backward() 时,PyTorch 会自动根据 forward 中的操作来计算梯度,进而进行反向传播和参数更新。


Transformer_EncDec.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.dctnet import dct_channel_block, dct
# from dctnet import dct_channel_block, dctclass ConvLayer(nn.Module):def __init__(self, c_in):super(ConvLayer, self).__init__()self.downConv = nn.Conv1d(in_channels=c_in,out_channels=c_in,kernel_size=3,padding=2,padding_mode='circular')self.norm = nn.BatchNorm1d(c_in)self.activation = nn.ELU()self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)def forward(self, x):x = self.downConv(x.permute(0, 2, 1))x = self.norm(x)x = self.activation(x)x = self.maxPool(x)x = x.transpose(1, 2)return xclass EncoderLayer(nn.Module):def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):super(EncoderLayer, self).__init__()d_ff = d_ff or 4 * d_modelself.attention = attentionself.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.dropout = nn.Dropout(dropout)self.activation = F.relu if activation == "relu" else F.geluself.dct_layer=dct_channel_block(512)self.dct_norm = nn.LayerNorm([512], eps=1e-6)def forward(self, x, attn_mask=None):# print(x.shape)#torch.Size([32, 96, 512])#torch.Size([32, 49, 512])'''before self-attention'''# mid  = self.dct_layer(x)## x = x+midnew_x, attn = self.attention(x, x, x,attn_mask=attn_mask)x = x + self.dropout(new_x)y = x = self.norm1(x)y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))y = self.dropout(self.conv2(y).transpose(-1, 1))# print("y.shape:",y.shape)#y.shape: torch.Size([32, 96, 512])#为了减少计算量,不然要做512次L=96的dct#加入到moduleresult = self.norm2(x + y)# mid  = self.dct_layer(result)# result = result+mid# result = self.dct_norm(result) #norm 144return result, attn# return self.norm2(x + y), attnclass Encoder(nn.Module):def __init__(self, attn_layers, conv_layers=None, norm_layer=None):super(Encoder, self).__init__()self.attn_layers = nn.ModuleList(attn_layers)self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else Noneself.norm = norm_layerdef forward(self, x, attn_mask=None):# x [B, L, D]attns = []if self.conv_layers is not None:for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):x, attn = attn_layer(x, attn_mask=attn_mask)x = conv_layer(x)attns.append(attn)x, attn = self.attn_layers[-1](x)attns.append(attn)else:for attn_layer in self.attn_layers:x, attn = attn_layer(x, attn_mask=attn_mask)attns.append(attn)if self.norm is not None:x = self.norm(x)return x, attnsclass DecoderLayer(nn.Module):def __init__(self, self_attention, cross_attention, d_model, d_ff=None,dropout=0.1, activation="relu"):super(DecoderLayer, self).__init__()d_ff = d_ff or 4 * d_modelself.self_attention = self_attentionself.cross_attention = cross_attentionself.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.norm3 = nn.LayerNorm(d_model)self.dropout = nn.Dropout(dropout)self.activation = F.relu if activation == "relu" else F.gelu# self.dct_layer=dct_channel_block(512)# self.dct_norm = nn.LayerNorm([512], eps=1e-6)def forward(self, x, cross, x_mask=None, cross_mask=None):x = x + self.dropout(self.self_attention(x, x, x,attn_mask=x_mask)[0])x = self.norm1(x)x = x + self.dropout(self.cross_attention(x, cross, cross,attn_mask=cross_mask)[0])y = x = self.norm2(x)y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))y = self.dropout(self.conv2(y).transpose(-1, 1))# result = self.norm3(x + y)# mid  = self.dct_layer(result)# result = result+mid# result = self.dct_norm(result) #norm 144# return result# return self.norm3(x + y)return self.norm3(x + y)class Decoder(nn.Module):def __init__(self, layers, norm_layer=None, projection=None):super(Decoder, self).__init__()self.layers = nn.ModuleList(layers)self.norm = norm_layerself.projection = projectiondef forward(self, x, cross, x_mask=None, cross_mask=None):for layer in self.layers:x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)if self.norm is not None:x = self.norm(x)if self.projection is not None:x = self.projection(x)return x

这段代码定义了一个类似 Transformer 的神经网络架构,包含卷积层、编码器层和解码器层。它可以用于处理序列数据(如时间序列、自然语言处理等)。具体包括以下组件:

1. ConvLayer(卷积层):

class ConvLayer(nn.Module):def __init__(self, c_in):super(ConvLayer, self).__init__()self.downConv = nn.Conv1d(in_channels=c_in, out_channels=c_in, kernel_size=3, padding=2, padding_mode='circular')self.norm = nn.BatchNorm1d(c_in)self.activation = nn.ELU()self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
  • 这个卷积层首先对输入进行 1D 卷积,然后进行批归一化(BatchNorm)和 ELU 激活,再通过最大池化层减少特征维度。
  • 作用:将高维的输入进行降采样,同时提取局部特征。

2. EncoderLayer(编码器层)

class EncoderLayer(nn.Module):def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):super(EncoderLayer, self).__init__()d_ff = d_ff or 4 * d_modelself.attention = attentionself.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.dropout = nn.Dropout(dropout)self.activation = F.relu if activation == "relu" else F.geluself.dct_layer = dct_channel_block(512)  # DCT增强self.dct_norm = nn.LayerNorm([512], eps=1e-6)
  • 作用:这个层结合了注意力机制、自注意力(self-attention)、卷积层(用于前馈网络),并且通过 LayerNorm 和 Dropout 进行正则化。它还引入了 DCT(离散余弦变换)来增强输入的特征处理。
  • 自注意力机制:通过 attention 来捕获不同位置之间的相关性。
  • 卷积网络:通过卷积进一步对特征进行变换和学习。

3. Encoder(编码器)

class Encoder(nn.Module):def __init__(self, attn_layers, conv_layers=None, norm_layer=None):super(Encoder, self).__init__()self.attn_layers = nn.ModuleList(attn_layers)self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else Noneself.norm = norm_layer
  • 作用:这个类是编码器的整体结构,由多个 EncoderLayer 组成。如果有卷积层(conv_layers),会在每次注意力计算后对输入进行卷积操作。
  • 流程:输入经过每一层的注意力层,最后通过 LayerNorm 进行归一化。

4. DecoderLayer(解码器层)

class DecoderLayer(nn.Module):def __init__(self, self_attention, cross_attention, d_model, d_ff=None, dropout=0.1, activation="relu"):super(DecoderLayer, self).__init__()d_ff = d_ff or 4 * d_modelself.self_attention = self_attentionself.cross_attention = cross_attentionself.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.norm3 = nn.LayerNorm(d_model)self.dropout = nn.Dropout(dropout)self.activation = F.relu if activation == "relu" else F.gelu
  • 作用:解码器层包括了自注意力(self_attention)和交叉注意力(cross_attention)。自注意力用于捕捉解码器内部的依赖关系,而交叉注意力用于将编码器的输出与解码器的输入结合。
  • 卷积层和归一化:使用卷积和归一化对注意力层的输出进一步处理。

5. Decoder(解码器)

class Decoder(nn.Module):def __init__(self, layers, norm_layer=None, projection=None):super(Decoder, self).__init__()self.layers = nn.ModuleList(layers)self.norm = norm_layerself.projection = projection
  • 作用:解码器由多个 DecoderLayer 组成。输入首先通过每一层的解码器,结合来自编码器的输出,最后通过归一化和可选的投影层生成最终的输出。

6. 前向传播 (forward) 函数

每个模块都有一个 forward 函数,定义了数据在该模块中的流动过程。

ConvLayer.forward
def forward(self, x):x = self.downConv(x.permute(0, 2, 1))x = self.norm(x)x = self.activation(x)x = self.maxPool(x)x = x.transpose(1, 2)return x
  • 操作顺序
    1. 输入经过一维卷积处理。
    2. 批归一化。
    3. 激活函数 ELU。
    4. 最大池化。
    5. 变换维度并返回输出。
EncoderLayer.forward
def forward(self, x, attn_mask=None):new_x, attn = self.attention(x, x, x, attn_mask=attn_mask)x = x + self.dropout(new_x)y = x = self.norm1(x)y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))y = self.dropout(self.conv2(y).transpose(-1, 1))result = self.norm2(x + y)return result, attn
  • 操作顺序
    1. 输入通过注意力层进行自注意力计算。
    2. 经过残差连接和 Dropout。
    3. 进行 LayerNorm 归一化。
    4. 输入通过前馈卷积网络。
    5. 最后输出经过归一化的结果。
DecoderLayer.forward
def forward(self, x, cross, x_mask=None, cross_mask=None):x = x + self.dropout(self.self_attention(x, x, x, attn_mask=x_mask)[0])x = self.norm1(x)x = x + self.dropout(self.cross_attention(x, cross, cross, attn_mask=cross_mask)[0])y = x = self.norm2(x)y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))y = self.dropout(self.conv2(y).transpose(-1, 1))return self.norm3(x + y)
  • 操作顺序
    1. 输入通过自注意力计算。
    2. 进行 LayerNorm 归一化。
    3. 再通过交叉注意力与编码器的输出结合。
    4. 经过前馈卷积层,最后输出结果。

总结

  • 卷积层 用于特征提取和降采样。
  • 编码器解码器 使用注意力机制来捕捉不同时间步之间的依赖关系,并通过卷积层进行进一步的特征处理。
  • 前向传播 函数定义了每个模块内部的数据流动和处理逻辑,结合了卷积、注意力机制和正则化。

Embed.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils import weight_norm
import mathclass PositionalEmbedding(nn.Module):def __init__(self, d_model, max_len=5000):super(PositionalEmbedding, self).__init__()# Compute the positional encodings once in log space.pe = torch.zeros(max_len, d_model).float()pe.require_grad = Falseposition = torch.arange(0, max_len).float().unsqueeze(1)div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):return self.pe[:, :x.size(1)]class TokenEmbedding(nn.Module):def __init__(self, c_in, d_model):super(TokenEmbedding, self).__init__()padding = 1 if torch.__version__ >= '1.5.0' else 2self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,kernel_size=3, padding=padding, padding_mode='circular', bias=False)for m in self.modules():if isinstance(m, nn.Conv1d):nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')def forward(self, x):x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)return xclass FixedEmbedding(nn.Module):def __init__(self, c_in, d_model):super(FixedEmbedding, self).__init__()w = torch.zeros(c_in, d_model).float()w.require_grad = Falseposition = torch.arange(0, c_in).float().unsqueeze(1)div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()w[:, 0::2] = torch.sin(position * div_term)w[:, 1::2] = torch.cos(position * div_term)self.emb = nn.Embedding(c_in, d_model)self.emb.weight = nn.Parameter(w, requires_grad=False)def forward(self, x):return self.emb(x).detach()class TemporalEmbedding(nn.Module):def __init__(self, d_model, embed_type='fixed', freq='h'):super(TemporalEmbedding, self).__init__()minute_size = 4hour_size = 24weekday_size = 7day_size = 32month_size = 13Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embeddingif freq == 't':self.minute_embed = Embed(minute_size, d_model)self.hour_embed = Embed(hour_size, d_model)self.weekday_embed = Embed(weekday_size, d_model)self.day_embed = Embed(day_size, d_model)self.month_embed = Embed(month_size, d_model)def forward(self, x):x = x.long()minute_x = self.minute_embed(x[:, :, 4]) if hasattr(self, 'minute_embed') else 0.hour_x = self.hour_embed(x[:, :, 3])weekday_x = self.weekday_embed(x[:, :, 2])day_x = self.day_embed(x[:, :, 1])month_x = self.month_embed(x[:, :, 0])return hour_x + weekday_x + day_x + month_x + minute_xclass TimeFeatureEmbedding(nn.Module):def __init__(self, d_model, embed_type='timeF', freq='h'):super(TimeFeatureEmbedding, self).__init__()freq_map = {'h': 4, 't': 5, 's': 6, 'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}d_inp = freq_map[freq]self.embed = nn.Linear(d_inp, d_model, bias=False)def forward(self, x):return self.embed(x)class DataEmbedding(nn.Module):def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):super(DataEmbedding, self).__init__()self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)self.position_embedding = PositionalEmbedding(d_model=d_model)self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)self.dropout = nn.Dropout(p=dropout)def forward(self, x, x_mark):x = self.value_embedding(x) + self.temporal_embedding(x_mark) + self.position_embedding(x)return self.dropout(x)class DataEmbedding_wo_pos(nn.Module):def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):super(DataEmbedding_wo_pos, self).__init__()self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)self.position_embedding = PositionalEmbedding(d_model=d_model)self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)self.dropout = nn.Dropout(p=dropout)def forward(self, x, x_mark):x = self.value_embedding(x) + self.temporal_embedding(x_mark)return self.dropout(x)class DataEmbedding_wo_pos_temp(nn.Module):def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):super(DataEmbedding_wo_pos_temp, self).__init__()self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)self.position_embedding = PositionalEmbedding(d_model=d_model)self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)self.dropout = nn.Dropout(p=dropout)def forward(self, x, x_mark):x = self.value_embedding(x)return self.dropout(x)class DataEmbedding_wo_temp(nn.Module):def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):super(DataEmbedding_wo_temp, self).__init__()self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)self.position_embedding = PositionalEmbedding(d_model=d_model)self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)self.dropout = nn.Dropout(p=dropout)def forward(self, x, x_mark):x = self.value_embedding(x) + self.position_embedding(x)return self.dropout(x)

这段代码实现了一系列嵌入层(Embedding Layers),主要用于将输入数据(如时间序列数据)转换为适合模型处理的向量表示。这种嵌入是序列建模中非常重要的一部分,特别是在处理时间序列数据时。

主要模块解释

  1. PositionalEmbedding(位置嵌入)

    • 作用:为每个序列位置编码一个向量,使得模型能够感知序列中每个位置的顺序。常用于 Transformer 模型中,因为这种模型本身不具备位置信息的感知能力。
    • 实现细节
      • 使用正弦和余弦函数为不同位置生成不同的嵌入,位置嵌入向量的奇偶位分别使用 sincos
      • pe 是一个预计算的二维张量,包含最大序列长度(max_len)和每个位置的嵌入(d_model 的维度)。
  2. TokenEmbedding(令牌嵌入)

    • 作用:将输入的数据通过一维卷积进行嵌入。可以理解为将输入的多维数据映射到一个高维的特征空间。
    • 实现细节
      • 使用 1D 卷积将输入通道数(c_in)转换为模型维度(d_model)。
      • 初始化权重时使用 Kaiming 初始化方法,适合 ReLU 和 LeakyReLU 激活函数。
  3. FixedEmbedding(固定嵌入)

    • 作用:通过固定的正弦和余弦函数生成嵌入,类似于位置嵌入,但它是用于固定长度的输入数据。
    • 实现细节
      • 每个位置的奇数位使用 sin,偶数位使用 cos
      • 权重矩阵 w 是固定的,不参与训练。
  4. TemporalEmbedding(时间嵌入)

    • 作用:为时间特征(如小时、天、月份等)生成嵌入,特别适用于处理时间序列数据。
    • 实现细节
      • 支持嵌入不同的时间粒度(如分钟、小时、周几、天、月份等)。
      • 可以选择使用固定嵌入(FixedEmbedding)或可学习嵌入(nn.Embedding)。
  5. TimeFeatureEmbedding(时间特征嵌入)

    • 作用:为特定时间粒度(如小时、天、周等)的数值进行线性嵌入。
    • 实现细节
      • 根据 freq_map 映射不同的时间粒度,并使用线性变换嵌入。
  6. DataEmbedding(数据嵌入)

    • 作用:组合了 TokenEmbeddingPositionalEmbeddingTemporalEmbedding,生成完整的嵌入向量。
    • 实现细节
      • 先将输入数据通过 TokenEmbedding 进行嵌入。
      • 然后结合 TemporalEmbedding 提取时间相关特征。
      • 还会加入位置嵌入(PositionalEmbedding)来提供序列位置信息。
      • 最后通过 Dropout 防止过拟合。
  7. DataEmbedding_wo_pos(无位置嵌入的数据嵌入)

    • 作用:与 DataEmbedding 类似,但不包含位置嵌入。
    • 实现细节
      • 只包含 TokenEmbeddingTemporalEmbedding,省略位置嵌入部分。
  8. DataEmbedding_wo_pos_temp(无位置和时间嵌入的数据嵌入)

    • 作用:只进行 TokenEmbedding,不包括时间和位置嵌入。
    • 实现细节
      • 仅使用 TokenEmbedding 来处理输入数据。
  9. DataEmbedding_wo_temp(无时间嵌入的数据嵌入)

    • 作用:不包括时间嵌入,但仍然使用位置嵌入。
    • 实现细节
      • 包含 TokenEmbeddingPositionalEmbedding,省略了 TemporalEmbedding

嵌入层的整体作用

这些嵌入层的主要作用是将输入的时间序列数据转换成向量表示,这样可以更好地被神经网络模型(如 Transformer 或 CNN)处理。时间序列数据通常包含时间信息(如天、小时等),嵌入层通过提取这些时间信息来增强模型对时间依赖性的感知。通过组合 TokenEmbedding(提取特征)、PositionalEmbedding(位置信息)和 TemporalEmbedding(时间信息),模型可以捕捉到输入数据的复杂时空关系。

dctnet.py代码:

from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch
try:from torch import irfftfrom torch import rfft
except ImportError:def rfft(x, d):t = torch.fft.fft(x, dim = (-d))r = torch.stack((t.real, t.imag), -1)return rdef irfft(x, d):t = torch.fft.ifft(torch.complex(x[:,:,0], x[:,:,1]), dim = (-d))return t.realdef dct(x, norm=None):"""Discrete Cosine Transform, Type II (a.k.a. the DCT)For the meaning of the parameter `norm`, see:https://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.fftpack.dct.html:param x: the input signal:param norm: the normalization, None or 'ortho':return: the DCT-II of the signal over the last dimension"""x_shape = x.shapeN = x_shape[-1]x = x.contiguous().view(-1, N)v = torch.cat([x[:, ::2], x[:, 1::2].flip([1])], dim=1)# Vc = torch.fft.rfft(v, 1, onesided=False)Vc = rfft(v, 1)k = - torch.arange(N, dtype=x.dtype, device=x.device)[None, :] * np.pi / (2 * N)W_r = torch.cos(k)W_i = torch.sin(k)V = Vc[:, :, 0] * W_r - Vc[:, :, 1] * W_iif norm == 'ortho':V[:, 0] /= np.sqrt(N) * 2V[:, 1:] /= np.sqrt(N / 2) * 2V = 2 * V.view(*x_shape)return V# class senet_block(nn.Module):
#     def __init__(self, channel=512, ratio=1):
#         super(dct_channel_block, self).__init__()
#         self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
#         self.fc = nn.Sequential(
#                 nn.Linear(channel, channel // 4, bias=False),
#                 nn.ReLU(inplace=True),
#                 nn.Linear(channel //4, channel, bias=False),
#                 nn.Sigmoid()
#         )#     def forward(self, x):
#         # b, c, l = x.size() # (B,C,L)
#         # y = self.avg_pool(x) # (B,C,L) -> (B,C,1)
#         # print("y",y.shape)
#         x = x.permute(0,2,1)
#         b, c, l = x.size() 
#         y = self.avg_pool(x).view(b, c) # (B,C,L) ->(B,C,1)
#         # print("y",y.shape)
#         # y = self.fc(y).view(b, c, 96)#         y = self.fc(y).view(b,c,1)
#         # print("y",y.shape)
#         # return x * y
#         return (x*y).permute(0,2,1)
class dct_channel_block(nn.Module):def __init__(self, channel):super(dct_channel_block, self).__init__()# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovationself.fc = nn.Sequential(nn.Linear(channel, channel*2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear( channel*2, channel, bias=False),nn.Sigmoid())# self.dct_norm = nn.LayerNorm([512], eps=1e-6)self.dct_norm = nn.LayerNorm([96], eps=1e-6)#for lstm on length-wise# self.dct_norm = nn.LayerNorm([36], eps=1e-6)#for lstm on length-wise on ill with input =36def forward(self, x):b, c, l = x.size() # (B,C,L) (32,96,512)# y = self.avg_pool(x) # (B,C,L) -> (B,C,1)# y = self.avg_pool(x).view(b, c) # (B,C,L) -> (B,C,1)# print("y",y.shape# y = self.fc(y).view(b, c, 96)list = []for i in range(c):freq=dct(x[:,i,:])     # print("freq-shape:",freq.shape)list.append(freq)stack_dct=torch.stack(list,dim=1)stack_dct = torch.tensor(stack_dct)'''for traffic mission:f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic datasets'''lr_weight = self.dct_norm(stack_dct) lr_weight = self.fc(stack_dct)lr_weight = self.dct_norm(lr_weight) # print("lr_weight",lr_weight.shape)return x *lr_weight #resultif __name__ == '__main__':tensor = torch.rand(8,7,96)dct_model = dct_channel_block()result = dct_model.forward(tensor) print("result.shape:",result.shape)

这段代码实现了基于离散余弦变换(DCT)的通道块,用于增强神经网络中的特征表示。以下是代码的详细解释:

1. 导入库和兼容性处理

from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch

导入了 PyTorch 的 nn 模块用于神经网络构建,mathnumpy 用于数学运算。

try:from torch import irfftfrom torch import rfft
except ImportError:def rfft(x, d):t = torch.fft.fft(x, dim=(-d))r = torch.stack((t.real, t.imag), -1)return rdef irfft(x, d):t = torch.fft.ifft(torch.complex(x[:, :, 0], x[:, :, 1]), dim=(-d))return t.real

这一段代码用于兼容旧版本的 PyTorch,定义了 rfftirfft 函数。在较新的 PyTorch 版本中,rfftirfft 函数可能已被替换,因此这里使用自定义实现确保代码的兼容性。

2. DCT(离散余弦变换)

def dct(x, norm=None):...

这个函数实现了 离散余弦变换 (DCT),具体是 DCT-II 类型。DCT 是一种频域转换,通常用于信号处理和特征提取,它可以将时域信号转换为频域表示。

主要步骤:
  • 重新排列数据:将输入张量中的偶数和奇数索引部分组合在一起。
  • 进行 FFT(快速傅里叶变换):通过 rfft 函数进行快速傅里叶变换,将信号转换到频域。
  • 正弦和余弦加权:通过 cossin 函数对 FFT 的结果进行加权,得到离散余弦变换的结果。
  • 归一化处理:如果指定 norm='ortho',进行正交归一化。

3. DCT 通道块

class dct_channel_block(nn.Module):def __init__(self, channel):super(dct_channel_block, self).__init__()self.fc = nn.Sequential(nn.Linear(channel, channel * 2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear(channel * 2, channel, bias=False),nn.Sigmoid())self.dct_norm = nn.LayerNorm([96], eps=1e-6)

这个类实现了基于 DCT 的通道块,主要用于对输入张量进行变换,增强特征的表达能力。

主要组件:
  • 全连接层(fc:使用了两层全连接层,结合 ReLU 激活函数和 Dropout,来对 DCT 变换后的频域特征进行进一步处理。
  • LayerNorm(dct_norm:用于标准化处理,以稳定训练过程。
前向传播 (forward):
def forward(self, x):b, c, l = x.size() # (B,C,L) (32,96,512)list = []for i in range(c):freq = dct(x[:, i, :])list.append(freq)stack_dct = torch.stack(list, dim=1)stack_dct = torch.tensor(stack_dct)lr_weight = self.dct_norm(stack_dct)lr_weight = self.fc(stack_dct)lr_weight = self.dct_norm(lr_weight)return x * lr_weight
  • 输入x 的形状为 (B, C, L),其中 B 是批量大小,C 是通道数,L 是序列长度。
  • DCT 变换:对于每个通道,计算 DCT,得到频域表示。
  • 处理 DCT 结果:将 DCT 结果通过全连接层和标准化层处理,得到特征权重。
  • 输出:将原始输入与 DCT 处理后的权重相乘,得到最终的输出。

4. 主函数

if __name__ == '__main__':tensor = torch.rand(8, 7, 96)dct_model = dct_channel_block(96)result = dct_model.forward(tensor)print("result.shape:", result.shape)

这部分代码用于测试 dct_channel_block 类:

  • 输入:随机生成一个形状为 (8, 7, 96) 的张量,代表批量大小为 8,通道数为 7,序列长度为 96。
  • 模型实例化:实例化 dct_channel_block,并使用前向传播方法对输入进行处理。
  • 输出:打印处理后的输出张量的形状。

总结

  • DCT 的作用:DCT 可以将时域信号转换为频域信号,在信号处理和特征提取中非常有用。通过这种变换,可以捕捉输入数据中的频率模式,从而增强模型的表达能力。
  • 通道块的作用dct_channel_block 结合 DCT 和全连接层来处理输入特征,并生成频域权重以调整输入特征。
  • 代码的目的:实现了一种结合 DCT 的特征增强机制,旨在通过频域处理提升神经网络在时序数据上的表现。

LSTM.py代码:

from pip import main
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dct
# from dctnet import dct_channel_block,dct #for parameters calc
import argparse
from fvcore.nn import FlopCountAnalysis,parameter_count_table  device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class Model(nn.Module):#2022.11.7修改前,这个Model能跑通def __init__(self,configs,input_size=None,hidden_size=16, output_size=None,batch_size=None,num_layers=2):#input_size与output_size是通道数super(Model, self).__init__()# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovationself.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.input_size = configs.enc_in #channelself.hidden_size = hidden_size #输出维度 也就是输出通道self.num_layers = num_layersself.output_size = configs.enc_in #channel #输出个数self.num_directions = 1 # 单向LSTMself.batch_size = configs.batch_sizeself.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)self.linear = nn.Linear(self.hidden_size, self.output_size)#通道数对齐层self.linear_out_len = nn.Linear(self.seq_len, self.pred_len)#输出长度对齐层# self.linear = nn.Linear()#add dce-blockself.dct_layer=dct_channel_block(configs.seq_len)self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)#作为模块一般normal channel效果好点# self.dct_norm = nn.LayerNorm([512], eps=1e-6)#作为模块一般normal channel效果好点def forward(self, x):# x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL# b, c, l = x.size() # (B,C,L)# batch_size, seq_len = x.shape[0], x.shape[1]# h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)# c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)# output(batch_size, seq_len, num_directions * hidden_size)output, _ = self.lstm(x, (h_0, c_0)) # output(5, 30, 64)# print("output.shape:",output.shape)#result.shape: torch.Size([8, 96, 8])result = self.linear(output)  # (B,L,C)# output.shape: torch.Size([8, 96, 16])#16是hidden_size# result.shape: torch.Size([8, 96, 8])#8是为了符合通道数对齐,exchange_rate有8个变量'''dct'''result = self.dct_layer(result.permute(0,2,1))#加入dct模块,mse降低0.12个点result_len =  self.linear_out_len(result)#为了输出长度对齐 (8,8,96)'''dct'''# result_len =  self.linear_out_len(result.permute(0,2,1))#为了输出长度对齐 (8,8,96)# print("result.shape:",result.shape)#result.shape: torch.Size([8, 96, 8])# result = result.permute(0,2,1)#(B,L,C)=》(B,C,L)# result = self.dct_layer(result_len)#加入dct模块,mse降低0.12个点# result = result.permute(0,2,1)#(B,C,L)=》(B,L,C)return  result_len.permute(0,2,1)
# lstm = LSTM(input_size=7,hidden_size=64, output_size=7,batch_size=8,num_layers=5).to(device)
# tensor = torch.rand(8, 96, 7).to(device)
# result = lstm(tensor)
# print("result.shape:",result.shape)
if __name__ == '__main__': parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')# forecasting taskparser.add_argument('--seq_len', type=int, default=96, help='input sequence length')parser.add_argument('--label_len', type=int, default=48, help='start token length')parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')parser.add_argument('--embed_type', type=int, default=0, help='0: default 1: value embedding + temporal embedding + positional embedding 2: value embedding + temporal embedding 3: value embedding + positional embedding 4: value embedding')parser.add_argument('--enc_in', type=int, default=7, help='encoder input size') # DLinear with --individual, use this hyperparameter as the number of channelsparser.add_argument('--dec_in', type=int, default=7, help='decoder input size')parser.add_argument('--c_out', type=int, default=7, help='output size')parser.add_argument('--d_model', type=int, default=512, help='dimension of model')parser.add_argument('--n_heads', type=int, default=8, help='num of heads')parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')# optimizationparser.add_argument('--num_workers', type=int, default=10, help='data loader num workers')parser.add_argument('--itr', type=int, default=2, help='experiments times')parser.add_argument('--train_epochs', type=int, default=10, help='train epochs')parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')parser.add_argument('--patience', type=int, default=3, help='early stopping patience')parser.add_argument('--learning_rate', type=float, default=0.0001, help='optimizer learning rate')parser.add_argument('--des', type=str, default='test', help='exp description')parser.add_argument('--loss', type=str, default='mse', help='loss function')parser.add_argument('--lradj', type=str, default='type1', help='adjust learning rate')parser.add_argument('--use_amp', action='store_true', help='use automatic mixed precision training', default=False)# GPUparser.add_argument('--use_gpu', type=bool, default=True, help='use gpu')parser.add_argument('--gpu', type=int, default=0, help='gpu')parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=False)parser.add_argument('--devices', type=str, default='0,1,2,3', help='device ids of multile gpus')parser.add_argument('--test_flop', action='store_true', default=False, help='See utils/tools for usage')args = parser.parse_args()model = Model(args)print(parameter_count_table(model))

这段代码实现了一个基于 LSTM(长短期记忆网络)和 DCT(离散余弦变换)的时间序列预测模型,并且可以处理不同的模型配置和超参数。这段代码主要用于时间序列数据的预测任务,具体解释如下:

1. 模型类 Model

class Model(nn.Module):def __init__(self, configs, input_size=None, hidden_size=16, output_size=None, batch_size=None, num_layers=2):super(Model, self).__init__()...
  • configs:通过命令行传递的配置参数(如序列长度、批量大小、隐藏层大小等)。
  • input_size:输入的特征数量(通道数),从 configs.enc_in 中获取。
  • hidden_size:LSTM 隐藏层的大小,默认是 16,即 LSTM 输出的特征维度。
  • num_layers:LSTM 网络的层数,默认为 2
  • output_size:模型的输出特征数,从 configs.enc_in 中获取。

该模型的结构包含:

  • 一个 LSTM 层,用于处理输入的时间序列数据。
  • 两个 线性层linearlinear_out_len),用于通道数和序列长度的对齐。
  • 一个基于 DCT 的增强模块 dct_channel_block,用于提取频域特征。

2. LSTM 和线性层

self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
self.linear = nn.Linear(self.hidden_size, self.output_size)  # 通道数对齐
self.linear_out_len = nn.Linear(self.seq_len, self.pred_len)  # 输出长度对齐
  • LSTM 层:输入维度为 input_size,输出维度为 hidden_size,LSTM 层数为 num_layers
  • 线性层 linear:用于将 LSTM 的输出从 hidden_size 映射回输入的特征数量(通道数)。
  • 线性层 linear_out_len:用于将序列长度从 seq_len 映射到预测的序列长度 pred_len

3. DCT 通道块

self.dct_layer = dct_channel_block(configs.seq_len)
self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)
  • dct_channel_block:通过 DCT(离散余弦变换)提取频域信息,并对输入特征进行处理。它有助于改善模型对时间序列中的周期性或频域特征的捕捉能力。
  • dct_norm:标准化层,用于将经过 DCT 处理的特征进行归一化。

4. 前向传播函数 forward

def forward(self, x):...h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)output, _ = self.lstm(x, (h_0, c_0))  # LSTM 前向传播result = self.linear(output)  # 通道数对齐result = self.dct_layer(result.permute(0, 2, 1))  # DCT 模块result_len = self.linear_out_len(result)  # 长度对齐return result_len.permute(0, 2, 1)
  • 初始化隐藏状态和细胞状态h_0c_0 是 LSTM 的初始隐藏状态和细胞状态。
  • LSTM 前向传播:输入 x 经过 LSTM 层,得到 output,并进行通道数对齐。
  • DCT 处理:经过 LSTM 输出后的结果通过 DCT 模块处理,并对频域特征进行操作。
  • 长度对齐:通过 linear_out_len 线性层调整预测序列的长度为 pred_len
  • 输出:返回最终的预测结果,并调整形状。

5. 命令行参数配置

parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')
parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
parser.add_argument('--enc_in', type=int, default=7, help='encoder input size')
parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
...
args = parser.parse_args()
model = Model(args)
  • seq_len:输入序列的长度。
  • pred_len:预测序列的长度。
  • enc_in:输入特征的通道数。
  • batch_size:批量大小。

通过 argparse 模块,可以通过命令行传入这些参数来控制模型的配置。

6. 模型参数和 FLOPs 计算

from fvcore.nn import FlopCountAnalysis, parameter_count_table  
print(parameter_count_table(model))
  • parameter_count_table:用于计算模型的参数数量。
  • FlopCountAnalysis:用于分析模型的浮点操作(FLOPs)。

这段代码通过 fvcore 库计算模型的参数量,并输出这些信息以便进行模型复杂度的评估。

总结

  • 该模型结合了 LSTMDCT,用于时间序列数据的预测。
  • 通过 DCT 增强模型对频域特征的捕捉能力,有助于提高模型的预测性能。
  • 通过命令行参数,模型的配置可以灵活调整,并且支持多 GPU 训练和 FLOPs 计算。

oneCNN.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dctclass Model(nn.Module):#2022.11.7修改前,这个Model能跑通#CNNdef __init__(self,configs):super(Model, self).__init__()# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovationself.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.channel_num = configs.enc_inself.conv1 = nn.Conv1d(in_channels=self.channel_num,out_channels=self.channel_num,kernel_size=5,stride=1,padding=2) #输入通道数和输出通道数应该一样# input = torch.randn(32,7,96)# batch_size x text_len x embedding_size -> batch_size x embedding_size x text_lenself.dct_layer=dct_channel_block(self.seq_len)self.linear = nn.Linear(self.seq_len,self.pred_len)# self.dct_norm = nn.LayerNorm([7], eps=1e-6)#作为模块一般normal channel效果好点def forward(self, x):# print("x.shape:",x.shape)x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL and 1-d conv# b, c, l = x.size() # (B,C,L)out = self.conv1(x) #b,c,lout = self.linear(x)#b,c,l# print(out.size())# out  = self.dct_layer(out)#加入dct模块,mse降低0.12个点# out = self.linear(x)#b,c,l# x = x+mid# x = x.permute(0,2,1) # x = self.dct_norm(x) #norm 144# x = x.permute(0,2,1) return  (out).permute(0,2,1)#b,l,c

这段代码定义了一个基于 CNN(卷积神经网络)DCT(离散余弦变换) 的简单时间序列预测模型。模型通过 1D 卷积和 DCT 增强特征提取,结合线性层对输出进行预测。

代码详解

1. 模型初始化
class Model(nn.Module):def __init__(self, configs):super(Model, self).__init__()self.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.channel_num = configs.enc_inself.conv1 = nn.Conv1d(in_channels=self.channel_num, out_channels=self.channel_num, kernel_size=5, stride=1, padding=2)self.dct_layer = dct_channel_block(self.seq_len)self.linear = nn.Linear(self.seq_len, self.pred_len)
  • seq_len:输入序列的长度。
  • pred_len:预测序列的长度。
  • channel_num:输入的通道数,代表时间序列的特征数量(例如 7 维度的输入表示有 7 个不同的特征)。
网络组件:
  • conv1:一维卷积层,输入和输出通道数保持一致,卷积核大小为 5,步幅为 1,并使用 padding=2 保证输入和输出序列的长度保持不变。

  • dct_layer:通过 dct_channel_block 提取频域特征,用离散余弦变换增强特征。

  • linear:线性层用于将序列长度从 seq_len 映射到预测长度 pred_len,在输出前对时间序列进行长度调整。

2. 前向传播函数 forward
def forward(self, x):x = x.permute(0, 2, 1)  # 转置输入,调整维度顺序 (B, L, C) -> (B, C, L)out = self.conv1(x)  # 应用一维卷积,提取特征 (B, C, L)out = self.linear(x)  # 应用线性层进行序列长度变换return out.permute(0, 2, 1)  # 再次转置,调整维度回到 (B, L, C)
  • 输入:输入 x 的形状为 (B, L, C),即 B 表示批量大小,L 表示序列长度,C 表示输入的特征维度(通道数)。

  • 转置:在进行 1D 卷积前,先通过 x.permute(0, 2, 1) 将输入从形状 (B, L, C) 转为 (B, C, L),使得输入符合 1D 卷积的要求。

  • 卷积:通过 conv1 提取特征,卷积后维度仍然是 (B, C, L)

  • 线性层:将经过卷积后的特征通过线性层 linear,将序列长度从 seq_len 转换为 pred_len

  • 输出:最后将输出再转置回 (B, L, C),作为最终的预测结果。

可选模块(未激活的部分)

# out  = self.dct_layer(out)  # DCT 层:可选的 DCT 增强模块,用于通过离散余弦变换进一步处理特征。
# out = self.linear(x)  # 可选的线性层,可以用于对卷积后输出再进行进一步处理。
# x = self.dct_norm(x)  # 标准化层,用于标准化 DCT 后的特征。

这些部分暂时被注释掉了,表示 DCT 层和标准化层可能是可选的改进点。在某些情况下,通过 DCT 提取频域特征可以提高模型的性能。

3. 总结

  • 卷积操作:1D 卷积用于捕捉输入时间序列的局部特征。
  • DCT 操作:可选的 DCT 层能够将时域信号转换为频域信号,捕捉输入数据中的周期性或频率特征,从而进一步增强模型的表现。
  • 线性映射:线性层负责调整序列的长度,保证输入和输出长度匹配。

这个模型设计相对简单,但由于使用了卷积操作和 DCT 层,因此在处理具有时空依赖性的数据(如时间序列预测)时有一定优势。


Transformer.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.Transformer_EncDec import Decoder, DecoderLayer, Encoder, EncoderLayer, ConvLayer
from layers.SelfAttention_Family import FullAttention, AttentionLayer
from layers.Embed import DataEmbedding,DataEmbedding_wo_pos,DataEmbedding_wo_temp,DataEmbedding_wo_pos_temp
import numpy as np
from layers.dctnet import dct_channel_block,dctclass Model(nn.Module):"""Vanilla Transformer with O(L^2) complexity"""def __init__(self, configs):super(Model, self).__init__()self.pred_len = configs.pred_lenself.output_attention = configs.output_attention# Embeddingif configs.embed_type == 0:self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)elif configs.embed_type == 1:self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)elif configs.embed_type == 2:self.enc_embedding = DataEmbedding_wo_pos(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding_wo_pos(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)elif configs.embed_type == 3:self.enc_embedding = DataEmbedding_wo_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding_wo_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)elif configs.embed_type == 4:self.enc_embedding = DataEmbedding_wo_pos_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding_wo_pos_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)# Encoderself.encoder = Encoder([EncoderLayer(AttentionLayer(FullAttention(False, configs.factor, attention_dropout=configs.dropout,output_attention=configs.output_attention), configs.d_model, configs.n_heads),configs.d_model,configs.d_ff,dropout=configs.dropout,activation=configs.activation) for l in range(configs.e_layers)],norm_layer=torch.nn.LayerNorm(configs.d_model))# Decoderself.decoder = Decoder([DecoderLayer(AttentionLayer(FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),configs.d_model, configs.n_heads),AttentionLayer(FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),configs.d_model, configs.n_heads),configs.d_model,configs.d_ff,dropout=configs.dropout,activation=configs.activation,)for l in range(configs.d_layers)],norm_layer=torch.nn.LayerNorm(configs.d_model),projection=nn.Linear(configs.d_model, configs.c_out, bias=True))self.dct_layer=dct_channel_block(512)self.dct_norm = nn.LayerNorm([512], eps=1e-6)def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):enc_out = self.enc_embedding(x_enc, x_mark_enc)enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)# print("enc_out.shape:",enc_out.shape)#enc_out.shape: torch.Size([32, 96, 512])#加入dct模块# mid  = self.dct_layer(enc_out)# enc_out = enc_out+mid# enc_out = self.dct_norm(enc_out) #norm 144dec_out = self.dec_embedding(x_dec, x_mark_dec)dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)if self.output_attention:return dec_out[:, -self.pred_len:, :], attnselse:return dec_out[:, -self.pred_len:, :]  # [B, L, D]

这段代码实现了一个结合 TransformerDCT(离散余弦变换) 的神经网络模型,主要用于时间序列预测任务。模型结构基于 Transformer 编码器-解码器框架,支持多种嵌入方式,并集成了 DCT 通道块用于增强特征处理。

1. 模型简介

该模型的核心是标准的 Transformer 编码器-解码器结构,具有以下主要模块:

  • 嵌入模块:通过 DataEmbedding 和其变种为输入数据生成嵌入表示。
  • 编码器(Encoder):基于多层自注意力机制的编码器,用于提取输入序列的高维特征。
  • 解码器(Decoder):结合自注意力和交叉注意力的解码器,用于生成预测序列。
  • DCT 模块:用于对编码器输出进行频域增强处理(当前注释掉,作为可选模块)。

2. 嵌入模块

if configs.embed_type == 0:self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq, configs.dropout)self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq, configs.dropout)
  • DataEmbedding:为输入数据生成嵌入。嵌入方式由 configs.embed_type 控制,支持五种不同类型的嵌入:
    • 0:默认的嵌入(值、时间、位置嵌入的组合)。
    • 1:与 embed_type=0 相同。
    • 2:没有位置嵌入。
    • 3:没有时间嵌入。
    • 4:没有时间和位置嵌入。

3. 编码器(Encoder)

self.encoder = Encoder([EncoderLayer(AttentionLayer(FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=configs.output_attention),configs.d_model, configs.n_heads),configs.d_model,configs.d_ff,dropout=configs.dropout,activation=configs.activation) for l in range(configs.e_layers)],norm_layer=torch.nn.LayerNorm(configs.d_model)
)
  • Encoder:由多个 EncoderLayer 组成,每层包括自注意力机制和前馈神经网络。
  • FullAttention:注意力机制,False 表示不使用掩码(即标准自注意力)。
  • LayerNorm:标准化层,用于稳定训练。

4. 解码器(Decoder)

self.decoder = Decoder([DecoderLayer(AttentionLayer(FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),configs.d_model, configs.n_heads),AttentionLayer(FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),configs.d_model, configs.n_heads),configs.d_model,configs.d_ff,dropout=configs.dropout,activation=configs.activation,)for l in range(configs.d_layers)],norm_layer=torch.nn.LayerNorm(configs.d_model),projection=nn.Linear(configs.d_model, configs.c_out, bias=True)
)
  • Decoder:由多个 DecoderLayer 组成,每层结合了自注意力和交叉注意力机制。
  • projection:最后一层是线性层,用于将解码器的输出投影到目标输出维度(configs.c_out)。

5. DCT 通道块

self.dct_layer = dct_channel_block(512)
self.dct_norm = nn.LayerNorm([512], eps=1e-6)
  • dct_channel_block:DCT 模块用于对编码器的输出进行频域变换,但该部分目前被注释掉。
  • dct_norm:LayerNorm 层用于对经过 DCT 的特征进行标准化处理。

6. 前向传播函数

def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):enc_out = self.enc_embedding(x_enc, x_mark_enc)enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)# DCT 模块(可选)# mid  = self.dct_layer(enc_out)# enc_out = enc_out + mid# enc_out = self.dct_norm(enc_out)dec_out = self.dec_embedding(x_dec, x_mark_dec)dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)if self.output_attention:return dec_out[:, -self.pred_len:, :], attnselse:return dec_out[:, -self.pred_len:, :]
  • 输入

    • x_encx_mark_enc 是编码器的输入和时间标记。
    • x_decx_mark_dec 是解码器的输入和时间标记。
    • enc_self_maskdec_self_maskdec_enc_mask 是可选的注意力掩码。
  • 编码器:通过 enc_embeddingencoder 模块处理编码器输入。

  • DCT:注释掉的 DCT 模块可以对编码器输出进行频域处理。

  • 解码器:通过 dec_embeddingdecoder 模块处理解码器输入,并结合编码器的输出进行预测。

  • 输出

    • 如果 output_attentionTrue,返回解码器的输出和注意力矩阵。
    • 否则,只返回解码器的最终预测。

7. 总结

  • 该模型是一个结合 Transformer 和 DCT 的时间序列预测模型。
  • Transformer 编码器-解码器架构使模型能够捕捉复杂的时间依赖关系。
  • DCT 模块作为可选部分,能够增强模型对频域特征的捕捉。
  • 模型支持多种嵌入方式,通过调整 configs.embed_type 来选择不同的嵌入策略。

Linear.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dctimport numpy as np
import math
# class Model(nn.Module):SENET for ETTmx#     """
#     Just one Linear layer
#     """
#     def __init__(self,configs,channel=7,ratio=1):
#         super(Model, self).__init__()#         self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
#         self.fc = nn.Sequential(
#                 nn.Linear(7,14, bias=False),
#                 nn.Dropout(p=0.1),
#                 nn.ReLU(inplace=True) ,
#                 nn.Linear(14,7, bias=False),
#                 nn.Sigmoid()
#         )
#         self.seq_len = configs.seq_len
#         self.pred_len = configs.pred_len#         self.Linear_More_1 = nn.Linear(self.seq_len,self.pred_len * 2)
#         self.Linear_More_2 = nn.Linear(self.pred_len*2,self.pred_len)
#         self.relu = nn.ReLU()
#         self.gelu = nn.GELU()    #         self.drop = nn.Dropout(p=0.1)
#         # Use this line if you want to visualize the weights
#        
#     def forward(self, x):
#         # x: [Batch, Input length, Channel]
#   
#         x = x.permute(0,2,1) # (B,L,C)->(B,C,L)
#         b, c, l = x.size() # (B,C,L)
#         y = self.avg_pool(x).view(b, c) # (B,C,L) #         # np.save('f_weight.npy', f_weight_np)
# #         # np.save('%d f_weight.npy' %epoch, f_weight_np)
#         # print("y",y.shape)
#         # return (x * y).permute(0,2,1)
#         return (z).permute(0,2,1)class my_Layernorm(nn.Module):"""Special designed layernorm for the seasonal part"""def __init__(self, channels):super(my_Layernorm, self).__init__()self.layernorm = nn.LayerNorm(channels)def forward(self, x):x_hat = self.layernorm(x)bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)return x_hat - bias
class Model(nn.Module):def __init__(self,configs,channel=96,ratio=1):super(Model, self).__init__()# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovationself.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.channel_num = configs.enc_inself.fc = nn.Sequential(nn.Linear(channel, channel*2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear( channel*2, channel, bias=False),nn.Sigmoid())self.fc_inverse = nn.Sequential(nn.Linear(channel, channel//2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear( channel//2, channel, bias=False),nn.Sigmoid())# self.fc_plot = nn.Linear(channel, channel, bias=False)self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)self.Linear = nn.Linear(self.seq_len, self.pred_len)self.Linear_1 = nn.Linear(self.seq_len, self.pred_len)# self.dct_norm = nn.LayerNorm([self.channel_num], eps=1e-6)self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)# self.my_layer_norm = nn.LayerNorm([96], eps=1e-6)def forward(self, x):x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forLb, c, l = x.size() # (B,C,L)list = []for i in range(c):#i represent channel freq=dct.dct(x[:,i,:])     #dct# print("freq-shape:",freq.shape)list.append(freq)stack_dct=torch.stack(list,dim=1) stack_dct = torch.tensor(stack_dct)#(B,L,C)stack_dct = self.dct_norm(stack_dct)#matters for trafficf_weight = self.fc(stack_dct)f_weight = self.dct_norm(f_weight)#matters for traffic#visualization for fecam tensorf_weight_cpu = f_weightf_weight_np = f_weight_cpu.cpu().detach().numpy()np.save('f_weight_weather_wf.npy', f_weight_np)# np.save('%d f_weight.npy' %epoch, f_weight_np)# f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic# result = self.Linear(x)#forL# f_weight_np = result.cpu().detach().numpy()# np.save('f_weight.npy', f_weight_np)# x = x.permute(0,2,1)# result = self.Linear((x *(f_weight_inverse)))#forL result = self.Linear((x *(f_weight)))#forLreturn  result.permute(0,2,1)

这段代码实现了一个时间序列预测模型,模型结合了 DCT(离散余弦变换)线性层 来对输入的序列进行处理,并输出预测结果。模型在数据处理中引入了频域变换,并对频域特征进行了加权处理。以下是详细的解释:

1. my_Layernorm

class my_Layernorm(nn.Module):"""Special designed layernorm for the seasonal part"""def __init__(self, channels):super(my_Layernorm, self).__init__()self.layernorm = nn.LayerNorm(channels)def forward(self, x):x_hat = self.layernorm(x)bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)return x_hat - bias
  • 作用:实现了一种特殊的 LayerNorm 层,它对每个样本进行标准化,并减去每个样本的均值,目的是更好地处理序列中具有季节性或周期性的特征。

2. 模型 Model 初始化

class Model(nn.Module):def __init__(self, configs, channel=96, ratio=1):super(Model, self).__init__()self.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.channel_num = configs.enc_inself.fc = nn.Sequential(nn.Linear(channel, channel*2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear(channel*2, channel, bias=False),nn.Sigmoid())self.fc_inverse = nn.Sequential(nn.Linear(channel, channel//2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear(channel//2, channel, bias=False),nn.Sigmoid())self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)self.Linear = nn.Linear(self.seq_len, self.pred_len)self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)
  • fcfc_inverse:两个线性层用于调整输入通道的权重。fc 是增大通道维度的网络,fc_inverse 是缩小通道维度的网络,主要用于特征的重整和处理。
  • mid_LinearLinear:分别用于序列长度的映射。mid_Linear 用于中间的特征处理,Linear 将输入的序列长度从 seq_len 映射到 pred_len
  • dct_norm:用于对经过 DCT 处理的频域特征进行标准化,以确保数据分布稳定,减少模型训练的波动。

3. 前向传播 forward

def forward(self, x):x = x.permute(0, 2, 1)  # 调整输入维度 (B, L, C) -> (B, C, L)b, c, l = x.size()  # 获取输入数据的维度 (Batch, Channel, Length)list = []# 对每个通道的输入数据进行 DCTfor i in range(c):freq = dct.dct(x[:, i, :])  # 对第 i 个通道应用 DCTlist.append(freq)# 将各通道的 DCT 结果堆叠在一起stack_dct = torch.stack(list, dim=1)stack_dct = torch.tensor(stack_dct)# 对 DCT 结果进行 LayerNorm 归一化stack_dct = self.dct_norm(stack_dct)# 通过全连接层对 DCT 结果进行处理f_weight = self.fc(stack_dct)f_weight = self.dct_norm(f_weight)# 保存处理后的权重用于可视化f_weight_cpu = f_weightf_weight_np = f_weight_cpu.cpu().detach().numpy()np.save('f_weight_weather_wf.npy', f_weight_np)# 将原始输入和经过处理的 DCT 权重相乘,并通过线性层进行序列预测result = self.Linear(x * f_weight)  # 结合权重后的输入数据进行预测return result.permute(0, 2, 1)  # 调整输出维度回 (B, L, C)
前向传播步骤:
  1. 维度调整:通过 permute 将输入的维度从 (B, L, C) 转换为 (B, C, L),方便后续进行 1D 操作。
  2. DCT 处理:对每个通道的数据应用 DCT(离散余弦变换),捕捉数据的频域特征。
  3. 堆叠 DCT 结果:将每个通道的 DCT 结果堆叠在一起,形成一个新的张量。
  4. 标准化:通过 LayerNorm 对 DCT 结果进行标准化,确保特征的稳定性。
  5. 全连接层处理:通过全连接层调整 DCT 结果的权重,提取出更高层次的频域特征。
  6. 可视化保存:将处理后的权重保存到 .npy 文件中,用于后续的可视化分析。
  7. 预测结果生成:将输入数据和处理后的权重相乘,然后通过线性层输出预测结果。
  8. 维度恢复:最后将输出的维度恢复为 (B, L, C),与输入数据的形状保持一致。

4. 总结

  • DCT 应用:该模型在每个通道上应用了 DCT,能够提取出输入数据中的频域特征,这对于具有周期性或频率特征的时间序列数据非常有效。
  • 全连接层权重调整:通过全连接层对 DCT 特征进行加权处理,有助于模型学习不同频率成分对预测的影响。
  • 标准化:使用 LayerNorm 保证频域特征在不同通道之间的平衡,避免某些通道的特征在模型中过度放大或缩小。

这个模型结合了 DCT 和神经网络的优势,通过频域特征处理提高了对复杂时间序列数据的预测能力,特别适合具有周期性变化的时间序列任务。


 

run_longExp.py代码:

这段代码是一个基于命令行参数的实验框架,用于时间序列预测任务,使用了 Transformer 系列模型(如 Autoformer、Informer、Transformer)。通过命令行参数,可以灵活调整模型、数据集、超参数等配置,执行训练、测试和预测流程。


参考:

用于时间序列预测的频率增强信道注意力机制(Python代码实现)_fecam-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/149624.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

赛意SMOM和金蝶云星空接口打通对接实战

赛意SMOM和金蝶云星空接口打通对接实战 对接源平台:赛意SMOM 赛意信息已经发展成为国内企业数字化服务领域最具发展潜力的领军企业之一&#xff0c;聚焦于工业互联网、智能制造、新一代信息技术、数字化转型等领域的技术与商业模式应用&#xff0c;为企业提供高端软件咨询、实施…

成为谷歌开发者专家(GDE)的经历

大家好&#xff0c;我是张海龙(Jason)。经过一年多的准备&#xff0c;GDE申请 终于正式成功通过面试&#xff0c;成为了国内第一位Firebase GDE。下面对整个过程做个总结&#xff0c;希望对大家有所帮助。 1.什么是 GDE&#xff1f; Google Developers上面有详细的说明&#x…

Craft:年度 Mac 应用,卡片式笔记新星

今年的年度 Mac 应用大奖颁给了Craft&#xff0c;这是一款集笔记、文档和个人管理于一体的独特工具。Craft 最大的亮点在于其卡片式的交互设计&#xff0c;这种设计让信息组织变得更加直观且高效。 尽管它仅上线了一年时间&#xff0c;但已经展现出了不输于许多老牌笔记应用的…

前端开发迎来新机会,全栈转型就靠这个!

在如今的开发世界&#xff0c;全栈开发者已成为许多前端开发者的新目标。随着技术的不断演进&#xff0c;前端不再局限于写页面和样式&#xff0c;而是逐渐向后端延伸&#xff0c;甚至触及数据库和云服务。如果你想在职业道路上更进一步&#xff0c;向全栈开发者靠拢&#xff0…

【JavaEE】——多重锁,死锁问题和解决思路

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯&#xff0c;你们的点赞收藏是我前进最大的动力&#xff01;&#xff01;希望本文内容能够帮助到你&#xff01; 目录 一&#xff1a;加锁的“可重入性” 1&#xff1a;问题引入 2&#xff1a;问题分析 3&#xff1a;可重…

springboot快速开发平台使用达梦数据库

1.首先来到DM管理工具 大致流程是&#xff1a;创建表空间&#xff08;用于给新建的用户使用&#xff09;-》创建用户&#xff08;绑定表空间&#xff09; 文件位置 2.创建用户 来到所属角色页面&#xff0c;第一个权限管理员一定要勾上&#xff0c;其他的看情况 3.来到DM数…

Java项目实战II基于SSM的国外摇滚乐队交流和周边售卖系统的设计与实现(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者 一、前言 随着互联网技术的飞速发展&#xff0c;信息传播的广度和深度不断拓展&#xff0c;为各行业的创新发展…

GPIO与MIO控制LED——ZYNQ学习笔记2

一、GPIO简介 ZYNQ 分为 PS 和 PL 两部分&#xff0c;那么器件的引脚&#xff08; Pin&#xff09;资源同样也分成了两部分。 ZYNQ PS 中的外设可以通过 MIO&#xff08; multiplexed I/O&#xff0c;多路复用 I/O&#xff09;模块连接到 PS 端的引脚上&#xff0c;也可以通过 …

Python近红外光谱数据分析

ChatGPT4.0在近红外光谱数据分析、定性/定量分析模型代码自动生成等方面的强大功能&#xff0c;同时更加系统地学习人工智能&#xff08;包括传统机器学习、深度学习等&#xff09;的基础理论&#xff0c;以及具体的代码实现方法掌握ChatGPT4.0在科研工作中的各种使用方法与技巧…

C++学习笔记----8、掌握类与对象(一)---- 对象中的动态内存分配(1)

1、FRIENDS c允许类声明为其它类&#xff0c;其它类的成员函数&#xff0c;或者非成员函数为friend。可以访问protected与private数据成员与成员函数。例如&#xff0c;假设你有两个类Foo与Bar。你可以指定Bar类是Foo类的一个friend&#xff1a; class Foo {friend class Bar;…

C++之哈希 --- 哈希的应用(位图布隆过滤器)

一、位图 1.1 位图的基本概念 在如今网络交通高度发达的时代&#xff0c;网购已经成为我们日常生活中的一部分。没当双11到来&#xff0c;各大平台都会迎来一次网购的高潮。这就会让服务器短时间内获得高达几十亿上百亿的数据&#xff0c;那我们该如何去处理这海量的数据呢&am…

FortiGate 防火墙 DNS 地址转换(DNS Translation)

简介 本例介绍 FortiGate 防火墙 DNS 地址转换&#xff08;DNS Translation&#xff09;配置方法。 一、 网络结构 网络结构如下图&#xff0c;PC1 连接在 FG60B 的 Internal 接口&#xff0c;FG60B 的 Wan1 接口连接 FG80CM 的 DMZ 接口&#xff0c;Wan1 接口开启 DNS 服务…

开发环境搭建之windows和ubuntu系统互传文件

ubuntu和Windows主机之间的文件传输有很多种&#xff0c;安装VMware Tools后&#xff0c;可以设置虚拟机共享文件夹&#xff0c;将Windows主机的文件目录挂载到ubuntu中&#xff0c;实现文件共享。 设置方法如下&#xff0c;点击菜单栏的“虚拟机”&#xff0c;选择“设置”。…

【LLM大模型】如何让大模型更好地进行场景落地?

自ChatGPT模型问世后&#xff0c;在全球范围内掀起了AI新浪潮。 有很多企业和高校也随之开源了一些效果优异的大模型&#xff0c;例如&#xff1a;Qwen系列模型、MiniCPM序列模型、Yi系列模型、ChatGLM系列模型、Llama系列模型、Baichuan系列模型、Deepseek系列模型、Moss模型…

[OPEN SQL] SELECT语句

本次操作使用的数据库表为SCUSTOM&#xff0c;其字段内容如下所示 航班用户(SCUSTOM) 1.SELECT语句 SELECT语句从数据库表中读取必要的数据 1.1 读取一行数据 语法格式 SELECT SINGLE <cols>... WHERE cols&#xff1a;数据库表的字段 从数据库表中读取一条数据可使…

用CPU训练机器学习模型

人工智能最近的成功通常归功于 GPU 的出现和发展。GPU 的架构通常包括数千个多处理器、高速内存、专用张量核心等&#xff0c;特别适合满足人工智能/机器学习工作负载的密集需求。 不幸的是&#xff0c;人工智能开发的快速增长导致对 GPU 的需求激增&#xff0c;使得 GPU 难以…

Java面试篇基础部分- 锁详解

可重入锁 可重入锁也叫作递归锁,是指在同一个线程中,在外层函数获取到该锁之后,内存的递归函数还可以获取到该锁。在Java语言环境下,ReentrantLock和Synchroinzed都是可重入锁的代表。 公平锁与非公平锁 公平锁(Fair Lock)是指在分配锁之前检查是否有线程在排队等待获取…

学习MRI处理过程中搜到的宝藏网站

今天浏览网页查到了一些宝藏网站&#xff0c;正好记录一下&#xff0c;后面搜到好东东再接着填充&#xff0c;方便查阅~ &#xff08;1&#xff09;牛人网站 这个网站是在搜集seed关键词时发现的&#xff0c;用pdf文档记录&#xff0c;可下载查阅&#xff0c;条理清晰&#xf…

ios swift5 UITextView占位字符,记录限制字数

文章目录 截图代码&#xff1a;具体使用代码&#xff1a;CustomTextView 截图 代码&#xff1a;具体使用 scrollView.addSubview(contentTextView)contentTextView.placeholderLabel.text LocalizableManager.localValue("write_comment")contentTextView.maxCharac…

ActiveMQ 的传输协议机制

ActiveMQ 通过网络连接器这种连接机制来实现客户端与服务端之间的通信&#xff0c;ActiveMQ支持的传输协议在activeMQ 安装目录的 conf/activemq.xml中的<transportConnectors>标签之内。 ActiveMQ 支持的 client 端和 broker 端的通讯协议有&#xff1a;TCP、NIO、UDP、…