<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      分布式機器學習:邏輯回歸的并行化實現(PySpark)

      算法的完整實現代碼我已經上傳到了GitHub倉庫:Distributed-ML-PySpark(包括其它分布式機器學習算法),感興趣的童鞋可以前往查看。

      1 梯度計算式導出

      我們在博客《統計學習:邏輯回歸與交叉熵損失(Pytorch實現)》中提到,設\(w\)為權值(最后一維為偏置),樣本總數為\(N\)\(\{(x_i, y_i)\}_{i=1}^N\)為訓練樣本集。樣本維度為\(D\)\(x_i\in \mathbb{R}^{D+1}\)(最后一維擴充),\(y_i\in\{0, 1\}\)。則邏輯回歸的損失函數為:

      \[\mathcal{l}(w) = \sum_{i=1}^{N}\left[y_{i} \log \pi_{w}\left(x_{i}\right)+\left(1-y_{i}\right) \log \left(1-\pi_w\left(x_{i}\right)\right)\right] \]

      這里

      \[\begin{aligned} \pi_w(x) = p(y=1 \mid x; w) =\frac{1}{1+\exp \left(-w^{T} x\right)} \end{aligned} \]

      寫成這個形式就已經可以用諸如Pytorch這類工具來進行自動求導然后采用梯度下降法求解了。不過若需要用表達式直接計算出梯度,我們還需要將損失函數繼續化簡為:

      \[\mathcal{l}(w) = -\sum_{i=1}^N(y_i w^T x_i - \log(1 + \exp(w^T x_i))) \]

      可將梯度表示如下:

      \[\nabla_w{\mathcal{l}(w)} = -\sum_{i=1}^N(y_i - \frac{1}{\exp(-w^Tx)+1})x_i \]

      2 基于Spark的并行化實現

      邏輯回歸的目標函數常采用梯度下降法求解,該算法的并行化可以采用如下的Map-Reduce架構(圖片來自王樹森老師的YouTube課程并行計算與機器學習(1/3)[2]):

      先將第\(t\)輪迭代的權重廣播到各worker,各worker計算一個局部梯度(map過程),然后再將每個節點的梯度聚合(reduce過程),最終對參數進行更新。

      在Spark中每個task對應一個分區,決定了計算的并行度(分區的概念詳間我們上一篇博客Spark: 單詞計數(Word Count)的MapReduce實現(Java/Python) )。我們假設共有3個分區,則在Spark的實現過程如下:

      • map階段: 各task運行map()函數對每個樣本\((x_i, y_i)\)計算梯度\(g_i\), 然后對每個樣本對應的梯度運行進行本地聚合,以減少后面的數據傳輸量。如第1個task執行reduce()操作得到\(\widetilde{g}_1 = \sum_{i=1}^3 g_i\) 如下圖所示:

      • reduce階段:使用reduce()將所有task的計算結果收集到Driver端進行聚合,然后進行參數更新。

      在上圖中,訓練數據用points:PrallelCollectionRDD來表示,參數向量用\(w\)來表示,注意參數向量不是RDD,只是一個單獨的參與運算的變量。

      此外需要注意一點,雖然每個task在本地進行了局部聚合,但如果task過多且每個task本地聚合后的結果(單個gradient)過大那么統一傳遞到Driver端仍然會造成單點的網絡平均等問題。為了解決這個問題,Spark設計了性能更好的treeAggregate()操作,使用樹形聚合方法來減少網絡和計算延遲,我們在第5部分會詳細敘述。

      3 PySpark實現代碼

      PySpark的完整實現代碼如下:

      '''
      Descripttion: 
      Version: 1.0
      Author: ZhangHongYu
      Date: 2022-05-26 21:02:38
      LastEditors: ZhangHongYu 
      LastEditTime: 2022-07-01 16:22:53
      '''
      from sklearn.datasets import load_breast_cancer
      import numpy as np
      from pyspark.sql import SparkSession
      from operator import add
      from sklearn.model_selection import train_test_split
      from sklearn.metrics import accuracy_score
      import matplotlib.pyplot as plt
      import sys
      import os
      
      os.environ['PYSPARK_PYTHON'] = sys.executable
      
      n_threads = 3  # Number of local threads
      n_iterations = 1500  # Number of iterations
      eta = 0.1  # iteration step_size
      
      def logistic_f(x, w):
          return 1 / (np.exp(-x.dot(w)) + 1)
      
      
      def gradient(point: np.ndarray, w: np.ndarray) -> np.ndarray:
          """ Compute linear regression gradient for a matrix of data points
          """
          y = point[-1]    # point label
          x = point[:-1]   # point coordinate
          # For each point (x, y), compute gradient function, then sum these up
          return - (y - logistic_f(x, w)) * x
      
      def draw_acc_plot(accs, n_iterations):
          def ewma_smooth(accs, alpha=0.9):
              s_accs = np.zeros(n_iterations)
              for idx, acc in enumerate(accs):
                  if idx == 0:
                      s_accs[idx] = acc
                  else:
                      s_accs[idx] = alpha * s_accs[idx-1] + (1 - alpha) * acc
              return s_accs
      
          s_accs = ewma_smooth(accs, alpha=0.9)
          plt.plot(np.arange(1, n_iterations + 1), accs, color="C0", alpha=0.3)
          plt.plot(np.arange(1, n_iterations + 1), s_accs, color="C0")
          plt.title(label="Accuracy on test dataset")
          plt.xlabel("Round")
          plt.ylabel("Accuracy")
          plt.savefig("logistic_regression_acc_plot.png")
      
      
      if __name__ == "__main__":
      
          X, y = load_breast_cancer(return_X_y=True)
      
          D = X.shape[1]
          X_train, X_test, y_train, y_test = train_test_split(
              X, y, test_size=0.3, random_state=0)
          n_train, n_test = X_train.shape[0], X_test.shape[0]
      
          spark = SparkSession\
              .builder\
              .appName("Logistic Regression")\
              .master("local[%d]" % n_threads)\
              .getOrCreate()
      
          matrix = np.concatenate(
              [X_train, np.ones((n_train, 1)), y_train.reshape(-1, 1)], axis=1)
      
          points = spark.sparkContext.parallelize(matrix).cache()
      
          # Initialize w to a random value
          w = 2 * np.random.ranf(size=D + 1) - 1
          print("Initial w: " + str(w))
          
          accs = []
          for t in range(n_iterations):
              print("On iteration %d" % (t + 1))
              
              g = points.map(lambda point: gradient(point, w)).reduce(add)
      
              w -= eta * g
              
              y_pred = logistic_f(np.concatenate(
                  [X_test, np.ones((n_test, 1))], axis=1), w)
              pred_label = np.where(y_pred < 0.5, 0, 1)
              acc = accuracy_score(y_test, pred_label)
              accs.append(acc)
              print("iterations: %d, accuracy: %f" % (t, acc))
      
          print("Final w: %s " % w)
          print("Final acc: %f" % acc)
      
          spark.stop()
          
          draw_acc_plot(accs, n_iterations)
      

      注意.master("local[%d]" % n_threads)中的n_threads是我們在本地單機多線程調試模式下所設置的線程數,也就是Spark中的默認分區數,我們此處將n_threads設置為3,則Spark就會默認劃分出3個分區。我們在代碼中采用breast cancer數據集進行訓練和測試,該數據集是個二分類數據集。模型初始權重采用隨機初始化。

      最后,我們來看一下算法的輸出結果。

      初始權重如下:

      Initial w: [ 0.20733249 -0.68270323 -0.23539134  0.46125717 -0.27736064 -0.36072597
        0.92549048 -0.18432978  0.77991313  0.54054734  0.48559498 -0.23031733
        0.67125099  0.57301018  0.69243332 -0.4791771  -0.76039149  0.15924619
        0.01321836 -0.19976038  0.576716    0.50379885  0.58670905 -0.38590575
       -0.48719581 -0.91967718  0.73359703  0.28669715  0.56688998  0.97444464
       -0.44361797]
      

      最終的模型權重與在測試集上的準確率結果如下:

      Final w: [ 1.15974825e+04  1.29973800e+04  6.52553107e+04  2.10241061e+04
        8.86514067e+01 -1.10587723e+02 -2.97300359e+02 -1.27131718e+02
        1.59369309e+02  7.84967515e+01 -4.03071071e+01  8.13799814e+02
       -1.30662140e+03 -4.04474691e+04  5.34490109e+00 -2.28709226e+01
       -4.24236287e+01 -8.04493849e+00  1.12580376e+01  7.93202730e-01
        1.25640151e+04  1.51951403e+04  6.46383775e+04 -3.18968898e+04
        9.95884228e+01 -4.01750499e+02 -6.93005010e+02 -1.78725566e+02
        1.93133380e+02  6.01062122e+01  1.52932953e+03] 
      Final acc: 0.947368
      

      4 關于冗余存儲的反思

      注意根據我們以上的代碼實現中的

      map(lambda point: gradient(point, w)).reduce(add)
      

      這一行中,我們求梯度的函數gradient會根據w將每一個訓練樣本點map到其對應梯度值。w的拷貝會被發送給每個計算節點的每個CPU。比如,假設我們有一個4個CPU的計算節點。

      默認當map過程發生時,所有被map過程需要的數據會被發往mapper,而此處每個CPU都有一個mapper,故如果該計算節點有4個CPU,我們實際上會發送4個w的拷貝到該節點,如下圖所示:

      之所以會這樣,是因為此處假定w會被修改,必須為每個CPU單獨存儲w拷貝以解決并發寫的問題。然而,當我們計算每一步的梯度時,w并未被修改,故此處不存在并發寫的問題。這導致我們浪費了存儲空間,因為本可將w存儲在各個節點的共享內存中的。

      為了解決此問題,我們可以將w進行廣播,這樣它只會被發到每個計算節點一次(而不是每個CPU一次)。為了實現這個想法,我們將w定義為一個廣播變量來使用,如下面代碼所示:

      # Initialize w to a random value
      w = 2 * np.random.ranf(size=D + 1) - 1
      print("Initial w: " + str(w))
      
      for t in range(n_iterations):
          print("On iteration %d" % (t + 1))
          w_br = spark.sparkContext.broadcast(w)
          
          g = points.map(lambda point: gradient(point, w_br.value)).reduce(add)
      
          w -= alpha * g
      

      當我們初始化w時,我們首先將其聲明為一個廣播變量。在每一輪梯度下降的迭代中,我們需要引用w的值。最后,我們在w被更新后重新廣播w。這樣,w在每個機器上被高效地存儲(每個機器一份,而不是多份)。

      5 關于聚合效率的反思

      正如我們前面所說,我們可以用性能更好的treeAggregate()操作來替代reduce()操作,該操作使用樹形聚合方法來減少網絡和計算延遲。
      treeAggregate()函數原型如下:

      RDD.treeAggregate(zeroValue, seqOp, combOp, depth=2)
      

      其中zeroValue為聚合結果的初始值,seqOp函數用于定義單分區(partition)做聚合操作的方法,它第一個參數為聚合結果,第二個參數為分區中的數據變量。combOp定義對分區之間做聚合的方法,它第一個參數為第二個參數都為聚合結果。
      depth為聚合樹的深度。

      此處我們的聚合操作比較簡單,聚合結果初始值設置為0.0seqOpcombOp都設置為add算子即可:

      g = points.map(lambda point: gradient(point, w_br.value))\
          .treeAggregate(0.0, add, add)
      

      6 算法收斂性和復雜度分析

      6.1 收斂性和計算復雜度

      算法的ACC曲線如下圖所示:

      可見我們的算法總體呈現收斂。

      這里的損失函數\(l( \space \cdot \space )\)是光滑凸函數(非強凸函數,它只在局部呈現強凸性),如我們在博客《數值優化:經典一階確定性算法及其收斂性分析》中所分析,假設目標函數\(f: \mathbb{R}^d \rightarrow \mathbb{R}\)是凸函數,且\(\beta\)-光滑,當步長\(\eta = \frac{1}{\beta}\)時,梯度下降法具有\(\mathcal{O}(\frac{1}{t})\)次線性收斂速率

      \[f(w^t) - f(w^*) \leqslant \frac{2\beta \lVert w^0 - w^*\rVert^2}{t} \]

      這意味著在梯度下降求解邏輯回歸問題的迭代次數復雜度為\(\mathcal{O}(\frac{1}{\varepsilon})\),也即\(\mathcal{O}(\frac{1}{\varepsilon})\)輪后會取得\(\varepsilon\)的精度。

      盡管梯度的計算可以被分攤到個計算節點上,然而梯度下降的迭代是串行的。每輪迭代中,Spark會執行同步屏障(synchronization barrier)來確保在各worker開始下一輪迭代前\(w\)已被更新完畢。如果存在掉隊者(stragglers),其它worker就會空閑(idle)等待,直到下一輪迭代。故相比梯度的計算,其迭代計算的“深度”(depth)是其計算瓶頸。

      6.2 通信復雜度

      map過程顯然是并行的,并不需要通信。broadcast過程需要一對多通信,并且reduce過程需要多對一通信(都按照樹形結構)。故對于每輪迭代,總通信時間按

      \[2\text{log}_2(p)(L + \frac{m}{B}) \]

      增長。
      這里\(p\)為除去driver節點的運算節點個數,\(L\)是節點之間的通信延遲。\(B\)是節點之間的通信帶寬。\(M\)是每輪通信中節點間傳輸的信息大小。故消息能夠夠以\(\mathcal{O}(\log p)\)的通信輪數在所有節點間傳遞。

      參考

      posted @ 2022-05-27 19:00  orion-orion  閱讀(1241)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 色综合天天综合网中文伊| 久久九九久精品国产免费直播 | 亚洲精品色国语对白在线| 精品国产一区二区色老头| 成在人线AV无码免观看| 视频一区视频二区制服丝袜| 色偷偷女人的天堂亚洲网| 亚洲精品日本一区二区| 亚洲日韩av在线观看| 亚洲精品一区二区三区大| 国产福利深夜在线播放| 沂源县| 国产精品国产精品偷麻豆| 国产欧美日韩精品第二区| 精品一区二区不卡无码AV| 久热这里只有精品12| 久99久热免费视频播放| 日日爽日日操| 熟女蜜臀av麻豆一区二区| 精品国产一区av天美传媒| 国产欧美日韩另类在线专区| 绥芬河市| 亚洲av成人在线一区| 亚洲国产性夜夜综合| 国产高清在线不卡一区| 美女扒开奶罩露出奶头视频网站| 国外av片免费看一区二区三区| 亚洲日本韩国欧美云霸高清| 国语对白刺激在线视频国产网红| 一区二区三区鲁丝不卡| 亚洲人成网7777777国产| 亚洲色婷婷一区二区三区| 久久先锋男人AV资源网站| 亚洲视频免费一区二区三区| 日本黄页网站免费观看| 伊人狠狠色丁香婷婷综合| 18禁极品一区二区三区| 国产精品白丝久久AV网站| 亚洲综合一区无码精品| 法库县| 国产不卡一区二区四区|