<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      大模型基礎補全計劃(四)---LSTM的實例與測試(RNN的改進)

      PS:要轉載請注明出處,本人版權所有。

      PS: 這個只是基于《我自己》的理解,

      如果和你的原則及想法相沖突,請諒解,勿噴。

      環境說明

      ??無

      前言


      ???本文是這個系列第四篇,它們是:

      ???上文我們提到了RNN這種處理序列信息的網絡結構,今天我們將會提到RNN的改進版本之一的網絡結構:LSTM。注意在transformer結構出來之前,RNN還有很多的改進結構,畢竟這是一個大的研究方向。





      LSTM (long short-term memory) 簡介


      ??



      LSTM的意義

      ??我們首先來想一想RNN的結構,很樸素的理解:RNN有兩個輸入,一個是當前輸入,一個是上一次隱藏參數輸入。如果我們從時間線來看,對于早期的輸入\(X_{t-n}\)來說,由于隱藏參數一層層迭代和傳遞,對于\(X_t\)的影響非常的弱。此外,相對的,對于\(X_{t-1}\)來說,其對\(X_t\)的影響非常的強,如果\(X_{t-1}\)信息不完整,可能會影響輸出。

      ??為了解決上面RNN結構遇到的問題,提出了LSTM結構。



      LSTM的結構介紹

      ??首先我們來看看其結構圖如下:

      rep_img

      注:此圖來自于 https://zh.d2l.ai/chapter_recurrent-modern/lstm.html ,若侵權,聯系刪之。

      ??其有如下的一些內容:

      • 有三個輸入:輸入\(X_t\),隱藏參數\(H_{t-1}\),記憶\(C_{t-1}\)

      • 有三個門:輸入門 \(I_t = \sigma(X_tW_{xi} + H_{t-1}W_{hi} + b_i)\) , 遺忘門 \(F_t = \sigma(X_tW_{xf} + H_{t-1}W_{hf} + b_f)\),輸出門 \(O_t = \sigma(X_tW_{xo} + H_{t-1}W_{ho} + b_o)\)

      • 有一個候選記憶元 \(\widetilde{C_t} = tanh(X_tW_{xc} + H_{t-1}W_{hc} + b_c)\)

      • 有一個記憶元\(C_t\),其含義很簡單,有多少記憶來自于\(\widetilde{C_t}\) ,然后由輸入門 \(I_t\)控制多少候選記憶元進入新記憶中,由 遺忘門 遺忘門 \(F_t\) 來控制多少以前的記憶\(C_{t-1}\)進入新的記憶中。其公式為:\(C_t = F_t \odot C_{t-1} + I_t \odot \widetilde{C_t}\)

      • 有三個輸出:輸出門 \(O_t\),記憶元 \(C_t\),隱藏態\(H_t = O_t \odot tanh(C_t)\)

      ??總的來說,就是給隱藏參數加入了記憶參數,并可以通過記憶影響隱藏參數。





      基于LSTM訓練一個簡單的文字序列輸出模型


      ??對于文本預處理、數據集構造、訓練框架搭建詳見前文《大模型基礎補全計劃(三)---RNN實例與測試》

      ??下面是構建LSTM的網絡結構,首先我們手動來構建網絡:

      
      def get_lstm_params(vocab_size, num_hiddens, device):
          num_inputs = num_outputs = vocab_size
      
          def normal(shape):
              return torch.randn(size=shape, device=device)*0.01
      
          def three():
              return (normal((num_inputs, num_hiddens)),
                      normal((num_hiddens, num_hiddens)),
                      torch.zeros(num_hiddens, device=device))
      
          W_xi, W_hi, b_i = three()  # 輸入門參數
          W_xf, W_hf, b_f = three()  # 遺忘門參數
          W_xo, W_ho, b_o = three()  # 輸出門參數
          W_xc, W_hc, b_c = three()  # 候選記憶元參數
      
          # 輸出層參數
          W_hq = normal((num_hiddens, num_outputs))
          b_q = torch.zeros(num_outputs, device=device)
      
          # 附加梯度
          params = [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc,
                    b_c, W_hq, b_q]
          for param in params:
              param.requires_grad_(True)
          return params
      
      def init_lstm_state(batch_size, num_hiddens, device):
          return (torch.zeros((batch_size, num_hiddens), device=device),
                  torch.zeros((batch_size, num_hiddens), device=device))
      
      def lstm(inputs, state, params):
          [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c,
           W_hq, b_q] = params
          (H, C) = state
          outputs = []
          for X in inputs:
              I = torch.sigmoid((X @ W_xi) + (H @ W_hi) + b_i)
              F = torch.sigmoid((X @ W_xf) + (H @ W_hf) + b_f)
              O = torch.sigmoid((X @ W_xo) + (H @ W_ho) + b_o)
              C_tilda = torch.tanh((X @ W_xc) + (H @ W_hc) + b_c)
              C = F * C + I * C_tilda
              H = O * torch.tanh(C)
              Y = (H @ W_hq) + b_q
              outputs.append(Y)
          return torch.cat(outputs, dim=0), (H, C)
      

      ??然后是通過torch框架來設計網絡:

      
      lstm_layer = nn.LSTM(num_inputs, num_hiddens)
      
      

      ??最后是完整的訓練代碼:

      import os
      import random
      import torch
      import math
      from torch import nn
      from torch.nn import functional as F
      import numpy as np
      import time
      import visdom
      import sys
      
      sys.path.append('.')
      import dateset
      class Accumulator:
          """在n個變量上累加"""
          def __init__(self, n):
              """Defined in :numref:`sec_softmax_scratch`"""
              self.data = [0.0] * n
      
          def add(self, *args):
              self.data = [a + float(b) for a, b in zip(self.data, args)]
      
          def reset(self):
              self.data = [0.0] * len(self.data)
      
          def __getitem__(self, idx):
              return self.data[idx]
          
      class Timer:
          """記錄多次運行時間"""
          def __init__(self):
              """Defined in :numref:`subsec_linear_model`"""
              self.times = []
              self.start()
      
          def start(self):
              """啟動計時器"""
              self.tik = time.time()
      
          def stop(self):
              """停止計時器并將時間記錄在列表中"""
              self.times.append(time.time() - self.tik)
              return self.times[-1]
      
          def avg(self):
              """返回平均時間"""
              return sum(self.times) / len(self.times)
      
          def sum(self):
              """返回時間總和"""
              return sum(self.times)
      
          def cumsum(self):
              """返回累計時間"""
              return np.array(self.times).cumsum().tolist()
          
      # 以num_steps為步長,從隨機的起始位置開始,返回
      # x1=[ [random_offset1:random_offset1 + num_steps], ... , [random_offset_batchsize:random_offset_batchsize + num_steps] ]
      # y1=[ [random_offset1 + 1:random_offset1 + num_steps + 1], ... , [random_offset_batchsize + 1:random_offset_batchsize + num_steps + 1] ]
      def seq_data_iter_random(corpus, batch_size, num_steps):  #@save
          """使用隨機抽樣生成一個小批量子序列"""
          # 從隨機偏移量開始對序列進行分區,隨機范圍包括num_steps-1
          corpus = corpus[random.randint(0, num_steps - 1):]
          # 減去1,是因為我們需要考慮標簽
          num_subseqs = (len(corpus) - 1) // num_steps
          # 長度為num_steps的子序列的起始索引
          # [0, num_steps*1, num_steps*2, num_steps*3, ...]
          initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
          # 在隨機抽樣的迭代過程中,
          # 來自兩個相鄰的、隨機的、小批量中的子序列不一定在原始序列上相鄰
          random.shuffle(initial_indices)
      
          def data(pos):
              # 返回從pos位置開始的長度為num_steps的序列
              return corpus[pos: pos + num_steps]
      
          num_batches = num_subseqs // batch_size
          for i in range(0, batch_size * num_batches, batch_size):
              # 在這里,initial_indices包含子序列的隨機起始索引
              initial_indices_per_batch = initial_indices[i: i + batch_size]
              X = [data(j) for j in initial_indices_per_batch]
              Y = [data(j + 1) for j in initial_indices_per_batch]
              yield torch.tensor(X), torch.tensor(Y)
      
      # 以num_steps為步長,從隨機的起始位置開始,返回
      # x1=[:, random_offset1:random_offset1 + num_steps]
      # y1=[:, random_offset1 + 1:random_offset1 + num_steps + 1]
      
      def seq_data_iter_sequential(corpus, batch_size, num_steps):  #@save
          """使用順序分區生成一個小批量子序列"""
          # 從隨機偏移量開始劃分序列
          offset = random.randint(0, num_steps)
          num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
          # 重新根據corpus建立X_corpus, Y_corpus,兩者之間差一位。注意X_corpus, Y_corpus的長度是batch_size的整數倍
          Xs = torch.tensor(corpus[offset: offset + num_tokens])
          Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
      
          # 直接根據batchsize劃分X_corpus, Y_corpus
          Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
          # 計算出需要多少次才能取完數據
          num_batches = Xs.shape[1] // num_steps
          for i in range(0, num_steps * num_batches, num_steps):
              X = Xs[:, i: i + num_steps]
              Y = Ys[:, i: i + num_steps]
              yield X, Y
      
      
      class SeqDataLoader:  #@save
          """加載序列數據的迭代器"""
          def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
              if use_random_iter:
                  self.data_iter_fn = seq_data_iter_random
              else:
                  self.data_iter_fn = seq_data_iter_sequential
              self.corpus, self.vocab = dateset.load_dataset(max_tokens)
              self.batch_size, self.num_steps = batch_size, num_steps
      
          def __iter__(self):
              return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
          
      def load_data_epoch(batch_size, num_steps,  #@save
                                 use_random_iter=False, max_tokens=10000):
          """返回時光機器數據集的迭代器和詞表"""
          data_iter = SeqDataLoader(
              batch_size, num_steps, use_random_iter, max_tokens)
          return data_iter, data_iter.vocab
      
      
      
      def get_lstm_params(vocab_size, num_hiddens, device):
          num_inputs = num_outputs = vocab_size
      
          def normal(shape):
              return torch.randn(size=shape, device=device)*0.01
      
          def three():
              return (normal((num_inputs, num_hiddens)),
                      normal((num_hiddens, num_hiddens)),
                      torch.zeros(num_hiddens, device=device))
      
          W_xi, W_hi, b_i = three()  # 輸入門參數
          W_xf, W_hf, b_f = three()  # 遺忘門參數
          W_xo, W_ho, b_o = three()  # 輸出門參數
          W_xc, W_hc, b_c = three()  # 候選記憶元參數
      
          # 輸出層參數
          W_hq = normal((num_hiddens, num_outputs))
          b_q = torch.zeros(num_outputs, device=device)
      
          # 附加梯度
          params = [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc,
                    b_c, W_hq, b_q]
          for param in params:
              param.requires_grad_(True)
          return params
      
      def init_lstm_state(batch_size, num_hiddens, device):
          return (torch.zeros((batch_size, num_hiddens), device=device),
                  torch.zeros((batch_size, num_hiddens), device=device))
      
      def lstm(inputs, state, params):
          [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c,
           W_hq, b_q] = params
          (H, C) = state
          outputs = []
          for X in inputs:
              I = torch.sigmoid((X @ W_xi) + (H @ W_hi) + b_i)
              F = torch.sigmoid((X @ W_xf) + (H @ W_hf) + b_f)
              O = torch.sigmoid((X @ W_xo) + (H @ W_ho) + b_o)
              C_tilda = torch.tanh((X @ W_xc) + (H @ W_hc) + b_c)
              C = F * C + I * C_tilda
              H = O * torch.tanh(C)
              Y = (H @ W_hq) + b_q
              outputs.append(Y)
          return torch.cat(outputs, dim=0), (H, C)
      
      def try_gpu(i=0):
          """如果存在,則返回gpu(i),否則返回cpu()
      
          Defined in :numref:`sec_use_gpu`"""
          if torch.cuda.device_count() >= i + 1:
              return torch.device(f'cuda:{i}')
          return torch.device('cpu')
      
      
      #@save
      class RNNModel(nn.Module):
          """循環神經網絡模型"""
          def __init__(self, rnn_layer, vocab_size, device, **kwargs):
              super(RNNModel, self).__init__(**kwargs)
              self.rnn = rnn_layer
              self.vocab_size = vocab_size
              self.num_hiddens = self.rnn.hidden_size
              # 如果RNN是雙向的(之后將介紹),num_directions應該是2,否則應該是1
              if not self.rnn.bidirectional:
                  self.num_directions = 1
                  self.linear = nn.Linear(self.num_hiddens, self.vocab_size, device=device)
              else:
                  self.num_directions = 2
                  self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size, device=device)
      
          def forward(self, inputs, state):
              X = F.one_hot(inputs.T.long(), self.vocab_size)
              X = X.to(torch.float32)
              Y, state = self.rnn(X, state)
              # 全連接層首先將Y的形狀改為(時間步數*批量大小,隱藏單元數)
              # 它的輸出形狀是(時間步數*批量大小,詞表大小)。
              output = self.linear(Y.reshape((-1, Y.shape[-1])))
              return output, state
      
          def begin_state(self, device, batch_size=1):
              if not isinstance(self.rnn, nn.LSTM):
                  # nn.GRU以張量作為隱狀態
                  return  torch.zeros((self.num_directions * self.rnn.num_layers,
                                       batch_size, self.num_hiddens),
                                      device=device)
              else:
                  # nn.LSTM以元組作為隱狀態
                  return (torch.zeros((
                      self.num_directions * self.rnn.num_layers,
                      batch_size, self.num_hiddens), device=device),
                          torch.zeros((
                              self.num_directions * self.rnn.num_layers,
                              batch_size, self.num_hiddens), device=device))
      
      
      class RNNModelScratch: #@save
          """從零開始實現的循環神經網絡模型"""
          def __init__(self, vocab_size, num_hiddens, device,
                       get_params, init_state, forward_fn):
              self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
              # 初始化了隱藏參數 W_xh, W_hh, b_h,  W_hq, b_q
              self.params = get_params(vocab_size, num_hiddens, device)
              self.init_state, self.forward_fn = init_state, forward_fn
      
          def __call__(self, X, state):
              # X的形狀:(batch_size, num_steps)
              # X one_hot之后的形狀:(num_steps,batch_size,詞表大小)
              X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
              return self.forward_fn(X, state, self.params)
      
          def begin_state(self, batch_size, device):
              return self.init_state(batch_size, self.num_hiddens, device)
      
      def predict_ch8(prefix, num_preds, net, vocab, device):  #@save
          """在prefix后面生成新字符"""
          state = net.begin_state(batch_size=1, device=device)
          outputs = [vocab[prefix[0]]]
          get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
          for y in prefix[1:]:  # 預熱期
              _, state = net(get_input(), state)
              outputs.append(vocab[y])
          for _ in range(num_preds):  # 預測num_preds步
              # y 包含從開始到現在的所有輸出
              # state是當前計算出來的隱藏參數
              y, state = net(get_input(), state)
              outputs.append(int(y.argmax(dim=1).reshape(1)))
          return ''.join([vocab.idx_to_token[i] for i in outputs])
      
      def grad_clipping(net, theta):  #@save
          """裁剪梯度"""
          if isinstance(net, nn.Module):
              params = [p for p in net.parameters() if p.requires_grad]
          else:
              params = net.params
          norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
          if norm > theta:
              for param in params:
                  param.grad[:] *= theta / norm
      
      
      def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
          """訓練網絡一個迭代周期(定義見第8章)"""
          state, timer = None, Timer()
          metric = Accumulator(2)  # 訓練損失之和,詞元數量
          # X的形狀:(batch_size, num_steps)
          # Y的形狀:(batch_size, num_steps)
          for X, Y in train_iter:
              if state is None or use_random_iter:
                  # 在第一次迭代或使用隨機抽樣時初始化state
                  state = net.begin_state(batch_size=X.shape[0], device=device)
              else:
                  if isinstance(net, nn.Module) and not isinstance(state, tuple):
                      # state對于nn.GRU是個張量
                      state.detach_()
                  else:
                      # state對于nn.LSTM或對于我們從零開始實現的模型是個張量
                      for s in state:
                          s.detach_()
              y = Y.T.reshape(-1)
              X, y = X.to(device), y.to(device)
              # y_hat 包含從開始到現在的所有輸出
              # y_hat的形狀:(batch_size * num_steps, 詞表大小)
              # state是當前計算出來的隱藏參數
              y_hat, state = net(X, state)
              # 交叉熵損失函數,傳入預測值和標簽值,并求平均值
              l = loss(y_hat, y.long()).mean()
              if isinstance(updater, torch.optim.Optimizer):
                  updater.zero_grad()
                  l.backward()
                  grad_clipping(net, 1)
                  updater.step()
              else:
                  l.backward()
                  grad_clipping(net, 1)
                  # 因為已經調用了mean函數
                  updater(batch_size=1)
              # 這里記錄交叉熵損失的值的和,以及記錄對應交叉熵損失值的樣本個數
              metric.add(l * y.numel(), y.numel())
          # 求交叉熵損失的平均值,再求exp,即可得到困惑度
          return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
      
      
      def sgd(params, lr, batch_size):
          """小批量隨機梯度下降
      
          Defined in :numref:`sec_linear_scratch`"""
          with torch.no_grad():
              for param in params:
                  param -= lr * param.grad / batch_size
                  param.grad.zero_()
      
      #@save
      def train_ch8(net, train_iter, vocab, lr, num_epochs, device,
                    use_random_iter=False):
          """訓練模型(定義見第8章)"""
          loss = nn.CrossEntropyLoss()
          # 新建一個連接客戶端
          # 指定 env=u'test1',默認端口為 8097,host 是 'localhost'
          vis = visdom.Visdom(env=u'test1', server="http://10.88.88.136", port=8097)
          animator = vis
          # 初始化
          if isinstance(net, nn.Module):
              updater = torch.optim.SGD(net.parameters(), lr)
          else:
              updater = lambda batch_size: sgd(net.params, lr, batch_size)
          predict = lambda prefix: predict_ch8(prefix, 30, net, vocab, device)
          # 訓練和預測
          for epoch in range(num_epochs):
              ppl, speed = train_epoch_ch8(
                  net, train_iter, loss, updater, device, use_random_iter)
              
      
      
              if (epoch + 1) % 10 == 0:
                  # print(predict('你是?'))
                  # print(epoch)
                  # animator.add(epoch + 1, )
      
                  if epoch == 9:
                      # 清空圖表:使用空數組來替換現有內容
                      vis.line(X=np.array([0]), Y=np.array([0]), win='train_ch8', update='replace')
      
                  vis.line(
                      X=np.array([epoch + 1]),
                      Y=[ppl],
                      win='train_ch8',
                      update='append',
                      opts={
                          'title': 'train_ch8',
                          'xlabel': 'epoch',
                          'ylabel': 'ppl',
                          'linecolor': np.array([[0, 0, 255]]),  # 藍色線條
                      }
                  )
          print(f'困惑度 {ppl:.1f}, {speed:.1f} 詞元/秒 {str(device)}')
          print(predict('你是'))
          print(predict('我有一劍'))
      
      if __name__ == '__main__':
          batch_size, num_steps = 32, 35
          train_iter, vocab = load_data_epoch(batch_size, num_steps)
      
          vocab_size, num_hiddens, device = len(vocab), 256, try_gpu()
          num_epochs, lr = 1000, 1
          model = RNNModelScratch(len(vocab), num_hiddens, device, get_lstm_params, init_lstm_state, lstm)
          
          # num_inputs = vocab_size
          # lstm_layer = nn.LSTM(num_inputs, num_hiddens)
          # model = RNNModel(lstm_layer, len(vocab), device)
          # model = model.to(device)
          
          print(predict_ch8('你是', 30, model, vocab, device))
          train_ch8(model, train_iter, vocab, lr, num_epochs, device)
      

      ??我們分別使用手動構建的LSTM和框架構建的LSTM進行訓練和測試,結果如下:

      rep_img
      rep_img
      rep_img
      rep_img

      ??我們可以看到,模型未訓練和訓練后的對比,明顯訓練后的語句要通順一點。





      后記


      ??綜合RNN和LSTM兩篇文章的結論來看,其對序列數據確實有一定的效果。

      ??此外,當前我們用RNN/LSTM做了序列數據的后續模擬生成工作,但是由于網絡深度、廣度的問題,其效果也就比在詞表中隨機抽取字組成的序列看起來要好點。

      參考文獻




      打賞、訂閱、收藏、丟香蕉、硬幣,請關注公眾號(攻城獅的搬磚之路)
      qrc_img

      PS: 請尊重原創,不喜勿噴。

      PS: 要轉載請注明出處,本人版權所有。

      PS: 有問題請留言,看到后我會第一時間回復。

      posted on 2025-09-14 10:55  SkyOnSky  閱讀(79)  評論(0)    收藏  舉報

      導航

      主站蜘蛛池模板: 99久久99这里只有免费费精品| 武装少女在线观看高清完整版免费| 大新县| 韩国无码AV片午夜福利| 国产情侣一区二区三区| 国产精品十八禁一区二区| 国产日产亚洲系列av| 少妇熟女久久综合网色欲| 亚洲av一本二本三本| 中文字幕无码免费久久| 绥江县| 日韩一区二区三区理伦片| 虎白女粉嫩尤物福利视频| 播放灌醉水嫩大学生国内精品| 超碰成人人人做人人爽| 国产黄色三级三级看三级| 91毛片网| 梨树县| 日韩精品人妻av一区二区三区| 国产精品久久久久7777| 精品无码av无码专区| 国产精品白丝一区二区三区| 国产精品一区在线蜜臀| 偷拍激情视频一区二区三区| 亚洲成人一区二区av| 亚洲乱理伦片在线观看中字| 久久99精品国产麻豆婷婷| 九九热在线视频只有精品| 大香蕉av一区二区三区| 精品国产成人国产在线视| 极品无码国模国产在线观看 | 国产精品天干天干综合网| 国产偷人妻精品一区二区在线 | 午夜AAAAA级岛国福利在线 | 免费国产高清在线精品一区| 成人免费精品网站在线观看影片| 日韩加勒比一本无码精品| 国内熟女中文字幕第一页| 大胸美女吃奶爽死视频| 在线视频中文字幕二区| 国产人妻高清国产拍精品|