<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Spark入門(mén)實(shí)戰(zhàn)系列--9.Spark圖計(jì)算GraphX介紹及實(shí)例

      【注】該系列文章以及使用到安裝包/測(cè)試數(shù)據(jù) 可以在《傾情大奉送--Spark入門(mén)實(shí)戰(zhàn)系列》獲取

      1GraphX介紹

      1.1 GraphX應(yīng)用背景

      Spark GraphX是一個(gè)分布式圖處理框架,它是基于Spark平臺(tái)提供對(duì)圖計(jì)算和圖挖掘簡(jiǎn)潔易用的而豐富的接口,極大的方便了對(duì)分布式圖處理的需求。

      眾所周知·,社交網(wǎng)絡(luò)中人與人之間有很多關(guān)系鏈,例如TwitterFacebook、微博和微信等,這些都是大數(shù)據(jù)產(chǎn)生的地方都需要圖計(jì)算,現(xiàn)在的圖處理基本都是分布式的圖處理,而并非單機(jī)處理。Spark GraphX由于底層是基于Spark來(lái)處理的,所以天然就是一個(gè)分布式的圖處理系統(tǒng)。

      圖的分布式或者并行處理其實(shí)是把圖拆分成很多的子圖,然后分別對(duì)這些子圖進(jìn)行計(jì)算,計(jì)算的時(shí)候可以分別迭代進(jìn)行分階段的計(jì)算,即對(duì)圖進(jìn)行并行計(jì)算。下面我們看一下圖計(jì)算的簡(jiǎn)單示例:

      clip_image002

      從圖中我們可以看出:拿到Wikipedia的文檔以后,可以變成Link Table形式的視圖,然后基于Link Table形式的視圖可以分析成Hyperlinks超鏈接,最后我們可以使用PageRank去分析得出Top Communities。在下面路徑中的Editor GraphCommunity,這個(gè)過(guò)程可以稱(chēng)之為Triangle Computation,這是計(jì)算三角形的一個(gè)算法,基于此會(huì)發(fā)現(xiàn)一個(gè)社區(qū)。從上面的分析中我們可以發(fā)現(xiàn)圖計(jì)算有很多的做法和算法,同時(shí)也發(fā)現(xiàn)圖和表格可以做互相的轉(zhuǎn)換。

      1.2  GraphX的框架

      設(shè)計(jì)GraphX時(shí),點(diǎn)分割和GAS都已成熟,在設(shè)計(jì)和編碼中針對(duì)它們進(jìn)行了優(yōu)化,并在功能和性能之間尋找最佳的平衡點(diǎn)。如同Spark本身,每個(gè)子模塊都有一個(gè)核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一種點(diǎn)和邊都帶屬性的有向多重圖。它擴(kuò)展了Spark RDD的抽象,有TableGraph兩種視圖,而只需要一份物理存儲(chǔ)。兩種視圖都有自己獨(dú)有的操作符,從而獲得了靈活操作和執(zhí)行效率。

      clip_image004

      如同SparkGraphX的代碼非常簡(jiǎn)潔。GraphX的核心代碼只有3千多行,而在此之上實(shí)現(xiàn)的Pregel模式,只要短短的20多行。GraphX的代碼結(jié)構(gòu)整體下圖所示,其中大部分的實(shí)現(xiàn),都是圍繞Partition的優(yōu)化進(jìn)行的。這在某種程度上說(shuō)明了點(diǎn)分割的存儲(chǔ)和相應(yīng)的計(jì)算優(yōu)化,的確是圖計(jì)算框架的重點(diǎn)和難點(diǎn)。

      1.3 發(fā)展歷程

      l早在0.5版本,Spark就帶了一個(gè)小型的Bagel模塊,提供了類(lèi)似Pregel的功能。當(dāng)然,這個(gè)版本還非常原始,性能和功能都比較弱,屬于實(shí)驗(yàn)型產(chǎn)品。

      l0.8版本時(shí),鑒于業(yè)界對(duì)分布式圖計(jì)算的需求日益見(jiàn)漲,Spark開(kāi)始獨(dú)立一個(gè)分支Graphx-Branch,作為獨(dú)立的圖計(jì)算模塊,借鑒GraphLab,開(kāi)始設(shè)計(jì)開(kāi)發(fā)GraphX

      l0.9版本中,這個(gè)模塊被正式集成到主干,雖然是Alpha版本,但已可以試用,小面包圈Bagel告別舞臺(tái)。1.0版本,GraphX正式投入生產(chǎn)使用。

      clip_image006

      值得注意的是,GraphX目前依然處于快速發(fā)展中,從0.8的分支到0.91.0,每個(gè)版本代碼都有不少的改進(jìn)和重構(gòu)。根據(jù)觀察,在沒(méi)有改任何代碼邏輯和運(yùn)行環(huán)境,只是升級(jí)版本、切換接口和重新編譯的情況下,每個(gè)版本有10%~20%的性能提升。雖然和GraphLab的性能還有一定差距,但憑借Spark整體上的一體化流水線處理,社區(qū)熱烈的活躍度及快速改進(jìn)速度,GraphX具有強(qiáng)大的競(jìng)爭(zhēng)力。

      2、GraphX實(shí)現(xiàn)分析

      如同Spark本身,每個(gè)子模塊都有一個(gè)核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一種點(diǎn)和邊都帶屬性的有向多重圖。它擴(kuò)展了Spark RDD的抽象,有TableGraph兩種視圖,而只需要一份物理存儲(chǔ)。兩種視圖都有自己獨(dú)有的操作符,從而獲得了靈活操作和執(zhí)行效率。

      clip_image008

      GraphX的底層設(shè)計(jì)有以下幾個(gè)關(guān)鍵點(diǎn)。

      對(duì)Graph視圖的所有操作,最終都會(huì)轉(zhuǎn)換成其關(guān)聯(lián)的Table視圖的RDD操作來(lái)完成。這樣對(duì)一個(gè)圖的計(jì)算,最終在邏輯上,等價(jià)于一系列RDD的轉(zhuǎn)換過(guò)程。因此,Graph最終具備了RDD3個(gè)關(guān)鍵特性:ImmutableDistributedFault-Tolerant,其中最關(guān)鍵的是Immutable(不變性)。邏輯上,所有圖的轉(zhuǎn)換和操作都產(chǎn)生了一個(gè)新圖;物理上,GraphX會(huì)有一定程度的不變頂點(diǎn)和邊的復(fù)用優(yōu)化,對(duì)用戶透明。

       兩種視圖底層共用的物理數(shù)據(jù),由RDD[Vertex-Partition]RDD[EdgePartition]這兩個(gè)RDD組成。點(diǎn)和邊實(shí)際都不是以表Collection[tuple]的形式存儲(chǔ)的,而是由VertexPartition/EdgePartition在內(nèi)部存儲(chǔ)一個(gè)帶索引結(jié)構(gòu)的分片數(shù)據(jù)塊,以加速不同視圖下的遍歷速度。不變的索引結(jié)構(gòu)在RDD轉(zhuǎn)換過(guò)程中是共用的,降低了計(jì)算和存儲(chǔ)開(kāi)銷(xiāo)。

      clip_image010

      圖的分布式存儲(chǔ)采用點(diǎn)分割模式,而且使用partitionBy方法,由用戶指定不同的劃分策略(PartitionStrategy)。劃分策略會(huì)將邊分配到各個(gè)EdgePartition,頂點(diǎn)Master分配到各個(gè)VertexPartitionEdgePartition也會(huì)緩存本地邊關(guān)聯(lián)點(diǎn)的Ghost副本。劃分策略的不同會(huì)影響到所需要緩存的Ghost副本數(shù)量,以及每個(gè)EdgePartition分配的邊的均衡程度,需要根據(jù)圖的結(jié)構(gòu)特征選取最佳策略。目前有EdgePartition2dEdgePartition1dRandomVertexCutCanonicalRandomVertexCut這四種策略。

      2.1 存儲(chǔ)模式

      2.1.1 圖存儲(chǔ)模式

      巨型圖的存儲(chǔ)總體上有邊分割和點(diǎn)分割兩種存儲(chǔ)方式。2013年,GraphLab2.0將其存儲(chǔ)方式由邊分割變?yōu)辄c(diǎn)分割,在性能上取得重大提升,目前基本上被業(yè)界廣泛接受并使用。

      l邊分割(Edge-Cut:每個(gè)頂點(diǎn)都存儲(chǔ)一次,但有的邊會(huì)被打斷分到兩臺(tái)機(jī)器上。這樣做的好處是節(jié)省存儲(chǔ)空間;壞處是對(duì)圖進(jìn)行基于邊的計(jì)算時(shí),對(duì)于一條兩個(gè)頂點(diǎn)被分到不同機(jī)器上的邊來(lái)說(shuō),要跨機(jī)器通信傳輸數(shù)據(jù),內(nèi)網(wǎng)通信流量大。

      l點(diǎn)分割(Vertex-Cut:每條邊只存儲(chǔ)一次,都只會(huì)出現(xiàn)在一臺(tái)機(jī)器上。鄰居多的點(diǎn)會(huì)被復(fù)制到多臺(tái)機(jī)器上,增加了存儲(chǔ)開(kāi)銷(xiāo),同時(shí)會(huì)引發(fā)數(shù)據(jù)同步問(wèn)題。好處是可以大幅減少內(nèi)網(wǎng)通信量。

      clip_image012

      雖然兩種方法互有利弊,但現(xiàn)在是點(diǎn)分割占上風(fēng),各種分布式圖計(jì)算框架都將自己底層的存儲(chǔ)形式變成了點(diǎn)分割。主要原因有以下兩個(gè)。

      1.磁盤(pán)價(jià)格下降,存儲(chǔ)空間不再是問(wèn)題,而內(nèi)網(wǎng)的通信資源沒(méi)有突破性進(jìn)展,集群計(jì)算時(shí)內(nèi)網(wǎng)帶寬是寶貴的,時(shí)間比磁盤(pán)更珍貴。這點(diǎn)就類(lèi)似于常見(jiàn)的空間換時(shí)間的策略。

      2.在當(dāng)前的應(yīng)用場(chǎng)景中,絕大多數(shù)網(wǎng)絡(luò)都是“無(wú)尺度網(wǎng)絡(luò)”,遵循冪律分布,不同點(diǎn)的鄰居數(shù)量相差非常懸殊。而邊分割會(huì)使那些多鄰居的點(diǎn)所相連的邊大多數(shù)被分到不同的機(jī)器上,這樣的數(shù)據(jù)分布會(huì)使得內(nèi)網(wǎng)帶寬更加捉襟見(jiàn)肘,于是邊分割存儲(chǔ)方式被漸漸拋棄了。

      2.1.2 GraphX存儲(chǔ)模式

      Graphx借鑒PowerGraph,使用的是Vertex-Cut(點(diǎn)分割)方式存儲(chǔ)圖,用三個(gè)RDD存儲(chǔ)圖數(shù)據(jù)信息:

      lVertexTable(id, data)idVertex iddataEdge data

      lEdgeTable(pid, src, dst, data)pidPartion idsrc為原定點(diǎn)iddst為目的頂點(diǎn)id

      lRoutingTable(id, pid)idVertex idpidPartion id

      點(diǎn)分割存儲(chǔ)實(shí)現(xiàn)如下圖所示:

      clip_image014

      2.2 計(jì)算模式

      2.2.1 圖計(jì)算模式

      目前基于圖的并行計(jì)算框架已經(jīng)有很多,比如來(lái)自GooglePregel、來(lái)自Apache開(kāi)源的圖計(jì)算框架Giraph/HAMA以及最為著名的GraphLab,其中PregelHAMAGiraph都是非常類(lèi)似的,都是基于BSPBulk Synchronous Parallell)模式。

      Bulk Synchronous Parallell,即整體同步并行,它將計(jì)算分成一系列的超步(superstep)的迭代(iteration)。從縱向上看,它是一個(gè)串行模式,而從橫向上看,它是一個(gè)并行的模式,每?jī)蓚€(gè)superstep之間設(shè)置一個(gè)柵欄(barrier),即整體同步點(diǎn),確定所有并行的計(jì)算都完成后再啟動(dòng)下一輪superstep

      clip_image015

      每一個(gè)超步(superstep)包含三部分內(nèi)容:

      1.計(jì)算compute:每一個(gè)processor利用上一個(gè)superstep傳過(guò)來(lái)的消息和本地的數(shù)據(jù)進(jìn)行本地計(jì)算;

      2.消息傳遞:每一個(gè)processor計(jì)算完畢后,將消息傳遞個(gè)與之關(guān)聯(lián)的其它processors

      3.整體同步點(diǎn):用于整體同步,確定所有的計(jì)算和消息傳遞都進(jìn)行完畢后,進(jìn)入下一個(gè)superstep

      2.2.2GraphX計(jì)算模式

      如同Spark一樣,GraphXGraph類(lèi)提供了豐富的圖運(yùn)算符,大致結(jié)構(gòu)如下圖所示。可以在官方GraphX Programming Guide中找到每個(gè)函數(shù)的詳細(xì)說(shuō)明,本文僅講述幾個(gè)需要注意的方法。

      clip_image017

      2.2.2.1 圖的緩存

      每個(gè)圖是由3個(gè)RDD組成,所以會(huì)占用更多的內(nèi)存。相應(yīng)圖的cacheunpersistcheckpoint,更需要注意使用技巧。出于最大限度復(fù)用邊的理念,GraphX的默認(rèn)接口只提供了unpersistVertices方法。如果要釋放邊,調(diào)用g.edges.unpersist()方法才行,這給用戶帶來(lái)了一定的不便,但為GraphX的優(yōu)化提供了便利和空間。參考GraphXPregel代碼,對(duì)一個(gè)大圖,目前最佳的實(shí)踐是:

      clip_image018

      大體之意是根據(jù)GraphXGraph的不變性,對(duì)g做操作并賦回給g之后,g已不是原來(lái)的g了,而且會(huì)在下一輪迭代使用,所以必須cache。另外,必須先用prevG保留住對(duì)原來(lái)圖的引用,并在新圖產(chǎn)生后,快速將舊圖徹底釋放掉。否則,十幾輪迭代后,會(huì)有內(nèi)存泄漏問(wèn)題,很快耗光作業(yè)緩存空間。

      2.2.2.2 鄰邊聚合

      mrTripletsmapReduceTriplets)是GraphX中最核心的一個(gè)接口。Pregel也基于它而來(lái),所以對(duì)它的優(yōu)化能很大程度上影響整個(gè)GraphX的性能。mrTriplets運(yùn)算符的簡(jiǎn)化定義是:

      clip_image019

      它的計(jì)算過(guò)程為:map,應(yīng)用于每一個(gè)Triplet上,生成一個(gè)或者多個(gè)消息,消息以Triplet關(guān)聯(lián)的兩個(gè)頂點(diǎn)中的任意一個(gè)或兩個(gè)為目標(biāo)頂點(diǎn);reduce,應(yīng)用于每一個(gè)Vertex上,將發(fā)送給每一個(gè)頂點(diǎn)的消息合并起來(lái)。

      mrTriplets最后返回的是一個(gè)VertexRDD[A],包含每一個(gè)頂點(diǎn)聚合之后的消息(類(lèi)型為A),沒(méi)有接收到消息的頂點(diǎn)不會(huì)包含在返回的VertexRDD中。

      在最近的版本中,GraphX針對(duì)它進(jìn)行了一些優(yōu)化,對(duì)于Pregel以及所有上層算法工具包的性能都有重大影響。主要包括以下幾點(diǎn)。

      1. Caching for Iterative mrTriplets & Incremental Updates for Iterative mrTriplets:在很多圖分析算法中,不同點(diǎn)的收斂速度變化很大。在迭代后期,只有很少的點(diǎn)會(huì)有更新。因此,對(duì)于沒(méi)有更新的點(diǎn),下一次mrTriplets計(jì)算時(shí)EdgeRDD無(wú)需更新相應(yīng)點(diǎn)值的本地緩存,大幅降低了通信開(kāi)銷(xiāo)。

      2.Indexing Active Edges:沒(méi)有更新的頂點(diǎn)在下一輪迭代時(shí)不需要向鄰居重新發(fā)送消息。因此,mrTriplets遍歷邊時(shí),如果一條邊的鄰居點(diǎn)值在上一輪迭代時(shí)沒(méi)有更新,則直接跳過(guò),避免了大量無(wú)用的計(jì)算和通信。

      3.Join EliminationTriplet是由一條邊和其兩個(gè)鄰居點(diǎn)組成的三元組,操作Tripletmap函數(shù)常常只需訪問(wèn)其兩個(gè)鄰居點(diǎn)值中的一個(gè)。例如,在PageRank計(jì)算中,一個(gè)點(diǎn)值的更新只與其源頂點(diǎn)的值有關(guān),而與其所指向的目的頂點(diǎn)的值無(wú)關(guān)。那么在mrTriplets計(jì)算中,就不需要VertexRDDEdgeRDD3-way join,而只需要2-way join

      所有這些優(yōu)化使GraphX的性能逐漸逼近GraphLab。雖然還有一定差距,但一體化的流水線服務(wù)和豐富的編程接口,可以彌補(bǔ)性能的微小差距。

      2.2.2.3 進(jìn)化的Pregel模式

      GraphX中的Pregel接口,并不嚴(yán)格遵循Pregel模式,它是一個(gè)參考GAS改進(jìn)的Pregel模式。定義如下:

      clip_image020

      這種基于mrTrilets方法的Pregel模式,與標(biāo)準(zhǔn)Pregel的最大區(qū)別是,它的第2段參數(shù)體接收的是3個(gè)函數(shù)參數(shù),而不接收messageList。它不會(huì)在單個(gè)頂點(diǎn)上進(jìn)行消息遍歷,而是將頂點(diǎn)的多個(gè)Ghost副本收到的消息聚合后,發(fā)送給Master副本,再使用vprog函數(shù)來(lái)更新點(diǎn)值。消息的接收和發(fā)送都被自動(dòng)并行化處理,無(wú)需擔(dān)心超級(jí)節(jié)點(diǎn)的問(wèn)題。

      常見(jiàn)的代碼模板如下所示:

      clip_image021

      可以看到,GraphX設(shè)計(jì)這個(gè)模式的用意。它綜合了PregelGAS兩者的優(yōu)點(diǎn),即接口相對(duì)簡(jiǎn)單,又保證性能,可以應(yīng)對(duì)點(diǎn)分割的圖存儲(chǔ)模式,勝任符合冪律分布的自然圖的大型計(jì)算。另外,值得注意的是,官方的Pregel版本是最簡(jiǎn)單的一個(gè)版本。對(duì)于復(fù)雜的業(yè)務(wù)場(chǎng)景,根據(jù)這個(gè)版本擴(kuò)展一個(gè)定制的Pregel是很常見(jiàn)的做法。

      2.2.2.4 圖算法工具包

      GraphX也提供了一套圖算法工具包,方便用戶對(duì)圖進(jìn)行分析。目前最新版本已支持PageRank、數(shù)三角形、最大連通圖和最短路徑等6種經(jīng)典的圖算法。這些算法的代碼實(shí)現(xiàn),目的和重點(diǎn)在于通用性。如果要獲得最佳性能,可以參考其實(shí)現(xiàn)進(jìn)行修改和擴(kuò)展?jié)M足業(yè)務(wù)需求。另外,研讀這些代碼,也是理解GraphX編程最佳實(shí)踐的好方法。

      3GraphX實(shí)例

      3.1  圖例演示

      3.1.1 例子介紹

      下圖中有6個(gè)人,每個(gè)人有名字和年齡,這些人根據(jù)社會(huì)關(guān)系形成8條邊,每條邊有其屬性。在以下例子演示中將構(gòu)建頂點(diǎn)、邊和圖,打印圖的屬性、轉(zhuǎn)換操作、結(jié)構(gòu)操作、連接操作、聚合操作,并結(jié)合實(shí)際要求進(jìn)行演示。

      clip_image023

      3.1.2 程序代碼

      import org.apache.log4j.{Level, Logger}

      import org.apache.spark.{SparkContext, SparkConf}

      import org.apache.spark.graphx._

      import org.apache.spark.rdd.RDD

       

      object GraphXExample {

        def main(args: Array[String]) {

          //屏蔽日志

          Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

       

          //設(shè)置運(yùn)行環(huán)境

          val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")

          val sc = new SparkContext(conf)

       

          //設(shè)置頂點(diǎn)和邊,注意頂點(diǎn)和邊都是用元組定義的Array

          //頂點(diǎn)的數(shù)據(jù)類(lèi)型是VD:(String,Int)

          val vertexArray = Array(

            (1L, ("Alice", 28)),

            (2L, ("Bob", 27)),

            (3L, ("Charlie", 65)),

            (4L, ("David", 42)),

            (5L, ("Ed", 55)),

            (6L, ("Fran", 50))

          )

          //邊的數(shù)據(jù)類(lèi)型ED:Int

          val edgeArray = Array(

            Edge(2L, 1L, 7),

            Edge(2L, 4L, 2),

            Edge(3L, 2L, 4),

            Edge(3L, 6L, 3),

            Edge(4L, 1L, 1),

            Edge(5L, 2L, 2),

            Edge(5L, 3L, 8),

            Edge(5L, 6L, 3)

          )

       

          //構(gòu)造vertexRDDedgeRDD

          val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)

          val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)

       

          //構(gòu)造圖Graph[VD,ED]

          val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

       

          //***********************************************************************************

          //***************************  圖的屬性    ****************************************

          //**********************************************************************************         println("***********************************************")

          println("屬性演示")

          println("**********************************************************")

          println("找出圖中年齡大于30的頂點(diǎn):")

          graph.vertices.filter { case (id, (name, age)) => age > 30}.collect.foreach {

            case (id, (name, age)) => println(s"$name is $age")

          }

       

          //邊操作:找出圖中屬性大于5的邊

          println("找出圖中屬性大于5的邊:")

      graph.edges.filter(e => e.attr > 5).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))

          println

       

          //triplets操作,((srcId, srcAttr), (dstId, dstAttr), attr)

          println("列出邊屬性>5tripltes")

          for (triplet <- graph.triplets.filter(t => t.attr > 5).collect) {

            println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")

          }

          println

       

          //Degrees操作

          println("找出圖中最大的出度、入度、度數(shù):")

          def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {

            if (a._2 > b._2) a else b

          }

          println("max of outDegrees:" + graph.outDegrees.reduce(max) + " max of inDegrees:" + graph.inDegrees.reduce(max) + " max of Degrees:" + graph.degrees.reduce(max))

          println

         

          //***********************************************************************************

          //***************************  轉(zhuǎn)換操作    ****************************************

          //**********************************************************************************  

          println("**********************************************************")

          println("轉(zhuǎn)換操作")

          println("**********************************************************")

          println("頂點(diǎn)的轉(zhuǎn)換操作,頂點(diǎn)age + 10")

          graph.mapVertices{ case (id, (name, age)) => (id, (name, age+10))}.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))

          println

          println("邊的轉(zhuǎn)換操作,邊的屬性*2")

          graph.mapEdges(e=>e.attr*2).edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))

          println

        

            //***********************************************************************************

          //***************************  結(jié)構(gòu)操作    ****************************************

          //**********************************************************************************  

          println("**********************************************************")

          println("結(jié)構(gòu)操作")

          println("**********************************************************")

          println("頂點(diǎn)年紀(jì)>30的子圖:")

          val subGraph = graph.subgraph(vpred = (id, vd) => vd._2 >= 30)

          println("子圖所有頂點(diǎn):")

          subGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))

          println

          println("子圖所有邊:")

          subGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))

          println

       

         

            //***********************************************************************************

          //***************************  連接操作    ****************************************

          //**********************************************************************************  

          println("**********************************************************")

          println("連接操作")

          println("**********************************************************")

          val inDegrees: VertexRDD[Int] = graph.inDegrees

          case class User(name: String, age: Int, inDeg: Int, outDeg: Int)

       

          //創(chuàng)建一個(gè)新圖,頂點(diǎn)VD的數(shù)據(jù)類(lèi)型為User,并從graph做類(lèi)型轉(zhuǎn)換

          val initialUserGraph: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0)}

       

          //initialUserGraphinDegreesoutDegreesRDD)進(jìn)行連接,并修改initialUserGraphinDeg值、outDeg

          val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {

            case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)

          }.outerJoinVertices(initialUserGraph.outDegrees) {

            case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg,outDegOpt.getOrElse(0))

          }

       

          println("連接圖的屬性:")

      userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg}  outDeg: ${v._2.outDeg}"))

          println

       

          println("出度和入讀相同的人員:")

          userGraph.vertices.filter {

            case (id, u) => u.inDeg == u.outDeg

          }.collect.foreach {

            case (id, property) => println(property.name)

          }

          println

       

            //***********************************************************************************

          //***************************  聚合操作    ****************************************

          //**********************************************************************************  

          println("**********************************************************")

          println("聚合操作")

          println("**********************************************************")

          println("找出年紀(jì)最大的追求者:")

          val oldestFollower: VertexRDD[(String, Int)] = userGraph.mapReduceTriplets[(String, Int)](

            // 將源頂點(diǎn)的屬性發(fā)送給目標(biāo)頂點(diǎn),map過(guò)程

            edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),

            // 得到最大追求者,reduce過(guò)程

            (a, b) => if (a._2 > b._2) a else b

          )

       

          userGraph.vertices.leftJoin(oldestFollower) { (id, user, optOldestFollower) =>

            optOldestFollower match {

              case None => s"${user.name} does not have any followers."

              case Some((name, age)) => s"${name} is the oldest follower of ${user.name}."

            }

          }.collect.foreach { case (id, str) => println(str)}

          println

       

           //***********************************************************************************

          //***************************  實(shí)用操作    ****************************************

          //**********************************************************************************

          println("**********************************************************")

          println("聚合操作")

          println("**********************************************************")

          println("找出5到各頂點(diǎn)的最短:")

          val sourceId: VertexId = 5L // 定義源點(diǎn)

          val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

          val sssp = initialGraph.pregel(Double.PositiveInfinity)(

            (id, dist, newDist) => math.min(dist, newDist),

            triplet => {  // 計(jì)算權(quán)重

              if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {

                Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))

              } else {

                Iterator.empty

              }

            },

            (a,b) => math.min(a,b) // 最短距離

          )

          println(sssp.vertices.collect.mkString("\n"))

       

          sc.stop()

        }

      }

      3.1.3 運(yùn)行結(jié)果

      IDEA(如何使用IDEA參見(jiàn)第3課《3.Spark編程模型(下)--IDEA搭建及實(shí)戰(zhàn)》)中首先對(duì)GraphXExample.scala代碼進(jìn)行編譯,編譯通過(guò)后進(jìn)行執(zhí)行,執(zhí)行結(jié)果如下:

      **********************************************************

      屬性演示

      **********************************************************

      找出圖中年齡大于30的頂點(diǎn):

      David is 42

      Fran is 50

      Charlie is 65

      Ed is 55

      找出圖中屬性大于5的邊:

      2 to 1 att 7

      5 to 3 att 8

       

      列出邊屬性>5tripltes

      Bob likes Alice

      Ed likes Charlie

       

      找出圖中最大的出度、入度、度數(shù):

      max of outDegrees:(5,3) max of inDegrees:(2,2) max of Degrees:(2,4)

       

      **********************************************************

      轉(zhuǎn)換操作

      **********************************************************

      頂點(diǎn)的轉(zhuǎn)換操作,頂點(diǎn)age + 10

      4 is (David,52)

      1 is (Alice,38)

      6 is (Fran,60)

      3 is (Charlie,75)

      5 is (Ed,65)

      2 is (Bob,37)

       

      邊的轉(zhuǎn)換操作,邊的屬性*2

      2 to 1 att 14

      2 to 4 att 4

      3 to 2 att 8

      3 to 6 att 6

      4 to 1 att 2

      5 to 2 att 4

      5 to 3 att 16

      5 to 6 att 6

       

      **********************************************************

      結(jié)構(gòu)操作

      **********************************************************

      頂點(diǎn)年紀(jì)>30的子圖:

      子圖所有頂點(diǎn):

      David is 42

      Fran is 50

      Charlie is 65

      Ed is 55

       

      子圖所有邊:

      3 to 6 att 3

      5 to 3 att 8

      5 to 6 att 3

       

      **********************************************************

      連接操作

      **********************************************************

      連接圖的屬性:

      David inDeg: 1  outDeg: 1

      Alice inDeg: 2  outDeg: 0

      Fran inDeg: 2  outDeg: 0

      Charlie inDeg: 1  outDeg: 2

      Ed inDeg: 0  outDeg: 3

      Bob inDeg: 2  outDeg: 2

       

      出度和入讀相同的人員:

      David

      Bob

       

      **********************************************************

      聚合操作

      **********************************************************

      找出年紀(jì)最大的追求者:

      Bob is the oldest follower of David.

      David is the oldest follower of Alice.

      Charlie is the oldest follower of Fran.

      Ed is the oldest follower of Charlie.

      Ed does not have any followers.

      Charlie is the oldest follower of Bob.

       

      **********************************************************

      實(shí)用操作

      **********************************************************

      找出5到各頂點(diǎn)的最短:

      (4,4.0)

      (1,5.0)

      (6,3.0)

      (3,8.0)

      (5,0.0)

      (2,2.0)

      clip_image025

      3.2 PageRank 演示

      3.2.1 例子介紹

      PageRank, 即網(wǎng)頁(yè)排名,又稱(chēng)網(wǎng)頁(yè)級(jí)別、Google 左側(cè)排名或佩奇排名。它是Google 創(chuàng)始人拉里· 佩奇和謝爾蓋· 布林于1997 年構(gòu)建早期的搜索系統(tǒng)原型時(shí)提出的鏈接分析算法。目前很多重要的鏈接分析算法都是在PageRank 算法基礎(chǔ)上衍生出來(lái)的。PageRank Google 用于用來(lái)標(biāo)識(shí)網(wǎng)頁(yè)的等級(jí)/ 重要性的一種方法,是Google 用來(lái)衡量一個(gè)網(wǎng)站的好壞的唯一標(biāo)準(zhǔn)。在揉合了諸如Title 標(biāo)識(shí)和Keywords 標(biāo)識(shí)等所有其它因素之后, Google 通過(guò)PageRank 來(lái)調(diào)整結(jié)果,使那些更具“等級(jí)/ 重要性”的網(wǎng)頁(yè)在搜索結(jié)果中令網(wǎng)站排名獲得提升,從而提高搜索結(jié)果的相關(guān)性和質(zhì)量。

      clip_image027

      3.2.2 測(cè)試數(shù)據(jù)

      在這里測(cè)試數(shù)據(jù)為頂點(diǎn)數(shù)據(jù)graphx-wiki-vertices.txt和邊數(shù)據(jù)graphx-wiki-edges.txt,可以在本系列附帶資源/data/class9/目錄中找到這兩個(gè)數(shù)據(jù)文件,其中格式為:

      l  頂點(diǎn)為頂點(diǎn)編號(hào)和網(wǎng)頁(yè)標(biāo)題

      clip_image029

      l  邊數(shù)據(jù)由兩個(gè)頂點(diǎn)構(gòu)成

      clip_image031

      3.2.3 程序代碼

      import org.apache.log4j.{Level, Logger}

      import org.apache.spark.{SparkContext, SparkConf}

      import org.apache.spark.graphx._

      import org.apache.spark.rdd.RDD

       

      object PageRank {

        def main(args: Array[String]) {

          //屏蔽日志

          Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

       

          //設(shè)置運(yùn)行環(huán)境

          val conf = new SparkConf().setAppName("PageRank").setMaster("local")

          val sc = new SparkContext(conf)

       

          //讀入數(shù)據(jù)文件

          val articles: RDD[String] = sc.textFile("/home/hadoop/IdeaProjects/data/graphx/graphx-wiki-vertices.txt")

          val links: RDD[String] = sc.textFile("/home/hadoop/IdeaProjects/data/graphx/graphx-wiki-edges.txt")

       

          //裝載頂點(diǎn)和邊

          val vertices = articles.map { line =>

            val fields = line.split('\t')

            (fields(0).toLong, fields(1))

          }

       

          val edges = links.map { line =>

            val fields = line.split('\t')

            Edge(fields(0).toLong, fields(1).toLong, 0)

          }

       

          //cache操作

          //val graph = Graph(vertices, edges, "").persist(StorageLevel.MEMORY_ONLY_SER)

          val graph = Graph(vertices, edges, "").persist()

          //graph.unpersistVertices(false)

       

          //測(cè)試

          println("**********************************************************")

          println("獲取5個(gè)triplet信息")

          println("**********************************************************")

          graph.triplets.take(5).foreach(println(_))

       

          //pageRank算法里面的時(shí)候使用了cache(),故前面persist的時(shí)候只能使用MEMORY_ONLY

          println("**********************************************************")

          println("PageRank計(jì)算,獲取最有價(jià)值的數(shù)據(jù)")

          println("**********************************************************")

          val prGraph = graph.pageRank(0.001).cache()

       

          val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {

            (v, title, rank) => (rank.getOrElse(0.0), title)

          }

       

          titleAndPrGraph.vertices.top(10) {

            Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)

          }.foreach(t => println(t._2._2 + ": " + t._2._1))

       

          sc.stop()

        }

      }

      3.2.4 運(yùn)行結(jié)果

      IDEA中首先對(duì)PageRank.scala代碼進(jìn)行編譯,編譯通過(guò)后進(jìn)行執(zhí)行,執(zhí)行結(jié)果如下:

      **********************************************************

      獲取5個(gè)triplet信息

      **********************************************************

      ((146271392968588,Computer Consoles Inc.),(7097126743572404313,Berkeley Software Distribution),0)

      ((146271392968588,Computer Consoles Inc.),(8830299306937918434,University of California, Berkeley),0)

      ((625290464179456,List of Penguin Classics),(1735121673437871410,George Berkeley),0)

      ((1342848262636510,List of college swimming and diving teams),(8830299306937918434,University of California, Berkeley),0)

      ((1889887370673623,Anthony Pawson),(8830299306937918434,University of California, Berkeley),0)

       

      **********************************************************

      PageRank計(jì)算,獲取最有價(jià)值的數(shù)據(jù)

      **********************************************************

      University of California, Berkeley: 1321.111754312097

      Berkeley, California: 664.8841977233583

      Uc berkeley: 162.50132743397873

      Berkeley Software Distribution: 90.4786038848606

      Lawrence Berkeley National Laboratory: 81.90404939641944

      George Berkeley: 81.85226118457985

      Busby Berkeley: 47.871998218019655

      Berkeley Hills: 44.76406979519754

      Xander Berkeley: 30.324075347288037

      Berkeley County, South Carolina: 28.908336483710308

      clip_image033

      4參考資料

      (1)GraphX:基于Spark的彈性分布式圖計(jì)算系統(tǒng)》 http://lidrema.blog.163.com/blog/static/20970214820147199643788/

      (2)《快刀初試:Spark GraphX在淘寶的實(shí)踐》 http://www.csdn.net/article/2014-08-07/2821097

       

      posted @ 2015-09-14 08:59  shishanyuan  閱讀(56529)  評(píng)論(1)    收藏  舉報(bào)
      主站蜘蛛池模板: 嫩草欧美曰韩国产大片| 精品无码中文视频在线观看| 国产国语一级毛片| 亚洲欧美日韩国产精品一区二区| 久久精品国产福利一区二区| 久久天天躁狠狠躁夜夜av不卡| 成年女人黄小视频| 国产在线观看播放av| 久久国产精品成人影院| 国产自拍偷拍视频在线观看| 成人午夜在线观看刺激| 久久久久久久久18禁秘| 免费人妻无码不卡中文18禁| 国产精品呻吟一区二区三区| 亚洲一区二区三区久久综合| 国产成人午夜福利在线播放| 亚洲日韩精品一区二区三区无码| 免费无码一区无码东京热| 亚洲2017天堂色无码| 久久精品国产一区二区三区| 少妇上班人妻精品偷人| 狼色精品人妻在线视频| 国产精品十八禁在线观看| 久久精品国产99国产精品严洲 | 精品乱码一区二区三四五区| 岚皋县| 国产午夜伦伦午夜伦无码| 国产精品成人久久电影| 成人乱码一区二区三区av| 午夜欧美精品久久久久久久 | www内射国产在线观看| 好男人社区在线www| 巨爆乳中文字幕爆乳区| 人妻中出无码一区二区三区| 亚洲国产精品va在线观看麻豆 | 伊人久久综合无码成人网| 日韩精品区一区二区三vr| 亚洲gv天堂无码男同在线观看| 国内精品久久人妻无码不卡| 蜜芽久久人人超碰爱香蕉| 国产午夜福利在线观看播放|