Implementing the SMO Algorithm
The complete code is given below.
# coding=utf-8
import numpy as np


class mySVM():
    def __init__(self, max_iter=1e6, kernel_type='linear', C=1.0, epsilon=1e-2):
        self.max_iter = max_iter
        self.kernel_type = kernel_type
        self.C = C
        self.epsilon = epsilon
        self.iter_num = 0
        self.alpha = None
        self.w = None
        self.b = None
        self.support_vectors = None
        self.kernels = {
            'linear': self.kernel_linear,
            'quadratic': self.kernel_poly
        }

    def fit(self, X, y):
        # Initialization
        y = 2 * (y - 0.5)  # label: 0 -> -1, 1 -> 1
        n_data, n_feature = X.shape[0], X.shape[1]
        self.alpha = np.zeros(shape=n_data)
        self.w = np.zeros(shape=n_feature)
        self.b = 0.
        kernel = self.kernels[self.kernel_type]
        count = 0
        err_prev = -1
        # Compute y*f(x), the error vector E, and the KKT violations (offset)
        Error, yf = self.cal_E(X, y, self.alpha, n_data, w=self.w, b=self.b)
        offset = self.cal_offset(self.alpha, self.C, yf)
        while True:
            count += 1
            # idx1, idx2 to be optimized
            idx1, idx2 = self.found_idx(yf, self.alpha, Error, offset, err_prev)
            # loop
            cnt_idx1 = -1
            for i in idx1:
                cnt_idx1 += 1
                for j in idx2[cnt_idx1]:
                    x_i, x_j, y_i, y_j = X[i, :], X[j, :], y[i], y[j]
                    alpha_old_j, alpha_old_i = self.alpha[j].copy(), self.alpha[i].copy()
                    E_i = Error[i]
                    E_j = Error[j]
                    # Set new alpha and clip it to [L, H]
                    eta = kernel(x_i, x_i) + kernel(x_j, x_j) - 2 * kernel(x_i, x_j)
                    if eta <= 0:
                        continue
                    (L, H) = self.compute_L_H(self.C, alpha_old_j, alpha_old_i, y_j, y_i)
                    self.alpha[j] = alpha_old_j + float(y_j * (E_i - E_j)) / eta
                    self.alpha[j] = max(self.alpha[j], L)
                    self.alpha[j] = min(self.alpha[j], H)
                    self.alpha[i] = alpha_old_i + y_i * y_j * (alpha_old_j - self.alpha[j])
                    # Set model parameters
                    self.w = self.calc_w(self.alpha, y, X)
                    self.b = self.calc_b(X, y, i, j, E_i, E_j, alpha_old_i, alpha_old_j)
                    Error, yf = self.cal_E(X, y, self.alpha, n_data, w=self.w, b=self.b)
            # Check convergence:
            # max violation < epsilon, no further progress, or iteration count > max_iter
            offset = self.cal_offset(self.alpha, self.C, yf)
            err = np.max(offset)
            if err < self.epsilon or err == err_prev:
                break
            err_prev = err
            if count >= self.max_iter:
                raise RuntimeError("Iteration number exceeded the max_iter of {} iterations".format(self.max_iter))
        # support vectors: alpha != 0
        alpha_idx = np.where(self.alpha > 0)[0]
        self.support_vectors = X[alpha_idx, :]
        self.iter_num = count
        # final model parameters
        self.w = self.calc_w(self.alpha, y, X)
        b_sv = y[alpha_idx] - np.dot(self.w.T, X[alpha_idx, :].T)
        y_sv = y[alpha_idx]
        b_median = np.median(b_sv)
        if b_sv.shape[0] % 2 == 0:
            # even number of support vectors: the median is already an average
            self.b = b_median
            return X[alpha_idx, :], count
        else:
            # odd number: average the median b with the closest b belonging to
            # a support vector of the opposite label
            tt = np.where(b_sv == b_median)
            y_median = y_sv[tt]
            min_ = 1e5
            idx_neighbor = -1
            for ii in range(1, alpha_idx.shape[0]):
                if y_sv[ii] != y_median and b_sv[ii] - b_median < min_:
                    min_ = b_sv[ii] - b_median
                    idx_neighbor = ii
            self.b = 0.5 * (b_median + b_sv[idx_neighbor])
            return X[alpha_idx, :], count

    def found_idx(self, yf, alpha, Error, offset, prev_err=-1):
        idx1 = []
        idx2 = []
        found_alpha = False
        # Outer loop: pick the alpha_1's that violate the KKT conditions most severely
        idx_case1 = np.where(yf >= 1)[0]  # yf >= 1 and alpha = 0
        idx_case2 = np.where(yf == 1)[0]  # yf == 1 and 0 < alpha < C
        idx_case3 = np.where(yf <= 1)[0]  # yf <= 1 and alpha = C
        for idx in idx_case2:  # yf == 1 but alpha not in (0, C)
            if not 0 < alpha[idx] < self.C and offset[idx] >= prev_err:
                found_alpha = True
                idx1.append(idx)
        if not found_alpha:
            for idx in idx_case1:  # yf >= 1 but alpha != 0
                if not alpha[idx] == 0 and offset[idx] >= prev_err:
                    found_alpha = True
                    idx1.append(idx)
        if not found_alpha:
            for idx in idx_case3:  # yf <= 1 but alpha != C
                if not alpha[idx] == self.C and offset[idx] >= prev_err:
                    found_alpha = True
                    idx1.append(idx)
        if not found_alpha:  # every alpha satisfies the KKT conditions
            return idx1, idx2
        # Inner loop: for each alpha_1, pick the alpha_2's that maximize |E1 - E2|
        for idx in idx1:
            Ei_Ej = np.abs(Error - Error[idx])
            max_Ei_Ej = np.max(Ei_Ej)
            idx2.append(list(np.argwhere(Ei_Ej == max_Ei_Ej).flatten()))
        return idx1, idx2

    def predict(self, X):
        fx = self.f_wx_plus_b(X, self.w, self.b)
        return np.sign(fx).astype(int)

    def calc_b(self, X, y, i, j, E_i, E_j, alpha_i_old, alpha_j_old):
        K = self.kernels[self.kernel_type]
        b1 = (self.b - E_i - y[i] * (self.alpha[i] - alpha_i_old) * K(X[i], X[i])
              - y[j] * (self.alpha[j] - alpha_j_old) * K(X[i], X[j]))
        b2 = (self.b - E_j - y[i] * (self.alpha[i] - alpha_i_old) * K(X[i], X[j])
              - y[j] * (self.alpha[j] - alpha_j_old) * K(X[j], X[j]))
        if 0 < self.alpha[i] < self.C:
            b_tmp = b1
        elif 0 < self.alpha[j] < self.C:
            b_tmp = b2
        else:
            b_tmp = (b1 + b2) / 2
        return b_tmp

    def calc_w(self, alpha, y, X):
        return np.dot(X.T, np.multiply(alpha, y))

    # Prediction
    def f_wx_plus_b(self, X, w, b):
        return np.dot(w.T, X.T) + b

    def compute_L_H(self, C, alpha_old_j, alpha_old_i, y_j, y_i):
        if y_i != y_j:
            return (max(0, alpha_old_j - alpha_old_i), min(C, C - alpha_old_i + alpha_old_j))
        else:
            return (max(0, alpha_old_i + alpha_old_j - C), min(C, alpha_old_i + alpha_old_j))

    # Kernels
    def kernel_linear(self, x1, x2):
        return np.dot(x1, x2.T)

    def kernel_poly(self, x1, x2, d=2):
        return np.dot(x1, x2.T) ** d

    def cal_offset(self, alpha, C, yf):
        # KKT violation of each alpha (<= 0 means the condition holds)
        offset = np.ones_like(alpha)
        for idx in range(alpha.shape[0]):
            if alpha[idx] == 0:  # requires yf >= 1
                offset[idx] = 1 - yf[idx]
            elif alpha[idx] == C:  # requires yf <= 1
                offset[idx] = yf[idx] - 1
            else:  # 0 < alpha < C requires yf == 1
                offset[idx] = abs(1 - yf[idx])
        return offset

    def cal_E(self, X, y, alpha, n_data, w, b):
        yf = np.zeros(shape=n_data)
        Error = np.zeros(shape=n_data)
        for j in range(n_data):
            x_j, y_j, alpha_j = X[j, :], y[j], alpha[j]
            fx = self.f_wx_plus_b(x_j, w, b)
            yf[j] = y_j * fx
            Error[j] = fx - y_j
        return Error, yf
The class is used as follows:
from SVM_SMO import mySVM
svm = mySVM()
support_vectors, iterations = svm.fit(X_train, y_train)  # train
y_predict = svm.predict(X_test)  # test
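Before running the full iris example below, a quick smoke test on synthetic data can be done with a sketch like this (it assumes the class above is saved as SVM_SMO.py; note that fit expects 0/1 labels while predict returns -1/+1):
# Sketch: smoke test on synthetic, linearly separable data.
# Assumes mySVM above is saved in SVM_SMO.py; labels go in as 0/1,
# predictions come back as -1/+1.
import numpy as np
from SVM_SMO import mySVM

rng = np.random.default_rng(0)
X = np.vstack([rng.normal(-2.0, 1.0, size=(20, 2)),   # class 0 cluster
               rng.normal(2.0, 1.0, size=(20, 2))])   # class 1 cluster
y = np.hstack([np.zeros(20), np.ones(20)])

svm = mySVM()
support_vectors, iterations = svm.fit(X, y)
pred = (svm.predict(X) + 1) // 2                      # map -1/+1 back to 0/1
print('train accuracy:', np.mean(pred == y))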
A full use case:
# coding=utf-8
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

df = pd.read_csv('iris.data.txt', header=0)
data = df.values[:, :2]


def split_data(data, test_size=0.25, random_state=None):
    """
    Split the data into a training set and a test set.

    Parameters:
        data: numpy array of shape [n_samples, n_features] holding the features
        test_size: fraction of the samples used for the test set, default 0.25
        random_state: random seed, default None

    Returns:
        X_train: training features
        X_test: test features
        y_train: training labels
        y_test: test labels
    """
    labels = np.zeros(data.shape[0])
    labels[50:] = 1
    X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=test_size, random_state=random_state)
    return X_train, X_test, y_train, y_test


def svm_process(X_train, y_train, X_test, y_test):
    """
    Train an SVM with sklearn and extract the parameters of the decision
    function together with the support vectors.

    Parameters:
        X_train: training features
        y_train: training labels
        X_test: test features
        y_test: test labels (used to compute the accuracy)

    Returns:
        k: slope of the decision function
        b: intercept of the decision function
        support_vectors: the support vectors
    """
    svm = SVC(kernel='linear')
    svm.fit(X_train, y_train)
    y_predict = svm.predict(X_test)  # intermediate result; not returned, but needed for the accuracy
    accuracy = accuracy_score(y_test, y_predict)  # compare y_predict with the true labels y_test
    print("Accuracy: ", accuracy)
    # Parameters of the linear decision function (computed directly in the handwritten version)
    k = svm.coef_[0]
    b = svm.intercept_[0]
    # The support vectors
    support_vectors = svm.support_vectors_
    return k, b, support_vectors


def plot_results(k, b, support_vectors, X_test, y_test):
    """
    Plot the classification results.

    Parameters:
        k: slope of the decision function
        b: intercept of the decision function
        support_vectors: the support vectors
        X_test: test features
        y_test: test labels
    """
    plt.figure(figsize=(10, 6))
    # Mark the support vectors
    plt.scatter(support_vectors[:, 0], support_vectors[:, 1], s=200, linewidth=1,
                facecolors='none', edgecolors='k', label='support vectors', alpha=0.5)
    # Plot the test samples
    plt.scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap='coolwarm', edgecolors='k', s=70, label='test samples')
    # Plot the decision function
    xx = np.linspace(X_test[:, 0].min(), X_test[:, 0].max())
    yy = - (k[0] / k[1]) * xx - (b / k[1])
    plt.plot(xx, yy, 'k-', label='cls function')
    plt.title('SVM Classification Results')
    plt.legend()
    plt.xlabel('Feature 1')
    plt.ylabel('Feature 2')
    plt.show()


def my_svm_test(X_train, y_train, X_test, y_test):
    from SVM_SMO import mySVM
    svm = mySVM()
    support_vectors, iterations = svm.fit(X_train, y_train)
    print('Iteration number: ', iterations)
    y_predict = svm.predict(X_test)
    y_predict = (y_predict + 1) // 2  # map the labels back to 0, 1
    accuracy = accuracy_score(y_test, y_predict)
    print("Accuracy: ", accuracy)
    # Parameters of the linear decision function
    k = svm.w
    b = svm.b
    # The support vectors
    support_vectors = svm.support_vectors
    return k, b, support_vectors


X_train, X_test, y_train, y_test = split_data(data)
# Print the shapes of the splits to understand the data layout
print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)
# k, b, support_vectors = svm_process(X_train, y_train, X_test, y_test)
k, b, support_vectors = my_svm_test(X_train, y_train, X_test, y_test)
# The accuracy above and the three quantities below may vary within a certain
# range because of the randomness of the train/test split
print('k', k)
print('b', b)
# The number (and contents) of the support vectors may also change
print('support_vectors.shape', support_vectors.shape)
plot_results(k, b, support_vectors, X_train, y_train)
plot_results(k, b, support_vectors, X_test, y_test)
Code Analysis
1. Sequential Minimal Optimization (SMO)
The SMO algorithm solves the dual problem of the following convex quadratic program:
\[
\begin{aligned}
\min_{\alpha}\quad & \frac{1}{2}\sum_{i=1}^{N}\sum_{j=1}^{N} \alpha_i \alpha_j y_i y_j K(x_i, x_j) - \sum_{i=1}^{N} \alpha_i \\
\text{s.t.}\quad & \sum_{i=1}^{N} \alpha_i y_i = 0, \\
& 0 \le \alpha_i \le C, \quad i = 1, 2, \dots, N
\end{aligned}
\]
In this problem the variables are the Lagrange multipliers; one variable \(\alpha_i\) corresponds to one sample point \((x_i, y_i)\), so the total number of variables equals the training set size N.
SMO is a heuristic algorithm. Its basic idea is: if the solutions of all variables satisfy the KKT conditions (Karush-Kuhn-Tucker conditions) of this optimization problem, then the solution of the optimization problem has been found, because the KKT conditions are necessary and sufficient for this problem. Otherwise, choose two variables, fix all the others, and construct a quadratic programming subproblem in those two variables. The solution of this subproblem should be closer to the solution of the original quadratic program, because it makes the objective of the original problem smaller. Crucially, the subproblem can be solved analytically, which greatly speeds up the whole algorithm. The subproblem has two variables: one is the variable that violates the KKT conditions most severely, and the other is determined automatically by the equality constraint. In this way, SMO keeps decomposing the original problem into subproblems and solving them, thereby solving the original problem.
For a detailed introduction to the algorithm, see: https://blog.csdn.net/v_july_v/article/details/7624837
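For reference, the analytic solution of the two-variable subproblem, which is exactly what the alpha[j] update, the clipping, and compute_L_H in the code implement: with prediction errors \(E_i = f(x_i) - y_i\) and \(\eta = K_{11} + K_{22} - 2K_{12}\),
\[
\alpha_2^{\text{new,unc}} = \alpha_2^{\text{old}} + \frac{y_2 (E_1 - E_2)}{\eta}, \qquad
\alpha_2^{\text{new}} = \min\!\big(H, \max(L, \alpha_2^{\text{new,unc}})\big), \qquad
\alpha_1^{\text{new}} = \alpha_1^{\text{old}} + y_1 y_2 (\alpha_2^{\text{old}} - \alpha_2^{\text{new}}),
\]
with the box bounds
\[
(L, H) =
\begin{cases}
\big(\max(0, \alpha_2^{\text{old}} - \alpha_1^{\text{old}}),\ \min(C, C + \alpha_2^{\text{old}} - \alpha_1^{\text{old}})\big), & y_1 \ne y_2, \\
\big(\max(0, \alpha_1^{\text{old}} + \alpha_2^{\text{old}} - C),\ \min(C, \alpha_1^{\text{old}} + \alpha_2^{\text{old}})\big), & y_1 = y_2 .
\end{cases}
\]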
2. Algorithm flow
(Figure: flow chart of the SMO algorithm.)
3. Algorithm flow, code version
def fit(self, X, y):
    # Initialization
    # Compute y*f(x), error vector E, KKT violation (offset)
    while True:
        # select idx1, idx2 to be optimized;
        # there can be many idx1's, stored in a list,
        # and each idx1 corresponds to many idx2's, also stored in a list
        # loop
        for i in idx1:
            cnt_idx1 += 1
            for j in idx2[cnt_idx1]:
                # Set new alpha and clip it to [L, H]
                self.alpha[j] = alpha_old_j + float(y_j * (E_i - E_j)) / eta
                # clip alpha[j] to [L, H]
                self.alpha[i] = alpha_old_i + y_i * y_j * (alpha_old_j - self.alpha[j])
                # set model parameters
                self.w = self.calc_w()
                self.b = self.calc_b()
                Error, yf = self.cal_E()
        # Check convergence:
        # max/mean violation < epsilon or iteration count > max_iter
    # -------output---------
    # support vectors: alpha != 0
    # final model parameters
    # self.w = self.calc_w()
    # self.b = median of the support vectors' b if their number is even
    # self.b = 0.5 * (median b + b of the neighbor closest to the median with a different label) otherwise
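The per-support-vector intercepts used in the output step follow from the margin condition: any support vector \(x_s\) with \(0 < \alpha_s < C\) satisfies \(y_s (w^{\top} x_s + b) = 1\), and since \(y_s \in \{-1, +1\}\),
\[
b_s = y_s - w^{\top} x_s .
\]
The code applies this formula to every \(\alpha_s > 0\) (it is exact for \(0 < \alpha_s < C\) and only a heuristic for \(\alpha_s = C\)) and then takes the median of the \(b_s\) as described above.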
4. The KKT conditions
\[
\begin{aligned}
\alpha_i = 0 \;&\Longleftrightarrow\; y_i f(x_i) \ge 1, \\
0 < \alpha_i < C \;&\Longleftrightarrow\; y_i f(x_i) = 1, \\
\alpha_i = C \;&\Longleftrightarrow\; y_i f(x_i) \le 1,
\end{aligned}
\]
where \(f(x_i) = w^{\top} x_i + b\) (in the kernel case, \(f(x_i) = \sum_{j} \alpha_j y_j K(x_j, x_i) + b\)).
When the outer loop selects the variable that violates the KKT conditions most severely, the cases are scanned in the order case2 -> case1 -> case3:
idx_case1 = np.where(yf >= 1)[0]  # yf >= 1 and alpha = 0
idx_case2 = np.where(yf == 1)[0]  # yf == 1 and 0 < alpha < C
idx_case3 = np.where(yf <= 1)[0]  # yf <= 1 and alpha = C
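As a sketch of how these three cases collapse into a single violation score, the helper below mirrors cal_offset in vectorized form (max_kkt_violation is a hypothetical name, not part of the class above):
# Sketch: worst KKT violation of a fitted model, vectorized version of cal_offset.
# `svm` is a fitted mySVM instance; y holds the original 0/1 labels.
import numpy as np

def max_kkt_violation(svm, X, y):
    y_pm = 2 * (y - 0.5)                          # 0/1 -> -1/+1, as in fit()
    yf = y_pm * (np.dot(svm.w, X.T) + svm.b)      # y_i * f(x_i)
    viol = np.where(svm.alpha == 0, 1 - yf,       # alpha = 0 requires yf >= 1
           np.where(svm.alpha == svm.C, yf - 1,   # alpha = C requires yf <= 1
                    np.abs(1 - yf)))              # 0 < alpha < C requires yf == 1
    return max(float(np.max(viol)), 0.0)
Training stops once this quantity drops below epsilon (or no further progress is made).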
5. Experimental results
Iteration number: 3
Accuracy: 0.9736842105263158
k: [2.23113607110187 -2.8089762488565504]
b: -3.2536323858315903
support_vectors.shape: (24, 2)
Results on the test data:
(Figure: classification results on the test set.)
Results on the training data:
(Figure: classification results on the training set.)