<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Stream4Graph:動態圖上的增量計算

      作者:張奇

      眾所周知,當我們需要對數據做關聯性分析的時候,一般會采用表連接(SQL join)的方式完成。但是SQL join時的笛卡爾積計算需要維護大量的中間結果,從而對整體的數據分析性能帶來巨大影響。相比而言,基于圖的方式維護數據的關聯性,原本的關聯性分析可以轉換為圖上的遍歷操作,從而大幅降低數據分析的成本。

      然而,隨著數據規模的不斷增長,以及對數據處理更強的實時性需求,如何高效地解決大規模圖數據上的實時計算問題,就變得越來越緊迫。傳統的計算引擎,如Spark、Flink對于圖數據的處理已經逐漸不能滿足業務日益增長的訴求,因此設計一套面向大規模圖數據的實時處理引擎,將會對大數據處理技術革新帶來巨大的幫助。

      螞蟻圖計算團隊開源的流圖計算引擎GeaFlow,結合了圖處理和流處理的技術優勢,實現了動態圖上的增量計算能力,在高性能關聯性分析的基礎上,進一步提升了圖計算的實時性。接下來向大家介紹圖計算技術的特點,業內如何解決大規模實時圖計算問題,以及GeaFlow在動態圖上的計算性能表現。

      1. 圖計算

      圖是一種數學結構,由節點和邊組成。節點代表各種實體,比如人、地點、事物或概念,而邊則表示這些節點之間的關系。例如:

      • 社交媒體:節點可以代表用戶,邊可以表示朋友關系。
      • 網頁:節點代表網頁,邊代表超鏈接。
      • 交通網絡:節點代表城市,邊代表道路或航線。

      圖本身代表了節點與節點之間的鏈接關系,而針對這些關系,我們可以利用圖中的節點和邊來進行信息處理、分析和挖掘,幫助我們理解復雜系統中的關系和模式。在圖上開展的計算活動就是圖計算。圖計算有很多應用場景,比如通過社交網絡分析可以識別用戶之間的聯系,發現社群結構;通過分析網頁間的鏈接關系來計算網頁排名;通過用戶的行為和偏好構建關系圖,推薦相關內容和產品。

      我們就以簡單的社交網絡分析算法,弱聯通分量(Weakly Connected Components, WCC)為例。弱聯通分量可以幫助我們識別用戶之間的“朋友圈”或“社區”,比如某個社交平臺上,一群用戶通過點贊、評論或關注形成一個大的弱聯通分量,而某些用戶可能沒有連接到這個大分量,形成更小的弱聯通分量。

      如果僅僅針對上面這張小圖來構建弱聯通分量算法,那么非常簡單,我們只需要在個人PC上構建簡單的點邊結構然后走圖遍歷即可。但如果圖的規模擴展的千億甚至萬億,這時就需要用到大規模分布式圖計算引擎來處理了。

      2. 分布式圖計算:Spark GraphX

      針對圖的處理一般有圖計算引擎和圖數據庫兩大類,圖數據庫有Neo4j?、TigerGraph?等,圖計算引擎有Spark GraphX、Pregel等。在本文我們主要討論圖計算引擎,以Spark GraphX為例,Spark GraphX是Apache Spark的一個組件,專門用于圖計算和圖分析。GraphX結合了Spark的強大數據處理能力與圖計算的靈活性,擴展了 Spark 的核心功能,為用戶提供了一個統一的API,便于處理圖數據。

      那么在Spark GraphX上是如何處理圖算法的呢?GraphX通過引入一種點和邊都附帶屬性的有向多圖擴展了Spark RDD這種抽象數據結構,為用戶提供了一個類似于Pregel計算模型的以點為中心的并行抽象。用戶需要為GraphX提供原始圖graph、初始消息initialMsg、核心計算邏輯vprog、發送消息控制組件sendMsg、合并消息組件mergeMsg,計算開始時,GraphX初始階段會激活所有點進行初始化,然后按照用戶提供的發送消息組件確定接下來向那些點發送消息。在之后的迭代里,只有收到消息的點才會被激活,進行接下來的計算,如此循環往復直到鏈路中沒有被新激活的點或者到達最大迭代次數,最后輸出計算結果。

        def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
           (graph: Graph[VD, ED],
            initialMsg: A,
            maxIterations: Int = Int.MaxValue,
            activeDirection: EdgeDirection = EdgeDirection.Either)
           (vprog: (VertexId, VD, A) => VD,
            sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
            mergeMsg: (A, A) => A)
          : Graph[VD, ED]
      {
        var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
      
        // compute the messages
        var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
      
        // Loop
        var prevG: Graph[VD, ED] = null
          var i = 0
          while (isActiveMessagesNonEmpty && i < maxIterations) {
            // Receive the messages and update the vertices.
            prevG = g
            g = g.joinVertices(messages)(vprog)
            graphCheckpointer.update(g)
      
            // Send new messages, skipping edges where neither side received
            // a message. 
            messages = GraphXUtils.mapReduceTriplets(
              g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
          }
      }
      

      總的來說,用戶首先需要將存儲介質中原始的表結構數據轉換為GraphX中的點邊數據類型,然后交給Spark進行處理,這是針對靜態圖進行離線處理。但是我們知道,現實世界中,圖數據的規模和數據內節點之間的關系都不是一成不變的,并且在大數據時代其變化非常快。如何實時高效的處理不斷變化的圖數據(動態圖),是一個值得深思的問題。

      3. 動態圖計算:Spark Streaming

      針對動態圖的處理,常見的解決方案是Spark Streaming框架,它可以從很多數據源消費數據并對數據進行處理。它是是Spark核心API的一個擴展,可以實現高吞吐量的、具備容錯機制的實時流數據的處理。

      如上圖所示是Spark Streaming對實時數據進行處理的流程。首先Spark中的每個Receiver接收到實時消息流后,對實時消息進行解析和切分,之后將生成的圖數據存儲在每個Executor中。每當數據累積到一定的批次,就會觸發一次全量計算,最后將計算出的結果輸出給用戶,這也稱之為基于快照的圖計算方案。

      但這種方案有一個比較大的缺點,就是它存在著重復計算的問題,假如我們需要以1小時一個窗口做一次計算,那么在使用Spark進行計算時,不僅要將當前窗口的數據計算進去,歷史所有數據也需要進行回溯,存在大量重復計算,這樣做效率不高,因此我們需要一套能夠進行增量計算的圖計算方案。

      4. 動態圖增量計算:GeaFlow

      我們知道在傳統的流計算引擎中,如Flink,其處理模型允許系統能夠處理不斷流入的數據事件。處理每個事件時,Flink 可以評估變化并僅針對變化的部分執行計算。這意味著在增量計算過程中,Flink 會關注最新到達的數據,而不是整個數據集。于是受到Flink增量計算的啟發,我們自研了增量圖計算系統GeaFlow(也叫流圖計算引擎),能夠很好的支持增量圖迭代計算。

      那么GeaFlow是如何實現增量圖計算的呢?首先,實時數據通過connector消息源輸入的GeaFlow中,GeaFlow依據實時數據,生成內部的點邊結構數據,并且將點邊數據插入進底圖中。當前窗口的實時數據涉及到的點會被激活,觸發圖迭代計算。

      這里以WCC算法為例,對聯通分量算法而言,在一個時間窗口內每條邊對應的src id和tar id對應的頂點會被激活,第一次迭代需要將其id信息通知其鄰居節點。如果鄰居節點收到消息后,發現需要更新自己的信息,那么它需要繼續將更新消息通知給它的鄰居節點;如果說鄰居節點不需要更新自己的信息,那么它就不需要通知其鄰居節點,它對應的迭代終止。

      5. GeaFlow架構簡析

      GeaFlow引擎主要由三大主要部分組成,DSL、Framework和State,同時向上為用戶提供了Stream API、靜態圖API和動態圖API。DSL主要負責圖查詢語言SQL+ISO/GQL的解析和執行計劃的優化,同時負責schema的推導,也向外部承接了多種Connector,比如hive、hudi、kafka、odps等。Framework層負責運行時的調度和容災,shuffle以及框架內各個組件的管理協調。State層負責存儲底層圖數據和數據的持久化,同時也負責索引、下推等眾多性能優化工作。

      6. GeaFlow性能測試

      為了驗證GeaFlow的增量圖計算性能,我們設計了這樣的實驗。一批數據按照固定時間窗口實時輸入到計算引擎中,我們分別用Spark和GeaFlow對全圖做聯通分量算法計算,比較兩者計算耗時。實驗在3臺24核內存128G的機器上開展,使用的數據集是公開數據集soc-Livejournal,測試的圖算法是弱聯通分量算法。我們以50w條數據作為一個計算窗口,每輸入到引擎中50w條數據,就觸發一次圖計算。

      Spark作為批處理引擎,對于每一批窗口來的數據,不管窗口規模是大是小,都需要對增量圖數據連同歷史圖數據進行全量計算。在Spark上,可以直接調用Spark GraphX內部內置的WCC算法進行計算。

      object SparkTest {
      
        def main(args: Array[String]): Unit = {
      
          val iter_num: Int = args(0).toInt
          val parallel: Int = args(1).toInt
      
          val spark = SparkSession.builder.appName("HDFS Data Load").config("spark.default.parallelism", args(1)).getOrCreate
      
          val sc = new JavaSparkContext(spark.sparkContext)
          val graph = GraphLoader.edgeListFile(sc, "hdfs://rayagsecurity-42-033147014062:9000/" + args(2), numEdgePartitions = parallel)
      
          val result = graph.connectedComponents(10)
          handleResult(result)
          print("finish")
      
        }
      
        def handleResult[VD, ED](graph: Graph[VD, ED]): Unit = {
          graph.vertices.foreachPartition(_.foreach(tuple => {
      
          }))
        }
      }
      

      GeaFlow上支持SQL+ISO/GQL的圖查詢語言,我們使用圖查詢語言調用GeaFlow內置的增量聯通分量圖算法進行測試,圖查詢語言代碼如下:

      CREATE TABLE IF NOT EXISTS tables (
        f1 bigint,
        f2 bigint
      ) WITH (
        type='file',
        geaflow.dsl.window.size='16000',
        geaflow.dsl.column.separator='\t',
        test.source.parallel = '32',
        geaflow.dsl.file.path = 'hdfs://xxxx:9000/com-friendster.ungraph.txt'
      );
      
      CREATE GRAPH modern (
        Vertex v1 (
          id int ID
        ),
        Edge e1 (
          srcId int SOURCE ID,
          targetId int DESTINATION ID
        )
      ) WITH (
        storeType='memory',
        shardCount = 256
      );
      
      INSERT INTO modern(v1.id, e1.srcId, e1.targetId)
      (
        SELECT f1, f1, f2
        FROM tables
      );
      
      INSERT INTO modern(v1.id)
      (
        SELECT f2
        FROM tables
      );
      
      CREATE TABLE IF NOT EXISTS tbl_result (
        vid bigint,
        	component bigint
      ) WITH (
        ignore='true',
        type ='file'
      );
      
      use GRAPH modern;
      
      INSERT INTO tbl_result
      CALL inc_wcc(10) YIELD (vid, component)
      RETURN vid, component
      ;
      

      下圖是對兩者進行聯通分量算法實驗時得到的實驗結果。以50w條數據為一個窗口進行迭代計算,Spark中存在大量的重復計算,因為其還要回溯全量的歷史數據進行計算。而GeaFlow只會激活當前窗口中涉及到的點邊進行增量計算,計算可在秒級別完成,每個窗口的計算時間基本穩定。隨著數據量的不斷增大,Spark進行計算時所需要回溯的歷史數據就越多,在其機器容量沒有達到上限的情況下,其計算時延和數據量呈正相關分布。相同情況下GeaFlow的計算時間也會略微增大,但基本可以在秒級別完成。

      7. 總結

      傳統的圖計算方案(如Spark GraphX)在近實時場景中存在重復計算問題,受Flink流處理模型和傳統圖計算的啟發,我們給出了一套能夠支持增量圖計算的方案。總的來說GeaFlow主要有以下幾個方面的優勢:

      1. GeaFlow在處理增量實時計算時,性能優于Spark Streaming + GraphX方案,尤其是在大規模數據集上。
      2. GeaFlow通過增量計算避免了全量數據的重復處理,計算效率更高,計算時間更短性能不明顯下降。
      3. GeaFlow支持SQL+GQL混合處理語言,更適合開發復雜的圖數據處理任務。

      GeaFlow項目代碼已全部開源,我們完成了部分流圖引擎基礎能力的構建,未來希望基于GeaFlow構建面向圖數據的統一湖倉處理引擎,以解決多樣化的大數據關聯性分析訴求。同時我們也在積極籌備加入Apache基金會,豐富大數據開源生態,因此非常歡迎對圖技術有濃厚興趣同學加入社區共建。

      社區中有諸多有趣的工作尚待完成,你可以從如下簡單的「Good First Issue」開始,期待你加入同行。

      • 支持Paimon Connector插件,連接數據湖生態。(Issue 361
      • 優化GQL match語句性能。(Issue 363
      • 新增ISO/GQL語法,支持same謂詞。(Issue 368
      • ...

      參考鏈接

      1. GeaFlow項目地址:https://github.com/TuGraph-family/tugraph-analytics
      2. soc-Livejournal數據集地址:https://snap.stanford.edu/data/soc-LiveJournal1.html
      3. GeaFlow Issues:https://github.com/TuGraph-family/tugraph-analytics/issues
      posted @ 2025-03-03 15:14  Florian  閱讀(266)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 欧美国产日韩久久mv| 国产乱码精品一区二区三| 国产精品一区中文字幕| 俺来也俺去啦最新在线| 漂亮的保姆hd完整版免费韩国| 久久精品国产亚洲精品| 国产稚嫩高中生呻吟激情在线视频| 亚洲大尺度无码无码专线| 被灌满精子的少妇视频| 日韩人妻无码一区二区三区久久| 好吊视频在线一区二区三区| 亚洲一区二区av高清| 日韩av毛片福利国产福利| 2021国产在线视频| 亚洲午夜精品久久久久久抢| 99精品国产综合久久久久五月天 | 久久一日本道色综合久久| 国产黄色看三级三级三级| 车致| 亚洲精品一区二区天堂| 7m精品福利视频导航| 国产精品国产三级国快看| 亚洲人成小说网站色在线| 91孕妇精品一区二区三区| 高清无码在线视频| 日韩av中文字幕有码| 一区二区三区四区国产综合| 99精品国产一区在线看| 国产av麻豆mag剧集| 国产亚洲精品视频一二区| 性欧美暴力猛交69hd| 人妻系列中文字幕精品| 成av人片一区二区久久| 国产无套粉嫩白浆在线| 亚洲区成人综合一区二区| 亚洲成人精品综合在线| 伊人久久大香线蕉AV网禁呦| 久久成人 久久鬼色| 欧美z0zo人禽交另类视频| 一个色综合国产色综合| 和林格尔县|