<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      大模型基礎(chǔ)補(bǔ)全計(jì)劃(六)---帶注意力機(jī)制的seq2seq實(shí)例與測試(Bahdanau Attention)

      PS:要轉(zhuǎn)載請注明出處,本人版權(quán)所有。

      PS: 這個只是基于《我自己》的理解,

      如果和你的原則及想法相沖突,請諒解,勿噴。

      環(huán)境說明

      ??無

      前言


      ???本文是這個系列第六篇,它們是:

      ??本文,介紹一下注意力機(jī)制,并在上文的機(jī)翻模型seq2seq的實(shí)例中添加一個簡單的注意力機(jī)制,并看看模型效果是否有提升。





      注意力機(jī)制(Bahdanau Attention)


      ???舉一個例子:在日常生活中,比如我們看一幅黑白畫(畫中有一個紅色的蘋果,其他的都是黑白的物體,例如香蕉),這個時候我們無意識的看一眼畫,很有可能第一個關(guān)注的就是這個紅色的蘋果,但是我有意識的控制眼睛集中去看香蕉,這個時候我關(guān)注的就是香蕉。

      ??在上面的例子中,我們的注意力,最開始是無意識的看蘋果,后面有意識的注意香蕉,這里面的區(qū)別就是我們在這個動作里面加了:意識。當(dāng)加了意識后,我們就可以有選擇的根據(jù)條件來關(guān)注這幅畫的我想關(guān)注的地方。

      ??然后我們可以對上面的現(xiàn)象進(jìn)行建模:\(R=Attention(Q,K)*V\),這里我們將Attention當(dāng)作意識,V當(dāng)作黑白畫的特征,Q是畫中是什么?K是V的標(biāo)簽(你可以把K當(dāng)作是V有關(guān)聯(lián)的部分,不同的K,對應(yīng)的不同的V),如果沒有Attention,R就是蘋果,有了Attention,R就可以是香蕉。

      ??我們回頭想一想上一篇文的seq2seq中,我們的encoder的output是最后一層rnn的所有時間步的隱藏狀態(tài)(num_steps,batch_size,num_hiddens),這里包含了我們的序列數(shù)據(jù)在不同時間步的特征變化,當(dāng)我們在做decoder的時候,我們是拿著這個encoder的最后一層rnn的最后一個時間步的隱藏狀態(tài)(1,batch_size,num_hiddens)來作為context的,是一個固定的值,這樣有幾個問題:

      • 對于長序列來說,context可能丟失信息。
      • 我們從固定context中解碼信息,導(dǎo)致了我們對序列在特定解碼步驟中,對context關(guān)注重點(diǎn)是一樣的。

      ??針對上面seq2seq的問題,Bahdanau設(shè)計(jì)了一種模型,可以解決我們遇到的問題,其定義如下:$$c_{t'} = \sum_{t=1}^{T} \alpha(s_{t'-1}, h_{t})h_{t}$$,看公式我們可以知道,這里定義了Q(decoder的上一次隱藏態(tài)\(s_{t'-1}\))/K(encoder的output的部分\(h_{t}\))/V(encoder的output的部分\(h_{t}\))三個概念,含義就是通過Q+K來計(jì)算一個權(quán)重矩陣W(通過softmax歸一化),然后然后將W和V進(jìn)行計(jì)算,得到了我們通過W關(guān)注到的新的\(V_{new}\),這里的W就是我們的注意力矩陣,代表我們關(guān)注V中的哪些部分。整個計(jì)算過程,就相當(dāng)于我們生成了新成context具備了注意力機(jī)制。





      帶注意力機(jī)制的seq2seq 英文翻譯中文 的實(shí)例


      ?? 下面的代碼和上一篇文章的代碼只有decoder部分有比較大的差別,其他的基本類似。如dataset部分的內(nèi)容,請參考上一篇文章。



      seq2seq完整代碼如下

      ??

      import os
      import random
      import torch
      import math
      from torch import nn
      from torch.nn import functional as F
      import numpy as np
      import time
      import visdom
      import collections
      
      import dataset
      class Accumulator:
          """在n個變量上累加"""
          def __init__(self, n):
              """Defined in :numref:`sec_softmax_scratch`"""
              self.data = [0.0] * n
      
          def add(self, *args):
              self.data = [a + float(b) for a, b in zip(self.data, args)]
      
          def reset(self):
              self.data = [0.0] * len(self.data)
      
          def __getitem__(self, idx):
              return self.data[idx]
          
      class Timer:
          """記錄多次運(yùn)行時間"""
          def __init__(self):
              """Defined in :numref:`subsec_linear_model`"""
              self.times = []
              self.start()
      
          def start(self):
              """啟動計(jì)時器"""
              self.tik = time.time()
      
          def stop(self):
              """停止計(jì)時器并將時間記錄在列表中"""
              self.times.append(time.time() - self.tik)
              return self.times[-1]
      
          def avg(self):
              """返回平均時間"""
              return sum(self.times) / len(self.times)
      
          def sum(self):
              """返回時間總和"""
              return sum(self.times)
      
          def cumsum(self):
              """返回累計(jì)時間"""
              return np.array(self.times).cumsum().tolist()
          
      class Encoder(nn.Module):
          """編碼器-解碼器架構(gòu)的基本編碼器接口"""
          def __init__(self, **kwargs):
              # 調(diào)用父類nn.Module的構(gòu)造函數(shù),確保正確初始化
              super(Encoder, self).__init__(**kwargs)
      
          def forward(self, X, *args):
              # 拋出未實(shí)現(xiàn)錯誤,意味著該方法需要在子類中具體實(shí)現(xiàn)
              raise NotImplementedError
      
      class Decoder(nn.Module):
          """編碼器-解碼器架構(gòu)的基本解碼器接口
      
          Defined in :numref:`sec_encoder-decoder`"""
          def __init__(self, **kwargs):
              # 調(diào)用父類nn.Module的構(gòu)造函數(shù),確保正確初始化
              super(Decoder, self).__init__(**kwargs)
      
          def init_state(self, enc_outputs, *args):
              # 拋出未實(shí)現(xiàn)錯誤,意味著該方法需要在子類中具體實(shí)現(xiàn)
              raise NotImplementedError
      
          def forward(self, X, state):
              # 拋出未實(shí)現(xiàn)錯誤,意味著該方法需要在子類中具體實(shí)現(xiàn)
              raise NotImplementedError
      
      class EncoderDecoder(nn.Module):
          """編碼器-解碼器架構(gòu)的基類
      
          Defined in :numref:`sec_encoder-decoder`"""
          def __init__(self, encoder, decoder, **kwargs):
              # 調(diào)用父類nn.Module的構(gòu)造函數(shù),確保正確初始化
              super(EncoderDecoder, self).__init__(**kwargs)
              # 將傳入的編碼器實(shí)例賦值給類的屬性
              self.encoder = encoder
              # 將傳入的解碼器實(shí)例賦值給類的屬性
              self.decoder = decoder
      
          def forward(self, enc_X, dec_X, enc_X_valid_len, *args):
              # 調(diào)用編碼器的前向傳播方法,處理輸入的編碼器輸入數(shù)據(jù)enc_X
              enc_outputs = self.encoder(enc_X, *args)
              # 調(diào)用解碼器的init_state方法,根據(jù)編碼器的輸出初始化解碼器的狀態(tài)
              dec_state = self.decoder.init_state(enc_outputs, enc_X_valid_len)
              # 調(diào)用解碼器的前向傳播方法,處理輸入的解碼器輸入數(shù)據(jù)dec_X和初始化后的狀態(tài)
              return self.decoder(dec_X, dec_state)
          
      
      def masked_softmax(X, valid_lens):  #@save
          """
          執(zhí)行帶掩碼的 Softmax 操作。
          
          參數(shù):
              X (torch.Tensor): 待 Softmax 的張量,通常是注意力機(jī)制中的“分?jǐn)?shù)”(scores)。
                                其形狀通常為 (批量大小, 查詢數(shù)量/序列長度, 鍵值對數(shù)量/序列長度)。
              valid_lens (torch.Tensor): 序列的有效長度張量。
                                         形狀可以是 (批量大小,) 或 (批量大小, 鍵值對數(shù)量)。
                                         用于指示每個序列的哪個部分是有效的(非填充)。
          
          返回:
              torch.Tensor: 經(jīng)過 Softmax 歸一化且填充部分被忽略的概率分布張量。
          """
          
          # 輔助函數(shù):創(chuàng)建一個序列掩碼,并用特定值覆蓋被掩碼(填充)的元素
          def _sequence_mask(X, valid_len, value=0):
              """
              根據(jù)有效長度(valid_len)創(chuàng)建掩碼,并應(yīng)用于張量 X。
              
              參數(shù):
                  X (torch.Tensor): 形狀為 (批量大小 * 查詢數(shù)量, 最大長度) 的張量。
                  valid_len (torch.Tensor): 形狀為 (批量大小 * 查詢數(shù)量,) 的有效長度向量。
                  value (float): 用于替換被掩碼元素的填充值。
                  
              返回:
                  torch.Tensor: 被填充值覆蓋后的張量 X。
              """
              # 獲取序列的最大長度(張量的第二個維度)
              maxlen = X.size(1)
              
              # 核心掩碼邏輯:
              # 1. torch.arange((maxlen), ...) 創(chuàng)建一個從 0 到 maxlen-1 的序列(代表時間步索引)
              # 2. [None, :] 使其形狀變?yōu)?(1, maxlen),用于廣播
              # 3. valid_len[:, None] 使有效長度形狀變?yōu)?(批量大小 * 查詢數(shù)量, 1),用于廣播
              # 4. < 比較操作:當(dāng)索引 < 有效長度時,結(jié)果為 True(有效元素),否則為 False(填充元素)
              mask = torch.arange((maxlen), dtype=torch.float32,
                                  device=X.device)[None, :] < valid_len[:, None]
              
              # 邏輯非 ~mask 得到填充部分的掩碼(True 表示填充部分)
              # 使用填充值(value,通常是 -1e6)覆蓋填充元素
              X[~mask] = value
              return X
      
          # 1. 處理無需掩碼的情況
          if valid_lens is None:
              # 如果未提供有效長度,則執(zhí)行標(biāo)準(zhǔn) Softmax
              return nn.functional.softmax(X, dim=-1)
          
          # 2. 處理需要掩碼的情況
          else:
              # 備份原始形狀,用于后續(xù)重塑
              shape = X.shape
              
              # 統(tǒng)一 valid_lens 的形狀,使其與 X 的前兩個維度相匹配
              if valid_lens.dim() == 1:
                  # 適用于批量中每個序列只有一個有效長度的情況(例如,K-V 序列是等長的)
                  # 將 valid_lens 重復(fù) shape[1] 次,匹配 X 的查詢/序列長度維度
                  valid_lens = torch.repeat_interleave(valid_lens, shape[1])
              else:
                  # 適用于每個查詢-鍵值對的有效長度都不同的情況
                  # 將 2D 張量展平為 1D 向量
                  valid_lens = valid_lens.reshape(-1)
                  
              # 預(yù)處理 Softmax 輸入:將 X 調(diào)整為 2D 矩陣 (批量*查詢數(shù)量, 鍵值對數(shù)量)
              # 并在最后一個軸(Softmax 軸)上,用一個非常大的負(fù)值替換被掩碼的元素
              # Softmax 時 exp(-1e6) 趨近于 0,從而忽略填充部分。
              X = _sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
              
              # 對經(jīng)過掩碼處理的 X 執(zhí)行 Softmax
              # 結(jié)果張量 X 被重塑回原始的 3D 形狀 (批量大小, 查詢數(shù)量, 鍵值對數(shù)量)
              # 并在最后一個維度(dim=-1)上進(jìn)行歸一化,得到注意力權(quán)重
              return nn.functional.softmax(X.reshape(shape), dim=-1)
          
      
          
      class AdditiveAttention(nn.Module):  #@save
          """
          加性注意力(Additive Attention)模塊。
          通過將 Query 和 Key 投影到相同的維度后相加,再通過 tanh 激活和線性層計(jì)算注意力分?jǐn)?shù)。
          
          公式核心:score(Q, K) = w_v^T * tanh(W_q*Q + W_k*K)
          """
          
          def __init__(self, num_hiddens, dropout, **kwargs):
              """
              初始化加性注意力模塊。
              
              參數(shù):
                  num_hiddens (int): 隱藏層維度,Q 和 K 投影后的維度。
                  dropout (float): Dropout 率。
              """
              super(AdditiveAttention, self).__init__(**kwargs)
              
              # W_k:將 Key (K) 向量投影到 num_hiddens 維度的線性層
              # 使用 nn.LazyLinear 延遲初始化,直到第一次 forward 傳入 K 的維度
              self.W_k = nn.LazyLinear(num_hiddens, bias=False)
              
              # W_q:將 Query (Q) 向量投影到 num_hiddens 維度的線性層
              # 使用 nn.LazyLinear 延遲初始化
              self.W_q = nn.LazyLinear(num_hiddens, bias=False)
              
              # w_v:將激活后的特征向量 (W_q*Q + W_k*K) 投影成一個標(biāo)量分?jǐn)?shù)(維度為 1)
              # 使用 nn.LazyLinear 延遲初始化
              self.w_v = nn.LazyLinear(1, bias=False)
              
              # Dropout 層,用于防止過擬合
              self.dropout = nn.Dropout(dropout)
      
          def forward(self, queries, keys, values, valid_lens):
              """
              執(zhí)行前向傳播計(jì)算。
              
              參數(shù):
                  queries (torch.Tensor): 查詢向量 Q。形狀通常為 (批量大小, 查詢數(shù)量, 查詢維度)。
                  keys (torch.Tensor): 鍵向量 K。形狀通常為 (批量大小, 鍵值對數(shù)量, 鍵維度)。
                  values (torch.Tensor): 值向量 V。形狀通常為 (批量大小, 鍵值對數(shù)量, 值維度)。
                  valid_lens (torch.Tensor): 鍵值對序列的有效長度,用于掩蓋填充部分。
                  
              返回:
                  torch.Tensor: 注意力加權(quán)后的值向量。形狀為 (批量大小, 查詢數(shù)量, 值維度)。
              """
              # 1. 線性變換:分別對 Q 和 K 進(jìn)行投影
              queries, keys = self.W_q(queries), self.W_k(keys)
              
              # 2. 維度擴(kuò)展與相加(Attention Scoring 的核心步驟)
              # queries.unsqueeze(2): 形狀從 (批量大小, 查詢數(shù)量, num_hiddens) 
              #                       變?yōu)?(批量大小, 查詢數(shù)量, 1, num_hiddens)。
              # keys.unsqueeze(1): 形狀從 (批量大小, 鍵值對數(shù)量, num_hiddens) 
              #                     變?yōu)?(批量大小, 1, 鍵值對數(shù)量, num_hiddens)。
              # 兩個張量通過廣播機(jī)制相加,得到 features,形狀為:
              # (批量大小, 查詢數(shù)量, 鍵值對數(shù)量, num_hiddens)
              features = queries.unsqueeze(2) + keys.unsqueeze(1)
              
              # 3. 激活函數(shù):應(yīng)用 tanh 激活(加性注意力機(jī)制的要求)
              features = torch.tanh(features)
              
              # 4. 投影到標(biāo)量分?jǐn)?shù)
              # self.w_v(features): 將 features 的最后一個維度(num_hiddens)投影成 1。
              # scores.squeeze(-1): 移除最后一個單維度 (1),得到最終的注意力分?jǐn)?shù)張量。
              # 形狀為:(批量大小, 查詢數(shù)量, 鍵值對數(shù)量)
              scores = self.w_v(features).squeeze(-1)
              
              # 5. 歸一化(Softmax):使用帶掩碼的 Softmax 得到注意力權(quán)重
              # 填充部分的得分會被設(shè)置為一個極小的負(fù)值,Softmax 后權(quán)重趨近于 0。
              self.attention_weights = masked_softmax(scores, valid_lens)
              
              # 6. 加權(quán)求和
              # torch.bmm: 批量矩陣乘法 (Batch Matrix Multiplication)。
              # 將 [注意力權(quán)重] (批量, Q數(shù)量, K數(shù)量) 與 [值向量] (批量, K數(shù)量, V維度) 相乘
              # 得到最終的注意力輸出,形狀為:(批量大小, 查詢數(shù)量, 值維度)
              # 在 BMM 之前,對注意力權(quán)重應(yīng)用 Dropout。
              return torch.bmm(self.dropout(self.attention_weights), values)
      
      
      #@save
      class Seq2SeqEncoder(Encoder):
          """用于序列到序列學(xué)習(xí)的循環(huán)神經(jīng)網(wǎng)絡(luò)編碼器"""
          def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                       dropout=0, **kwargs):
              super(Seq2SeqEncoder, self).__init__(**kwargs)
              # 嵌入層
              self.embedding = nn.Embedding(vocab_size, embed_size)
              self.rnn = nn.GRU(embed_size, num_hiddens, num_layers,
                                dropout=dropout)
              # self.lstm = nn.LSTM(embed_size, num_hiddens, num_layers)
      
          def forward(self, X, *args):
              # 輸入X.shape = (batch_size,num_steps)
              # 輸出'X'的形狀:(batch_size,num_steps,embed_size)
              X = self.embedding(X)
              # 在循環(huán)神經(jīng)網(wǎng)絡(luò)模型中,第一個軸對應(yīng)于時間步
              X = X.permute(1, 0, 2)
              # 如果未提及狀態(tài),則默認(rèn)為0
              output, state = self.rnn(X)
              # output : 這個返回值是所有時間步的隱藏狀態(tài)序列
              # output的形狀:(num_steps,batch_size,num_hiddens)
              # hn (hidden) : 這是每一層rnn的最后一個時間步的隱藏狀態(tài)
              # state的形狀:(num_layers,batch_size,num_hiddens)
              return output, state
          
      class AttentionDecoder(Decoder):  #@save
          """The base attention-based decoder interface."""
          def __init__(self):
              super().__init__()
      
          @property
          def attention_weights(self):
              raise NotImplementedError
          
      class Seq2SeqAttentionDecoder(AttentionDecoder):
          def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                       dropout=0):
              super().__init__()
              self.attention = AdditiveAttention(num_hiddens, dropout)
              self.embedding = nn.Embedding(vocab_size, embed_size)
              self.rnn = nn.GRU(
                  embed_size + num_hiddens, num_hiddens, num_layers,
                  dropout=dropout)
              self.dense = nn.LazyLinear(vocab_size)
              # self.apply(d2l.init_seq2seq)
      
          def init_state(self, enc_outputs, enc_valid_lens):
              # Shape of outputs: (num_steps, batch_size, num_hiddens).
              # Shape of hidden_state: (num_layers, batch_size, num_hiddens)
              outputs, hidden_state = enc_outputs
              return (outputs.permute(1, 0, 2), hidden_state, enc_valid_lens)
      
          def forward(self, X, state):
              # Shape of enc_outputs: (batch_size, num_steps, num_hiddens).
              # Shape of hidden_state: (num_layers, batch_size, num_hiddens)
              enc_outputs, hidden_state, enc_valid_lens = state
              # Shape of the output X: (num_steps, batch_size, embed_size)
              X = self.embedding(X).permute(1, 0, 2)
              outputs, self._attention_weights = [], []
              for x in X:
                  # Shape of query: (batch_size, 1, num_hiddens)
                  query = torch.unsqueeze(hidden_state[-1], dim=1)
                  # Shape of context: (batch_size, 1, num_hiddens)
                  context  = self.attention(
                      query, enc_outputs, enc_outputs, enc_valid_lens)
                  # Concatenate on the feature dimension
                  x = torch.cat((context, torch.unsqueeze(x, dim=1)), dim=-1)
                  # Reshape x as (1, batch_size, embed_size + num_hiddens)
                  out, hidden_state = self.rnn(x.permute(1, 0, 2), hidden_state)
                  outputs.append(out)
                  self._attention_weights.append(self.attention.attention_weights)
              # After fully connected layer transformation, shape of outputs:
              # (num_steps, batch_size, vocab_size)
              outputs = self.dense(torch.cat(outputs, dim=0))
              return outputs.permute(1, 0, 2), [enc_outputs, hidden_state,
                                                enc_valid_lens]
      
          @property
          def attention_weights(self):
              return self._attention_weights
      
          
      def try_gpu(i=0):
          """如果存在,則返回gpu(i),否則返回cpu()
      
          Defined in :numref:`sec_use_gpu`"""
          if torch.cuda.device_count() >= i + 1:
              return torch.device(f'cuda:{i}')
          return torch.device('cpu')
      
      def sequence_mask(X, valid_len, value=0):
          """在序列中屏蔽不相關(guān)的項(xiàng)"""
          maxlen = X.size(1)
          mask = torch.arange((maxlen), dtype=torch.float32,
                              device=X.device)[None, :] < valid_len[:, None]
          X[~mask] = value
          return X
      
      class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
          """帶遮蔽的softmax交叉熵?fù)p失函數(shù)"""
          # pred的形狀:(batch_size,num_steps,vocab_size)
          # label的形狀:(batch_size,num_steps)
          # valid_len的形狀:(batch_size,)
          def forward(self, pred, label, valid_len):
              weights = torch.ones_like(label)
              weights = sequence_mask(weights, valid_len)
              self.reduction='none'
              unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(
                  pred.permute(0, 2, 1), label)
              weighted_loss = (unweighted_loss * weights).mean(dim=1)
              return weighted_loss
          
      def grad_clipping(net, theta):  #@save
          """裁剪梯度"""
          if isinstance(net, nn.Module):
              params = [p for p in net.parameters() if p.requires_grad]
          else:
              params = net.params
          norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
          if norm > theta:
              for param in params:
                  param.grad[:] *= theta / norm
      
      def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
          """訓(xùn)練序列到序列模型"""
          def xavier_init_weights(m):
              if type(m) == nn.Linear:
                  nn.init.xavier_uniform_(m.weight)
              if type(m) == nn.GRU:
                  for param in m._flat_weights_names:
                      if "weight" in param:
                          nn.init.xavier_uniform_(m._parameters[param])
      
          net.apply(xavier_init_weights)
          net.to(device)
          optimizer = torch.optim.Adam(net.parameters(), lr=lr)
          loss = MaskedSoftmaxCELoss()
          net.train()
          vis = visdom.Visdom(env=u'test1', server="http://127.0.0.1", port=8097)
          animator = vis
          for epoch in range(num_epochs):
              timer = Timer()
              metric = Accumulator(2)  # 訓(xùn)練損失總和,詞元數(shù)量
              for batch in data_iter:
                  #清零(reset)優(yōu)化器中的梯度緩存
                  optimizer.zero_grad()
                  # x.shape = [batch_size, num_steps]
                  X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
                  # bos.shape = batch_size 個 bos-id
                  bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
                                device=device).reshape(-1, 1)
                  # dec_input.shape = (batch_size, num_steps)
                  # 解碼器的輸入通常由序列的起始標(biāo)志 bos 和目標(biāo)序列(去掉末尾的部分 Y[:, :-1])組成。
                  dec_input = torch.cat([bos, Y[:, :-1]], 1)  # 強(qiáng)制教學(xué)
                  # Y_hat的形狀:(batch_size,num_steps,vocab_size)
                  Y_hat, _ = net(X, dec_input, X_valid_len)
                  l = loss(Y_hat, Y, Y_valid_len)
                  l.sum().backward()      # 損失函數(shù)的標(biāo)量進(jìn)行“反向傳播”
                  grad_clipping(net, 1)
                  num_tokens = Y_valid_len.sum()
                  optimizer.step()
                  with torch.no_grad():
                      metric.add(l.sum(), num_tokens)
      
              if (epoch + 1) % 10 == 0:
                  # print(predict('你是?'))
                  # print(epoch)
                  # animator.add(epoch + 1, )
      
                  if epoch == 9:
                      # 清空圖表:使用空數(shù)組來替換現(xiàn)有內(nèi)容
                      vis.line(X=np.array([0]), Y=np.array([0]), win='train_ch8', update='replace')
                  # _loss_val = l
                  # _loss_val = _loss_val.cpu().sum().detach().numpy()
                  vis.line(
                      X=np.array([epoch + 1]),
                      Y=[ metric[0] / metric[1]],
                      win='train_ch8',
                      update='append',
                      opts={
                          'title': 'train_ch8',
                          'xlabel': 'epoch',
                          'ylabel': 'loss',
                          'linecolor': np.array([[0, 0, 255]]),  # 藍(lán)色線條
                      }
                  )
          print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
              f'tokens/sec on {str(device)}')
          torch.save(net.cpu().state_dict(), 'model_h.pt')  # [[6]]
          torch.save(net.cpu(), 'model.pt')  # [[6]]
      
      def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                          device, save_attention_weights=False):
          """序列到序列模型的預(yù)測"""
          # 在預(yù)測時將net設(shè)置為評估模式
          net.eval()
          src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
              src_vocab['<eos>']]
          enc_valid_len = torch.tensor([len(src_tokens)], device=device)
          src_tokens = dataset.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
          # 添加批量軸
          enc_X = torch.unsqueeze(
              torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
          enc_outputs = net.encoder(enc_X, enc_valid_len)
          dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
          # 添加批量軸
          dec_X = torch.unsqueeze(torch.tensor(
              [tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
          output_seq, attention_weight_seq = [], []
          for _ in range(num_steps):
              Y, dec_state = net.decoder(dec_X, dec_state)
              # 我們使用具有預(yù)測最高可能性的詞元,作為解碼器在下一時間步的輸入
              dec_X = Y.argmax(dim=2)
              pred = dec_X.squeeze(dim=0).type(torch.int32).item()
              # 保存注意力權(quán)重(稍后討論)
              if save_attention_weights:
                  attention_weight_seq.append(net.decoder.attention_weights[0].reshape(num_steps).cpu())
              # 一旦序列結(jié)束詞元被預(yù)測,輸出序列的生成就完成了
              if pred == tgt_vocab['<eos>']:
                  break
              output_seq.append(pred)
          return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
      
      
      def bleu(pred_seq, label_seq, k):  #@save
          """計(jì)算BLEU"""
          pred_tokens, label_tokens = pred_seq.split(' '), [i for i in label_seq]
          len_pred, len_label = len(pred_tokens), len(label_tokens)
          score = math.exp(min(0, 1 - len_label / len_pred))
          for n in range(1, k + 1):
              num_matches, label_subs = 0, collections.defaultdict(int)
              for i in range(len_label - n + 1):
                  label_subs[' '.join(label_tokens[i: i + n])] += 1
              for i in range(len_pred - n + 1):
                  if label_subs[' '.join(pred_tokens[i: i + n])] > 0:
                      num_matches += 1
                      label_subs[' '.join(pred_tokens[i: i + n])] -= 1
              score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n))
          return score
      
      
      from matplotlib import pyplot as plt
      import matplotlib
      # from matplotlib_inline import backend_inline
      def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),
                        cmap='Reds'):
          """
          顯示矩陣的熱圖(Heatmaps)。
          這個函數(shù)旨在以子圖網(wǎng)格的形式繪制多個矩陣,通常用于可視化注意力權(quán)重等。
      
          參數(shù):
              matrices (numpy.ndarray 或 torch.Tensor 數(shù)組): 
                  一個四維數(shù)組,形狀應(yīng)為 (num_rows, num_cols, height, width)。
                  其中,num_rows 和 num_cols 決定了子圖網(wǎng)格的布局,
                  height 和 width 是每個熱圖(即每個矩陣)的維度。
              xlabel (str): 
                  所有最底行子圖的 x 軸標(biāo)簽。
              ylabel (str): 
                  所有最左列子圖的 y 軸標(biāo)簽。
              titles (list of str, optional): 
                  一個包含 num_cols 個標(biāo)題的列表,用于設(shè)置每一列子圖的標(biāo)題。默認(rèn) None。
              figsize (tuple, optional): 
                  整個圖形(figure)的大小。默認(rèn) (2.5, 2.5)。
              cmap (str, optional): 
                  用于繪制熱圖的顏色映射(colormap)。默認(rèn) 'Reds'。
          """
          # 導(dǎo)入所需的 matplotlib 模塊,確保圖形在 Jupyter/IPython 環(huán)境中正確顯示為 SVG 格式
          # (假設(shè)在包含這個函數(shù)的環(huán)境中已經(jīng)導(dǎo)入了 matplotlib 的 backend_inline)
          # backend_inline.set_matplotlib_formats('svg')
          matplotlib.use('TkAgg')
          # 從輸入的 matrices 形狀中解構(gòu)出子圖網(wǎng)格的行數(shù)和列數(shù)
          # 假設(shè) matrices 的形狀是 (num_rows, num_cols, height, width)
          num_rows, num_cols, _, _ = matrices.shape
          
          # 創(chuàng)建一個包含多個子圖(axes)的圖形(fig)
          # fig: 整個圖形對象
          # axes: 一個 num_rows x num_cols 的子圖對象數(shù)組
          fig, axes = plt.subplots(
              num_rows, num_cols, 
              figsize=figsize,
              sharex=True,    # 所有子圖共享 x 軸刻度
              sharey=True,    # 所有子圖共享 y 軸刻度
              squeeze=False   # 即使只有一行或一列,也強(qiáng)制返回二維數(shù)組的 axes,方便后續(xù)循環(huán)
          )
          
          # 遍歷子圖的行和對應(yīng)的矩陣行
          # i 是行索引, row_axes 是當(dāng)前行的子圖數(shù)組, row_matrices 是當(dāng)前行的矩陣數(shù)組
          for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
              # 遍歷當(dāng)前行中的子圖和對應(yīng)的矩陣
              # j 是列索引, ax 是當(dāng)前的子圖對象, matrix 是當(dāng)前的待繪矩陣
              for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
                  
                  # 使用 ax.imshow() 繪制熱圖
                  # matrix.detach().numpy():將 PyTorch Tensor 轉(zhuǎn)換為 numpy 數(shù)組,并從計(jì)算圖中分離(如果它是 Tensor)
                  # cmap:指定顏色映射
                  pcm = ax.imshow(matrix.detach().numpy(), cmap=cmap)
                  
                  # --- 設(shè)置軸標(biāo)簽和標(biāo)題 ---
                  
                  # 只有最底行 (i == num_rows - 1) 的子圖才顯示 x 軸標(biāo)簽
                  if i == num_rows - 1:
                      ax.set_xlabel(xlabel)
                      
                  # 只有最左列 (j == 0) 的子圖才顯示 y 軸標(biāo)簽
                  if j == 0:
                      ax.set_ylabel(ylabel)
                      
                  # 如果提供了標(biāo)題列表,則設(shè)置當(dāng)前列的子圖標(biāo)題(所有行共享列標(biāo)題)
                  if titles:
                      ax.set_title(titles[j])
                      
          # --- 添加顏色條(Colorbar) ---
          
          # 為整個圖形添加一個顏色條,用于表示數(shù)值和顏色的對應(yīng)關(guān)系
          # pcm: 之前繪制的第一個熱圖返回的 Colormap 
          # ax=axes: 顏色條將參照整個子圖網(wǎng)格進(jìn)行定位和縮放
          # shrink=0.6: 縮小顏色條的高度/長度,使其只占圖形高度的 60%
          fig.colorbar(pcm, ax=axes, shrink=0.6)
          plt.show()
      
      if __name__ == '__main__':
          embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
          batch_size, num_steps = 64, 10
          lr, num_epochs, device = 0.005, 2000, try_gpu()
          # train_iter 每個迭代輸出:(batch_size, num_steps)
          train_iter, src_vocab, tgt_vocab, source, target = dataset.load_data(batch_size, num_steps)
          encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers,
                              dropout)
          decoder = Seq2SeqAttentionDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers,
                                  dropout)
          net = EncoderDecoder(encoder, decoder)
      
          is_train = False
          is_show = False
          if is_train:
              train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
          elif is_show:
              state_dict = torch.load('model_h.pt')
              net.load_state_dict(state_dict)
              net.to(device)
      
              src_text = "Call us."
              translation, attention_weight_seq = predict_seq2seq(
                      net, src_text, src_vocab, tgt_vocab, num_steps, device, True)
              # attention_weights = torch.eye(10).reshape((1, 1, 10, 10))
              # (num_rows, num_cols, height, width)
              print(f'translation={translation}')
              print(attention_weight_seq)
      
              stacked_tensor = torch.stack(attention_weight_seq, dim=0)
              stacked_tensor = stacked_tensor.unsqueeze(0).unsqueeze(0)
              show_heatmaps(
                  stacked_tensor,
                  xlabel='Attention weight', ylabel='Decode Step')
          else:
              state_dict = torch.load('model_h.pt')
              net.load_state_dict(state_dict)
              net.to(device)
              C = 0
              C1 = 0
              for i in range(2000):
                  # print(source[i])
                  # print(target[i])
                  translation, attention_weight_seq = predict_seq2seq(
                      net, source[i], src_vocab, tgt_vocab, num_steps, device)
                  
                  score = bleu(translation, target[i], k=2)
                  if score > 0.0:
                      C = C + 1
                      if score > 0.8:
                          C1 = C1 + 1
                      print(f'{source[i]} => {translation}, bleu {score:.3f}')
      
              print(f'Counter(bleu > 0) = {C}')
              print(f'Valid-Counter(bleu > 0.8) = {C1}')
      

      ??下面是encoder過程的簡單分析:

      1. 將x通過nn.Embedding得到了(batch_size,num_steps,embed_size)的輸入嵌入向量。
      2. 將嵌入向量傳給nn.GRU,得到了兩個輸出,并返回:
        • output,最后一層rnn的所有時間步的隱藏狀態(tài)(num_steps,batch_size,num_hiddens)。
        • h_n,所有rnn層的,最后一個時間步的隱藏狀態(tài)(num_layers,batch_size,num_hiddens)。

      ??下面是decoder過程的簡單分析:

      1. 將decoder_x通過nn.Embedding得到了(batch_size,num_steps,embed_size)的輸入嵌入向量。
      2. 將嵌入向量沿著num_steps進(jìn)行單步運(yùn)行,每一步經(jīng)過Attention過程,得到最終的output,以及最后一個時間步的所有rnn層的h_n,每一步執(zhí)行如下步驟:
        • 將rnn最后一層的隱藏態(tài)作為Q(第一次Q是來自于encoder,后續(xù)都是decoder的每一次運(yùn)行過程產(chǎn)生的隱藏態(tài))
        • 將encoder的output作為K,V,得到當(dāng)前動態(tài)的上下文 context
        • 將decoder_x_step 和 context進(jìn)行組合,得到decoder_x_step_new
        • 將decoder_x_step_new送入nn.GRU,得到當(dāng)前時間步的output, h_t
        • 將每一步的output收集起來作為輸出,將h_t作為下一個時間步的Q循環(huán)起來
      3. 將所有的output經(jīng)過nn.LazyLinear 映射為(num_steps, batch_size, vocab_size),并和h_t返回

      ??和原版本的seq2seq進(jìn)行對比可知:

      • 在原版中,我們的decoder依賴于一個固定的enc_outputs進(jìn)行循環(huán)解碼
      • 在新版中,我們的decoder每次界面,都會有一個Q(第一次是來至于encoder,后續(xù)都是decoder的每一次運(yùn)行過程產(chǎn)生的隱藏態(tài))來計(jì)算enc_outputs的權(quán)重分?jǐn)?shù),然后根據(jù)權(quán)重分?jǐn)?shù)得到一個動態(tài)的enc_outputs,這樣可以讓解碼器每一步都關(guān)注enc_outputs中的不同的重點(diǎn)。

      ??attention weight 的解釋:

      • 把Encoder Output(num_steps,1,num_hiddens)作為K,V
      • 將Decoder的隱藏態(tài)h_t(1,1,num_hiddens)(初始值來自于Encoder的隱藏態(tài)h_t)作為Q,計(jì)算出當(dāng)前step的attention_weight,其是一個softmax概率數(shù)據(jù)。
      • 然后將attention_weight 與 V進(jìn)行計(jì)算,代表模型當(dāng)前關(guān)注EncoderOutput的那部分?jǐn)?shù)據(jù),得到新的Context

      ??下面是訓(xùn)練和測試的一些結(jié)果

      rep_img
      rep_img

      ??從上面的圖可以看到,這個模型有一定的翻譯效果,并且,比上一篇文章的模型效果要好一點(diǎn)。

      ??此外,下面是我們翻譯:"Call us."-> "聯(lián) 系 我 們 。" 的attention weight的可視化(帶mask=3,在不同的decode step中權(quán)重變化。)

      rep_img

      ??從上面的圖可以知道,每一個decode step的注意力權(quán)重矩陣值都不一樣,意味著,每一步解碼的時候,關(guān)注的內(nèi)容也不一樣。





      后記


      ??本文引入了注意力機(jī)制,及注意力機(jī)制在seq2seq中,在應(yīng)用注意力機(jī)制后,和原版的seq2seq的結(jié)論相比,模型效果有提升。

      參考文獻(xiàn)




      打賞、訂閱、收藏、丟香蕉、硬幣,請關(guān)注公眾號(攻城獅的搬磚之路)
      qrc_img

      PS: 請尊重原創(chuàng),不喜勿噴。

      PS: 要轉(zhuǎn)載請注明出處,本人版權(quán)所有。

      PS: 有問題請留言,看到后我會第一時間回復(fù)。

      posted on 2025-11-02 11:08  SkyOnSky  閱讀(78)  評論(0)    收藏  舉報

      導(dǎo)航

      主站蜘蛛池模板: 国产午夜精品福利免费不| 久久se精品一区精品二区国产| 国产午夜影视大全免费观看| 国产精品天堂蜜av在线播放| 欧美日产国产精品日产| 亚洲黄日本午夜一区二区| 内射干少妇亚洲69XXX| 国产精品白丝久久av网站| 日本道高清一区二区三区| 精品无码国产污污污免费| 国产婷婷精品av在线| 国模无吗一区二区二区视频| 久久99精品国产麻豆婷婷| av无码一区二区大桥久未| 久久久久香蕉国产线看观看伊| 99国产精品久久久久久久日本竹 | 日韩无码视频网站| 精品不卡一区二区三区| 亚洲成人av综合一区| 午夜免费无码福利视频麻豆| 亚洲性日韩精品一区二区| 日本一区二区三区四区黄色| 罗田县| 日本高清视频在线www色| 一本无码在线观看| 成人区人妻精品一区二蜜臀| 一二三四中文字幕日韩乱码| 九九热精品在线视频观看| 免费无遮挡无码视频网站| 日本高清视频网站www| 中文字幕无码免费久久| 亚洲av无码精品蜜桃| 少妇av一区二区三区无码| 亚洲中文字幕在线精品一区| 青青草无码免费一二三区| 少妇人妻偷人精品系列| 欧洲lv尺码大精品久久久| 精品午夜福利在线视在亚洲| 男女男免费视频网站国产| 风韵丰满熟妇啪啪区老熟熟女| 人妻蜜臀久久av不卡|