<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      分布式機器學習:PageRank算法的并行化實現(PySpark)

      算法的完整實現代碼我已經上傳到了GitHub倉庫:Distributed-ML-PySpark(包括其它分布式機器學習算法),感興趣的童鞋可以前往查看。

      1 PageRank的兩種串行迭代求解算法

      我們在博客《數值分析:冪迭代和PageRank算法(Numpy實現)》算法中提到過用冪法求解PageRank。
      給定有向圖

      我們可以寫出其馬爾科夫概率轉移矩陣\(M\)(第\(i\)列對應對\(i\)節點的鄰居并沿列歸一化)

      \[\left(\begin{array}{lll} 0 & 0 & 1 \\ \frac{1}{2} & 0 & 0 \\ \frac{1}{2} & 1 & 0 \end{array}\right) \]

      然后我們定義Google矩陣為

      \[G=\frac{q}{n} E+(1-q) M \]

      此處\(q\)為上網者從一個頁面轉移到另一個隨機頁面的概率(一般為0.15),\(1-q\) 為點擊當前頁面上鏈接的概率,\(E\)為元素全1的\(n\times n\) 矩陣( \(n\) 為節點個數)。

      而PageRank算法可以視為求解Google矩陣占優特征值(對于隨機矩陣而言,即1)對應的特征向量。設初始化Rank向量為 \(x\)\(x_i\) 為頁面\(i\)的Rank值),則我們可以采用冪法來求解:

      \[x_{t+1}=G x_{t} \]

      (每輪迭代后要歸一化)

      現實場景下的圖大多是稀疏圖,即\(M\)是稀疏矩陣。冪法中計算 \((1-q)Mx_t\) ,對于節點 \(i\) 需使用reduceByKey()(key為節點編號)操作。計算 \(\frac{q}{n}{E}x_t\) 則需要對所有節點的Rank進行reduce()操作,操作頗為繁復。

      PageRank還有一種求解算法(名字就叫“迭代算法”),它的迭代形式如下:

      \[x_{t+1} = \frac{q}{n}\bm{1} + (1-q)Mx_t \]

      可以看到,這種迭代方法就規避了計算 \(\frac{q}{n}Ex_t\),通信開銷更小。我們接下來就采用這種迭代形式。

      2 圖劃分的兩種方法

      目前對圖算法進行并行化的主要思想是將大圖切分為多個子圖,然后將這些子圖分布到不同的機器上進行并行計算,在必要時進行跨機器通信同步計算得出結果。學術界和工業界提出了多種將大圖切分為子圖的劃分方法,主要包括兩種,邊劃分(Edge Cut)和點劃分(Vertex Cut)。

      2.1 邊劃分

      如下圖所示,邊劃分是對圖中某些邊進行切分。具體在Pregel[1]圖計算框架中,每個分區包含一些節點和節點的出邊;在GraphLab[2]圖計算框架中,每個分區包含一些節點、節點的出邊和入邊,以及這些節點的鄰居節點。邊劃分的優點是可以保留節點的鄰居信息,缺點是容易出現劃分不平衡,如對于度很高的節點,其關聯的邊都被劃分到一個分區中,造成其他分區中的邊可能很少。另外,如下圖最右邊的圖所示,邊劃分可能存在邊冗余。

      2.2 點劃分

      如下圖所示,點劃分是對圖中某些點進行切分,得到多個圖分區,每個分區包含一部分邊,以及與邊相關聯的節點。具體地,PowerGraph[3],GraphX[4]等框架采用點劃分,被劃分的節點存在多個分區中。點劃分的優缺點與邊劃分的優缺點正好相反,可以將邊較為平均地分配到不同機器中,但沒有保留節點的鄰居關系。

      總而言之,邊劃分將節點分布到不同機器中(可能劃分不平衡),而點劃分將邊分布到不同機器中(劃分較為平衡)。接下來我們使用的算法為類似Pregel的劃分方式,使用邊劃分。我們下面的算法是簡化版,沒有處理懸掛節點的問題。

      3 對迭代算法的并行化

      我們將Rank向量用均勻分布初始化(也可以用全1初始化,不過就不再以概率分布的形式呈現),設分區數為3,算法總體迭代流程可以表示如下:

      注意,圖中flatMap()步驟中,節點\(i\)計算其contribution(貢獻度):\((x_t)_i/|\mathcal{N}_i|\),并將貢獻度發送到鄰居集合\(\mathcal{N}_i\)中的每一個節點。之后,將所有節點收到的貢獻度使用reduceByKey()(節點編號為key)規約后得到向量\(\hat{x}\),和串行算法中\(Mx_t\)的對應關系如下圖所示:

      并按照公式\(x_{t+1} = \frac{q}{n} + (1-q)\hat{x}\)來計算節點的Rank向量。然后繼續下一輪的迭代過程。

      4 編程實現

      用PySpark對PageRank進行并行化編程實現,代碼如下:

      import sys
      from operator import add
      from typing import Iterable, Tuple
      from pyspark.resultiterable import ResultIterable
      from pyspark.sql import SparkSession
      import os
      
      os.environ['PYSPARK_PYTHON'] = sys.executable
      
      n_threads = 4  # Number of local threads
      n_iterations = 10  # Number of iterations
      q = 0.15 #the default value of q is 0.15
      
      def computeContribs(neighbors: ResultIterable[int], rank: float) -> Iterable[Tuple[int, float]]:
          # Calculates the contribution(rank/num_neighbors) of each vertex, and send it to its neighbours.
          num_neighbors = len(neighbors)
          for vertex in neighbors:
              yield (vertex, rank / num_neighbors)
      
      if __name__ == "__main__":
          # Initialize the spark context.
          spark = SparkSession\
              .builder\
              .appName("PageRank")\
              .master("local[%d]" % n_threads)\
              .getOrCreate()
      
          # link: (source_id, dest_id)
          links = spark.sparkContext.parallelize(
              [(1, 2), (1, 3), (2, 3), (3, 1)],
          )                       
      
          # drop duplicate links and convert links to an adjacency list.
          adj_list = links.distinct().groupByKey().cache()
      
          # count the number of vertexes
          n_vertexes = adj_list.count()
      
          # init the rank of each vertex, the default is 1.0/n_vertexes
          ranks = adj_list.map(lambda vertex_neighbors: (vertex_neighbors[0], 1.0/n_vertexes))
      
          # Calculates and updates vertex ranks continuously using PageRank algorithm.
          for t in range(n_iterations):
              # Calculates the contribution(rank/num_neighbors) of each vertex, and send it to its neighbours.
              contribs = adj_list.join(ranks).flatMap(lambda vertex_neighbors_rank: computeContribs(
                  vertex_neighbors_rank[1][0], vertex_neighbors_rank[1][1]  # type: ignore[arg-type]
              ))
      
              # Re-calculates rank of each vertex based on the contributions it received
              ranks = contribs.reduceByKey(add).mapValues(lambda rank: q/n_vertexes + (1 - q)*rank)
      
          # Collects all ranks of vertexs and dump them to console.
          for (vertex, rank) in ranks.collect():
              print("%s has rank: %s." % (vertex, rank))
      
          spark.stop()
      

      運行結果如下:

      1 has rank: 0.38891305880091237.  
      2 has rank: 0.214416470596171.
      3 has rank: 0.3966704706029163.
      

      該Rank向量與我們采用串行冪法得到的Rank向量 \(R=(0.38779177,0.21480614,0.39740209)^{T}\) 近似相等,說明我們的并行化算法運行正確。

      參考

      • [1] Malewicz G, Austern M H, Bik A J C, et al. Pregel: a system for large-scale graph processing[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of data. 2010: 135-146.

      • [2] Low Y, Gonzalez J, Kyrola A, et al. Distributed graphlab: A framework for machine learning in the cloud[J]. arXiv preprint arXiv:1204.6078, 2012.

      • [3] Gonzalez J E, Low Y, Gu H, et al. {PowerGraph}: Distributed {Graph-Parallel} Computation on Natural Graphs[C]//10th USENIX symposium on operating systems design and implementation (OSDI 12). 2012: 17-30.

      • [4] Spark: GraphX Programming Guide

      • [5] GiHub: Spark官方Python樣例

      • [6] 許利杰,方亞芬. 大數據處理框架Apache Spark設計與實現[M]. 電子工業出版社, 2021.

      • [7] Stanford CME 323: Distributed Algorithms and Optimization (Lecture 15)

      • [8] wikipedia: PageRank

      • [9] 李航. 統計學習方法(第2版)[M]. 清華大學出版社, 2019.

      • [10] Timothy sauer. 數值分析(第2版)[M].機械工業出版社, 2018.

      posted @ 2022-06-03 22:06  orion-orion  閱讀(1292)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 久久中文字幕国产精品| 欧美videos粗暴| 国产在线一区二区不卡| 国产高清小视频一区二区| 色综合国产一区二区三区| 丰满少妇被猛烈进入av久久| 惠安县| 欧洲亚洲国内老熟女超碰| 国产综合色在线精品| 久久99日本免费国产精品| 国产精品+日韩精品+在线播放| 苏尼特右旗| 少妇高潮潮喷到猛进猛出小说| 国产一区二区三区av在线无码观看| 国产综合久久久久鬼色| 免费人成在线观看网站| 中文毛片无遮挡高潮免费| 亚洲午夜福利AV一区二区无码| 午夜福利伦伦电影理论片在线观看| 99精品国产精品一区二区| 男人扒女人添高潮视频| 日本久久香蕉一本一道| 亚洲国产欧美在线人成| 粉嫩国产一区二区三区在线| 天堂а√在线中文在线| 国产999久久高清免费观看| 无遮挡高潮国产免费观看| 无码视频伊人| 无套内谢少妇一二三四| 最近中文字幕免费手机版| 黄色A级国产免费大片视频| 久久男人av资源网站| 日本亚洲欧洲免费无线码| 国产四虎永久免费观看| 精品无人码麻豆乱码1区2区| 91老肥熟女九色老女人| 69人妻精品中文字幕| 国产无遮挡又黄又爽高潮| 免费天堂无码人妻成人av电影| 天天爽夜夜爱| 少妇熟女视频一区二区三区|