實用指南:GPT_Data_Processing_Tutorial
從零開始構(gòu)建GPT數(shù)據(jù)處理管道:完整教程
目錄
引言:為什么數(shù)據(jù)處理是LLM的基礎(chǔ)?
在深入理解大語言模型(LLM)之前,我們必須先解決一個根本問題:如何將人類語言轉(zhuǎn)換為模型可以處理的數(shù)字形式?
本教程將帶你一步步構(gòu)建完整的數(shù)據(jù)處理管道,從原始文本到模型輸入。我們將以GPT-2為例,理解現(xiàn)代LLM處理文本的核心原理。
學(xué)習(xí)目標(biāo):
- 理解文本分詞的必要性和方法
- 掌握從簡單到復(fù)雜的分詞器實現(xiàn)
- 學(xué)會使用滑動窗口創(chuàng)建訓(xùn)練數(shù)據(jù)
- 理解嵌入層和位置編碼的作用
- 構(gòu)建可復(fù)用的數(shù)據(jù)加載器
文本分詞:從字符到token
為什么需要分詞?
計算機(jī)無法直接理解文字,需要將文本切分成更小的單元,這些單元稱為token。token可以是:
- 單詞(“hello”, “world”)
- 子詞(“un”, “happy”)
- 字符(“a”, “b”, “c”)
簡單分詞實現(xiàn)
讓我們從最基礎(chǔ)的分詞開始:
import re
text = "Hello, world. Is this-- a test?"
# 使用正則表達(dá)式分割文本
tokens = re.split(r'([,.:;?_!"()\']|--|\s)', text)
tokens = [item.strip() for item in tokens if item.strip()]
print(tokens)
# 輸出:['Hello', ',', 'world', '.', 'Is', 'this', '--', 'a', 'test', '?']
正則表達(dá)式解析:
[,.:;?_!"()\']- 匹配標(biāo)點符號|--- 匹配雙破折號|\s- 匹配空白字符- 外層的
()保留分隔符
處理實際文本
讓我們處理一個完整的故事:
# 下載并加載文本
import urllib.request
url = "https://raw.githubusercontent.com/rasbt/LLMs-from-scratch/main/ch02/01_main-chapter-code/the-verdict.txt"
raw_text = urllib.request.urlopen(url).read().decode('utf-8')
# 分詞處理
tokens = re.split(r'([,.:;?_!"()\']|--|\s)', raw_text)
tokens = [item.strip() for item in tokens if item.strip()]
print(f"總token數(shù): {len(tokens)}")
# 輸出:4690
構(gòu)建詞匯表:token到ID的映射
詞匯表的作用
神經(jīng)網(wǎng)絡(luò)處理的是數(shù)字,不是文本。詞匯表建立了token和唯一ID之間的映射關(guān)系。
# 構(gòu)建詞匯表
all_words = sorted(set(tokens)) # 去重并排序
vocab = {token: integer for integer, token in enumerate(all_words)}
print(f"詞匯表大小: {len(vocab)}")
# 輸出:1130
實現(xiàn)基礎(chǔ)分詞器
class SimpleTokenizerV1:
def __init__(self, vocab):
self.str_to_int = vocab # token → ID
self.int_to_str = {i: s for s, i in vocab.items()} # ID → token
def encode(self, text):
"""將文本轉(zhuǎn)換為ID序列"""
tokens = re.split(r'([,.:;?_!"()\']|--|\s)', text)
tokens = [item.strip() for item in tokens if item.strip()]
ids = [self.str_to_int[token] for token in tokens]
return ids
def decode(self, ids):
"""將ID序列轉(zhuǎn)換回文本"""
text = " ".join([self.int_to_str[i] for i in ids])
# 清理標(biāo)點符號前的空格
text = re.sub(r'\s+([,.?!"()\'])', r'\1', text)
return text
# 使用示例
tokenizer = SimpleTokenizerV1(vocab)
text = "Hello, world."
ids = tokenizer.encode(text)
print(f"編碼結(jié)果: {ids}")
print(f"解碼結(jié)果: {tokenizer.decode(ids)}")
特殊標(biāo)記:處理未知和邊界情況
問題:未知詞匯
基礎(chǔ)分詞器遇到訓(xùn)練時未見過的詞會報錯:
# 嘗試編碼不在詞匯表中的文本
text = "Hello, do you like tea?"
try:
tokenizer.encode(text)
except KeyError as e:
print(f"錯誤: {e}")
# 輸出:錯誤: 'Hello' # 'Hello' 不在詞匯表中
解決方案:添加特殊標(biāo)記
# 擴(kuò)展詞匯表,添加特殊標(biāo)記
all_tokens.extend(["", "<|unk|>"])
vocab = {token: integer for integer, token in enumerate(all_tokens)}
# 改進(jìn)的分詞器
class SimpleTokenizerV2:
def __init__(self, vocab):
self.str_to_int = vocab
self.int_to_str = {i: s for s, i in vocab.items()}
def encode(self, text):
tokens = re.split(r'([,.:;?_!"()\']|--|\s)', text)
tokens = [item.strip() for item in tokens if item.strip()]
# 處理未知詞
tokens = [token if token in self.str_to_int else "<|unk|>" for token in tokens]
ids = [self.str_to_int[token] for token in tokens]
return ids
def decode(self, ids):
text = " ".join([self.int_to_str[i] for i in ids])
text = re.sub(r'\s+([,.?!"()\'])', r'\1', text)
return text
# 測試改進(jìn)的分詞器
tokenizer = SimpleTokenizerV2(vocab)
text = "Hello, do you like tea?"
ids = tokenizer.encode(text)
print(f"編碼: {ids}")
print(f"解碼: {tokenizer.decode(ids)}")
常用特殊標(biāo)記
| 標(biāo)記 | 作用 | GPT-2中的處理 |
|---|---|---|
| `< | unk | >` |
<s> | 序列開始 | 不使用 |
</s> | 序列結(jié)束 | 使用`` |
<pad> | 填充 | 使用`` |
BytePair編碼:工業(yè)級解決方案
BPE的原理
BytePair編碼(BPE)是一種數(shù)據(jù)壓縮算法,被OpenAI用于GPT系列模型。它的核心思想是:
- 從字符級別開始
- 迭代合并最頻繁的相鄰token對
- 形成更大的子詞單元
使用tiktoken庫
import tiktoken
# 初始化GPT-2編碼器
tokenizer = tiktoken.get_encoding("gpt2")
# 編碼文本
text = "Hello, do you like tea? In the sunlit terraces of someunknownPlace."
ids = tokenizer.encode(text, allowed_special={""})
print(f"Token IDs: {ids}")
# 解碼
decoded = tokenizer.decode(ids)
print(f"解碼文本: {decoded}")
# 查看token對應(yīng)的文本
for i, token_id in enumerate(ids[:10]):
token_text = tokenizer.decode([token_id])
print(f"Token {i}: ID={token_id}, Text='{token_text}'")
BPE的優(yōu)勢
# 處理未知詞
text = "unfamiliarword"
ids = tokenizer.encode(text)
tokens = [tokenizer.decode([id]) for id in ids]
print(f"BPE分解: {tokens}")
# 可能輸出:['unfam', 'iliar', 'word']
# BPE將未知詞分解為已知的子詞組合
# 避免了<unk>標(biāo)記,保持更多信息
滑動窗口:創(chuàng)造訓(xùn)練樣本
為什么需要滑動窗口?
LLM的任務(wù)是預(yù)測下一個詞。我們需要將長文本切分成多個訓(xùn)練樣本,每個樣本包含:
- 輸入序列:連續(xù)的token
- 目標(biāo)序列:輸入序列向右移動一位
滑動窗口實現(xiàn)
# 示例:創(chuàng)建訓(xùn)練樣本
token_ids = [290, 4920, 2241, 287, 257, 984, 15632, 438]
context_size = 4
# 使用滑動窗口
samples = []
for i in range(len(token_ids) - context_size):
input_seq = token_ids[i:i + context_size]
target_seq = token_ids[i + 1: i + context_size + 1]
samples.append((input_seq, target_seq))
# 打印樣本
for i, (inp, tgt) in enumerate(samples):
print(f"樣本 {i}:")
print(f" 輸入: {inp}")
print(f" 目標(biāo): {tgt}")
print(f" 任務(wù): 根據(jù)前{len(inp)}個詞預(yù)測下一個詞")
關(guān)鍵參數(shù)說明
# 滑動窗口參數(shù)
max_length = 256 # 每個樣本的長度
stride = 128 # 滑動步長
# 完整實現(xiàn)
def create_samples(token_ids, max_length, stride):
inputs = []
targets = []
for i in range(0, len(token_ids) - max_length, stride):
input_chunk = token_ids[i:i + max_length]
target_chunk = token_ids[i + 1: i + max_length + 1]
inputs.append(input_chunk)
targets.append(target_chunk)
return inputs, targets
參數(shù)選擇策略:
stride = 1:最大數(shù)據(jù)利用率,高重疊stride = max_length:無重疊,最高效stride < max_length:平衡效率和多樣性
嵌入層:從離散到連續(xù)
嵌入層的作用
神經(jīng)網(wǎng)絡(luò)需要連續(xù)的數(shù)值輸入,而不是離散的整數(shù)。嵌入層將token ID映射為高維向量。
import torch
import torch.nn as nn
# 創(chuàng)建嵌入層
vocab_size = 50257 # GPT-2詞匯表大小
embedding_dim = 256 # 嵌入維度
embedding_layer = nn.Embedding(vocab_size, embedding_dim)
# 查看嵌入矩陣
print(f"嵌入層形狀: {embedding_layer.weight.shape}")
# 輸出:torch.Size([50257, 256])
# 轉(zhuǎn)換token ID到向量
token_ids = torch.tensor([15496, 11, 466]) # "Hello"的token
embeddings = embedding_layer(token_ids)
print(f"嵌入向量形狀: {embeddings.shape}")
# 輸出:torch.Size([3, 256])
嵌入層的工作原理
# 嵌入層本質(zhì)上是一個查找表
# 每個token ID對應(yīng)一行向量
# 模擬簡單例子
simple_vocab = 5
embed_dim = 3
simple_embedding = nn.Embedding(simple_vocab, embed_dim)
# 手動查看
for i in range(simple_vocab):
vector = simple_embedding(torch.tensor([i]))
print(f"Token {i}: {vector.squeeze().tolist()}")
為什么使用嵌入層?
- 降維:將高維one-hot編碼壓縮到低維空間
- 語義相似性:相似的詞在向量空間中更接近
- 可訓(xùn)練:嵌入向量通過訓(xùn)練學(xué)習(xí)到最優(yōu)表示
位置編碼:讓模型理解順序
問題:嵌入層丟失位置信息
# 相同的詞,不同位置
text1 = "The cat sat"
text2 = "Sat the cat"
# 經(jīng)過嵌入層后,相同詞的向量相同
# 但位置信息丟失了!
解決方案:絕對位置編碼
# 創(chuàng)建位置嵌入層
context_length = 1024 # 最大序列長度
pos_embedding_layer = nn.Embedding(context_length, embedding_dim)
# 生成位置索引
max_length = 4
position_ids = torch.arange(max_length)
print(f"位置ID: {position_ids}")
# 輸出:tensor([0, 1, 2, 3])
# 獲取位置嵌入
pos_embeddings = pos_embedding_layer(position_ids)
print(f"位置嵌入形狀: {pos_embeddings.shape}")
# 輸出:torch.Size([4, 256])
組合token嵌入和位置嵌入
# 完整的輸入嵌入計算
def create_input_embeddings(token_ids, max_length):
# Token嵌入
token_embeddings = token_embedding_layer(token_ids)
# 位置嵌入
position_ids = torch.arange(max_length)
position_embeddings = pos_embedding_layer(position_ids)
# 相加(廣播機(jī)制)
input_embeddings = token_embeddings + position_embeddings
return input_embeddings
# 示例
batch_size = 8
token_ids = torch.randint(0, vocab_size, (batch_size, max_length))
input_embeddings = create_input_embeddings(token_ids, max_length)
print(f"最終嵌入形狀: {input_embeddings.shape}")
# 輸出:torch.Size([8, 4, 256])
位置編碼的作用機(jī)制
# 可視化位置編碼的影響
import matplotlib.pyplot as plt
# 計算不同位置的余弦相似度
def plot_position_similarity():
pos_embeds = pos_embedding_layer.weight.detach().numpy()
# 計算位置0與其他位置的相似度
pos_0 = pos_embeds[0]
similarities = []
for i in range(20):
pos_i = pos_embeds[i]
sim = np.dot(pos_0, pos_i) / (np.linalg.norm(pos_0) * np.linalg.norm(pos_i))
similarities.append(sim)
plt.figure(figsize=(10, 5))
plt.plot(similarities)
plt.xlabel('Position')
plt.ylabel('Cosine Similarity with Position 0')
plt.title('Position Embedding Similarity')
plt.show()
# 每個位置都有獨特的表示,模型可以區(qū)分不同的位置
完整實現(xiàn):數(shù)據(jù)加載器
PyTorch數(shù)據(jù)集類
import torch
from torch.utils.data import Dataset, DataLoader
import tiktoken
class GPTDataset(Dataset):
def __init__(self, text, max_length=256, stride=128):
self.max_length = max_length
self.stride = stride
# 初始化tokenizer
self.tokenizer = tiktoken.get_encoding("gpt2")
# 編碼全文
self.token_ids = self.tokenizer.encode(text, allowed_special={""})
# 創(chuàng)建輸入和目標(biāo)
self.inputs = []
self.targets = []
for i in range(0, len(self.token_ids) - max_length, stride):
input_chunk = self.token_ids[i:i + max_length]
target_chunk = self.token_ids[i + 1: i + max_length + 1]
self.inputs.append(torch.tensor(input_chunk))
self.targets.append(torch.tensor(target_chunk))
def __len__(self):
return len(self.inputs)
def __getitem__(self, idx):
return self.inputs[idx], self.targets[idx]
def create_dataloader(text, batch_size=4, max_length=256,
stride=128, shuffle=True, drop_last=True):
"""創(chuàng)建數(shù)據(jù)加載器"""
dataset = GPTDataset(text, max_length, stride)
dataloader = DataLoader(
dataset,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last,
num_workers=0
)
return dataloader
完整的嵌入處理流程
class GPTEmbeddingProcessor:
def __init__(self, vocab_size=50257, embed_dim=768, max_length=1024):
self.vocab_size = vocab_size
self.embed_dim = embed_dim
self.max_length = max_length
# 初始化嵌入層
self.token_embedding = nn.Embedding(vocab_size, embed_dim)
self.position_embedding = nn.Embedding(max_length, embed_dim)
# 初始化tokenizer
self.tokenizer = tiktoken.get_encoding("gpt2")
def process_batch(self, token_ids):
"""處理一個批次的token IDs"""
batch_size, seq_length = token_ids.shape
# Token嵌入
token_embeds = self.token_embedding(token_ids)
# 位置嵌入
position_ids = torch.arange(seq_length, device=token_ids.device)
position_embeds = self.position_embedding(position_ids)
# 廣播相加
position_embeds = position_embeds.unsqueeze(0).expand(batch_size, -1, -1)
# 最終嵌入
input_embeddings = token_embeds + position_embeds
return input_embeddings
# 使用示例
processor = GPTEmbeddingProcessor()
# 加載數(shù)據(jù)
with open("the-verdict.txt", "r", encoding="utf-8") as f:
text = f.read()
# 創(chuàng)建數(shù)據(jù)加載器
dataloader = create_dataloader(text, batch_size=8, max_length=4, stride=4)
# 處理數(shù)據(jù)
for batch_inputs, batch_targets in dataloader:
# 獲取嵌入
embeddings = processor.process_batch(batch_inputs)
print(f"輸入形狀: {batch_inputs.shape}")
print(f"嵌入形狀: {embeddings.shape}")
print(f"目標(biāo)形狀: {batch_targets.shape}")
break # 只處理第一個batch
數(shù)據(jù)流完整示例
# 完整的數(shù)據(jù)處理流程
def demonstrate_full_pipeline():
"""演示完整的數(shù)據(jù)處理流程"""
# 1. 原始文本
text = "The quick brown fox jumps over the lazy dog."
print(f"原始文本: {text}")
# 2. 分詞
tokenizer = tiktoken.get_encoding("gpt2")
token_ids = tokenizer.encode(text)
print(f"Token IDs: {token_ids}")
# 3. 創(chuàng)建滑動窗口樣本
max_length = 5
samples = []
for i in range(len(token_ids) - max_length):
input_ids = token_ids[i:i + max_length]
target_ids = token_ids[i + 1: i + max_length + 1]
samples.append((input_ids, target_ids))
print(f"\n創(chuàng)建了 {len(samples)} 個訓(xùn)練樣本")
# 4. 轉(zhuǎn)換為tensor
input_tensor = torch.tensor(samples[0][0]).unsqueeze(0)
target_tensor = torch.tensor(samples[0][1]).unsqueeze(0)
print(f"\n第一個樣本:")
print(f"輸入: {input_tensor.tolist()[0]}")
print(f"目標(biāo): {target_tensor.tolist()[0]}")
# 5. 嵌入處理
processor = GPTEmbeddingProcessor()
embeddings = processor.process_batch(input_tensor)
print(f"\n嵌入形狀: {embeddings.shape}")
print(f"每個token被映射到 {embeddings.shape[-1]} 維向量")
# 6. 解碼展示
print("\n解碼展示:")
input_text = tokenizer.decode(input_tensor[0])
target_text = tokenizer.decode(target_tensor[0])
print(f"輸入文本: '{input_text}'")
print(f"目標(biāo)文本: '{target_text}'")
demonstrate_full_pipeline()
實踐建議與常見問題
最佳實踐
選擇合適的max_length
# 根據(jù)硬件和任務(wù)選擇 GPU內(nèi)存 < 8GB: max_length = 128-256 GPU內(nèi)存 = 16GB: max_length = 512-1024 GPU內(nèi)存 > 32GB: max_length = 1024-2048stride的設(shè)置策略
# 訓(xùn)練初期:小stride,更多數(shù)據(jù) stride = max_length // 4 # 訓(xùn)練后期:大stride,避免過擬合 stride = max_length // 2批處理優(yōu)化
# 動態(tài)batch size batch_size = { "embed_dim": 256: 32, "embed_dim": 512: 16, "embed_dim": 768: 8, "embed_dim": 1024: 4 }[embed_dim]
常見問題解決
Q1: 如何處理超長文本?
def chunk_long_text(text, chunk_size=10000, overlap=100):
"""將長文本分塊處理"""
chunks = []
for i in range(0, len(text), chunk_size - overlap):
chunk = text[i:i + chunk_size]
chunks.append(chunk)
return chunks
Q2: 內(nèi)存不足怎么辦?
# 使用梯度累積
accumulation_steps = 4
effective_batch_size = batch_size * accumulation_steps
# 或者減小batch size和max_length
batch_size = 2
max_length = 128
Q3: 如何驗證數(shù)據(jù)處理正確性?
def verify_data_pipeline(dataloader):
"""驗證數(shù)據(jù)處理的正確性"""
for inputs, targets in dataloader:
# 檢查形狀
assert inputs.shape == targets.shape
# 檢查目標(biāo)是否是輸入的偏移
assert torch.all(targets[:, :-1] == inputs[:, 1:])
# 檢查范圍
assert inputs.min() >= 0
assert inputs.max() < vocab_size
print("? 數(shù)據(jù)驗證通過")
break
性能優(yōu)化技巧
預(yù)加載tokenizer
# 避免重復(fù)初始化 tokenizer = tiktoken.get_encoding("gpt2")使用pin_memory
dataloader = DataLoader(dataset, batch_size=32, pin_memory=True)多進(jìn)程加載
dataloader = DataLoader(dataset, batch_size=32, num_workers=4)
調(diào)試技巧
def debug_dataloader(dataloader, num_samples=3):
"""調(diào)試數(shù)據(jù)加載器"""
print("=== 數(shù)據(jù)加載器調(diào)試信息 ===")
for i, (inputs, targets) in enumerate(dataloader):
if i >= num_samples:
break
print(f"\n樣本 {i}:")
print(f" 輸入形狀: {inputs.shape}")
print(f" 目標(biāo)形狀: {targets.shape}")
print(f" 輸入范圍: [{inputs.min()}, {inputs.max()}]")
# 解碼第一個樣本
input_text = tokenizer.decode(inputs[0])
target_text = tokenizer.decode(targets[0])
print(f" 輸入文本: {input_text[:50]}...")
print(f" 目標(biāo)文本: {target_text[:50]}...")

浙公網(wǎng)安備 33010602011771號