<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      大數據基礎原理

      1. 背景

      在各行各業的發展中,無論來源、記錄方式如何,人們必然會積累各種各樣的數據,并且傾向于通過統計數據分析現實情況,以此作為指導行動方向的依據。因此,統計學中一直圍繞著數據進行建模與問題分析,給出對數據背后反映問題的判斷。

      由于計算機的發展,承載數據統計和分析的實體自然而然地變成了各式各樣的計算機。在計算機和互聯網的發展下,在數據分析領域也催生了一些新的計算形態。而且盡管各種行業的數據模型不同, 但是不同需求之間是有共性的。

      如今已經為人熟知的大數據的概念,包括規模的增長,異構多樣的數據帶來的各種處理需求,其實在計算機領域是可以預見到的。在小數據,單機的情況下可以良好工作的算法,在數據規模激增的情況下不一定行得通,需要以各種新的設計來解決相應的問題。

      在硬件上,表現為多核處理器和多機系統的發展;在軟件層面,有對應高性能并行計算的MPI,當然也有以Google的MapReduce為濫觴的一系列集群式ETL的數據處理框架。

      Hadoop是大數據處理框架在開源社區的先驅,下文僅討論Hadoop生態相關的計算框架。

      發展歷程

      由于提升單臺計算機的成本曲線讓人望而生畏, 由相對廉價的計算機集群執行的分布式計算成為了一種現實的選擇。

      因為互聯網業務的特點,處理大量的文檔,網絡請求日志來生成報告乃至進行機器學習,大規模圖學習成為其常見的場景。伴隨著這些場景,Google的工程師實踐并提出了MapReduce計算模型,同時貢獻了GFS,Big Table。之后, 業界逐漸出現并演化了一批用于大數據處理的框架和組件,Hadoop即是其中的早期代表。

      Hadoop的組成至少包含以下部分:

      yarn是計算集群任務調度的服務中心;map reduce計算引擎;hdfs的文件系統。

      MapReduce本身的概念其實并不難理解,但是其面對真實場景需要解決大量的工程問題。

      以Hadoop為例,其本身就要處理很多異常情況。典型問題,如在集群中出現節點異常是家常便飯,每一步都可能出錯或者被意外中斷;mapreduce的數據分片經歷了大量的基準測試和優化,在不同場景下要達到高性能的配置也相當復雜。hadoop的實現也沒有達到一個完美的地步。hadoop只能處理批量離線數據,而且為了使用它執行一個操作經常需要做很多的MapReduce組合。由于其數據在處理過程中大部分都需要落盤,因此hadoop在性能上并沒有優勢,直接使用map reduce的方式到今天為止已經完全淡出工業界了。

      盡管如此,入門大數據領域的時候了解MapReduce仍然是有其意義的。

      2. MapReduce計算模型

      我們了解一下MapReduce的計算模型。

      MapReduce是一個在多個節點的集群中處理數據的計算框架/過程。一個MapReduce的系統通常由map、shuffle和reduce組成。數據的流向是從master節點發送數據開始, 到reduce節點結束。

      一個典型的map reduce包含下圖幾個階段:

       

      map,reduce的計算形式:

      map (k1, v1) -> list(k2, v2)

      reduce (k2, list(v2)) -> list(v2)

      map階段之前,數據會形成分片,由master節點派發給不同的worker節點;每個數據的分片中可能包含了不同的key;

      map階段,每個worker節點執行map計算任務;

      map階段是并行的,在worker節點計算輸出后保存(本地),傳給下一個階段;

      從map到reduce,數據會經歷shuffle,其過程取決于具體的實現,并不是統一的;

      reduce階段也是在worker節點完成。worker接收到master節點分配的reduce任務,拉取map階段的輸出作為輸入,計算最終的輸出。跟Map階段不同,Reduce階段不能并行。

      如何容錯

      master節點記錄了每個節點的任務狀態,并且通過ping來判斷worker節點是否失敗; master節點在worker任務失敗的時候會重置worker節點的狀態;整個任務的執行依賴于worker對任務處理的原子性。

      combiner函數

      MapReduce同時包含了combiner函數,也是針對map的輸出進行處理,跟reducer的區別是,它是處理worker本地數據的,其輸出將作為reducer的輸入;針對本來要發送給reducer的數據,做一些局部的排序或合并。而有序的數據在處理上可以取出小塊切片后reduce而不影響結果,既能減少網絡IO,也能減少reduce階段的計算量。

      shuffle階段

      shuffle包含map階段和reduce階段的操作,是一個統稱;

      在map階段,根據key,val將數據劃分到指定的reduce任務,這個叫做partitioner,也是shuffle的一個階段;

      在reduce階段,shuffle根據map輸出的結果,對內存和磁盤內的數據處理,對將由同一個reducer處理的數據進行合并。

      需要注意的是,reduce階段的shuffle排序和合并的算法在使用Spark框架的時候無法定制,僅能通過comparator指定如何排序。

      combiner優化

      當reduce滿足結合律,可以使用reducer直接替換combiner

      如果reduce不滿足結合律,無法直接用reducer替換combiner,具體問題具體分析

      2.1 MapReduce combiner的實際計算樣例

      計算平均值:

        combiner是優化中間計算的一種方式,當使用combiner時,需要注意reducer是否滿足結合率;

      不妨設兩個節點上有待執行map的數據集合,為A1,A2,顯然,列數據的平均值不能用各自平均值取平均: AVG(AVG(A1), AVG(A2)) != AVG(A1∪A2)

      這種情況需要在每條數據附加一個記錄數1,最后在reduce階段以總數和記錄總數求得平均值;

      計算中位數

      另一個例子是計算數據的中位數: 這個目標任務的combiner函數不能是簡單的取中位數。

      一種方案是將map 輸出的v2存為列表,這種方法會占用較多的內存;

      在第一種方案的基礎上其實可以進行優化:不保留所有的v2,而是記錄v2的取值和對應的計數,最后得到的就是類似<k1, v1, count1>, <k2, v2, count2> ...的輸出,其中列表是排好序的,在reduce階段,遍歷key的列表,在count1+count2...超過一半的總數時得到對應的中位數;同樣可以優化內存的使用。

      2.2. Spark計算框架

      Spark是一個最為流行的大數據計算引擎。Spark基于hadoop的mapreduce計算模型做出抽象,并且在原有的資源調度、迭代計算的基礎上做了許多優化,同時保留了對hadoop的hdfs,yarn等依賴。盡管它跟hadoop有關聯,但是其跟hadoop并不是互為替換的關系。除此之外,伴隨著更加方便的API和程序框架誕生的其他計算組件,則構建了Spark大數據處理的生態。

      2.3 Hadoop Shuffle和Spark Shuffle

      從功能上看,Hadoop和Spark的Shuffle是類似的,沒有區別。從實現的細節上,兩者才有不同。map端的shuffle一般為shuffle的Write階段,reduce端的shuffle一般為shuffle的read階段。

      相對于Hadoop的 MapReduce,最開始的Spark在默認的情況下不對 Shuffle 的數據進行排序,即Hash Shuffle,目的是為了減少shuffle時候的性能浪費,對不需要排序的數據忽略這一步。但是,后來Spark的Shuffle經歷了Hash、Sort、Tungsten-Sort(堆外內存)三階段發展歷程。

      對于Hash Shuffle來說,在 Map Task 過程按照 Hash 的方式重組 Partition 的數據,不進行排序。每個 Map Task 為每個 Reduce Task 生成一個文件,通常會產生大量的文件(即對應為 M*R 個中間文件),伴隨大量的隨機磁盤 I/O 操作與大量的內存開銷。

      在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。SortShuffleManager相較于HashShuffleManager來說,有了一定的改進。主要就在于,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可[4]。而且針對不同的算子,會判斷是否啟用SortShuffle,從而避免不必要的排序。

       

      2.4 Spark SQL, Spark Streaming, Structured Streaming

      在Spark中,會用到RDD,DataFrame,DataSet(類似DataFrame)幾種核心數據結構。

      RDD為resilient distributed dataset,彈性分布式數據集,其結構上是Java 或Scala對象的集合.。Spark中在其上定義了很多操作,并且其接口類似函數式編程,能夠簡潔直觀地表示一系列操作。其缺點是性能難以優化,而且會帶來使用的時候本質上要自己維護DAG的困難。

      DataFrame是數據的一個不可變分布式集合,其結構是列式存儲,區別于RDD, 可以通過其列名直接訪問數據列;相較RDD,DataFrame 是高級的 API,提供類似于SQL 的 query 接口。

       

       

      Spark Steaming的出現早于Structured Streaming,處理的對象是DStream, 而Structured Streaming是Spark 2.0發布帶來的特性,處理的對象是DataFrame。

      兩者都是以時間劃分出微批處理,針對一定時間間隔的小批數據進行處理,以此模擬流式數據處理。

      其中,Spark Steaming接近RDD對象數據結構,缺點是效率不高。而Structured Streaming同樣是微批處理的形式,但其在DataFrame基礎上的流式處理性能提升了。

      簡單來說,跟Hadoop相比,Spark具有如下的優點:

      基于DAG的任務執行調度機制,算子的更強表現力,降低了開發的心智負擔,更加貼近實際的開發;更加高效的計算執行模式,讓更多的數據計算直接在內存中執行減少IO開銷。

      3. 設計模式、算子與流式計算

      MapReduce設計模式

      從MapReduce開始,其實就已經有了很多設計模式,可以參見《MapReduce設計模式》

      一些典型的設計模式如下:

      數值概要設計模式 numerical summarization pattern,計算總數,最大值,最小值等都是概要模式

      索引模式 indexing pattern 產生一個數據集的索引以達到快速檢索的目的,例如文本的倒排索引

      過濾模式 filtering pattern 過濾掉一些不需要的數據。

      Spark的算子或多或少是這些map reduce設計模式的體現。

      算子的分類

      (1)Transformation變換/轉換算子:這種變換不會執行真正的作業,因為一些變換不需要馬上得到結果,而且可以在多個變換后計算復合操作,減少計算量。

      (2)Action行動算子:這類算子會觸發提交job作業,并將數據輸出到Spark系統。

      Transformation變換/轉換算子

      1. map算子

      2. flatMap算子

      3. mapPartitions算子

      4. mapPartitionsWithIndex算子

      5. cache算子、persist算子

      6. union算子

      7. cartesian算子

      8. groupBy算子

      9. filter算子

      10. sample算子

      11. combineByKey 算子 reduceByKey算子

      12. join算子

      Action類型的算子:

      1.foreach

      2.saveAsTextFile

      3.collect

      4.count

      4. Hadoop生態的組件

      Pig 和Hive

      Pig和HiveQL是在HDFS上使用的數據查詢語言。

      HBase, Cassandra, HDFS

      HBase是仿照谷歌BitTable的開源實現,是面向列的開源數據庫;HDFS是GFS的開源實現,是一種文件系統;Cassandra是個分布式的key-value數據庫。

      Pregel,GraphX

      圖數據結構是分布式計算中一個重要的領域。圖的數據可以分為網絡、樹、類RDBMS結構、稀疏矩陣以及其它一些結構(from Spark GraphX in Action)。為了處理圖,在OLTP領域有Neo4j, NebulaGraph等組件;而在OLAP領域以Pregel,GraphX等為代表。

      Pregel是一個迭代式的分布式圖計算算法和系統,在此基礎上有Spark的GraphX。到目前為止,GraphX還有一些尚未實現的特性和優化空間。

      數據轉換Sqoop

      sqoop 是一個數據庫轉換的工具,提供了從hadoop文件格式到關系型數據庫之間的數據互轉。

      Storm,Flink

      前面講到Spark可以進行流式數據的處理,然而Spark到目前為止的流式處理延遲只能達到秒級,而另外兩種流式處理框架Storm、Flink可以達到μs到ms級。

      這里只講Flink。為什么Spark跟Flink會有如此大的差異,主要還是數據處理原理的差異。Spark的處理方式是將流式處理看作批處理的一種特殊形式,每收到一個間隔的數據才進行處理,在實時性上難以提升。

      而Flink處理的模型是基于算子(operator)的連續數據流。Flink設計了一套高度靈活的窗口機制,從而對數據能夠執行真正的流式計算,每有一條數據就能進行期望的算子操作。

       

       

       

      上圖中是一個Dataflow,數據src,數據sink,以及transformation算子均為節點,它們通過stream相連.數據從一個operator出發,經過stream被其他operator處理。同時,一個 stream 可以包含多個分區,一個算子可以被分成多個算子的子任務,每一個子任務是在不同的線程或者不同的機器節點中獨立執行的。

      Flink也提供了跟Spark一樣的DataSet,DataStream API,下圖是Flink的架構。

       

      5. 計算樣例

      PageRank如何使用MapReduce計算

      PageRank是搜索引擎中一個計算網頁重要性的算法。可以使用Map Reduce實現簡化的PageRank。在一個圖中,網頁之間通過超鏈接相連,因此可以用一個有向圖/鄰接矩陣表示。

       

      PageRank的數據結構:

      矩陣F(A,B),項F(x,y)表示從A到B的出鏈關系,1為有出鏈

      算法:

      初始化的時候每個網頁是一個相同的均值,每次的網頁i的值為剛好其他網頁權重按比例分配給它的權重之和。

      而對網頁i來說,它分配給其他網頁的權重是其已有權重的平均值。即W(u) = Σ D(v屬于Su)/Out(v)。

      Map的時候,針對網頁u,計算網頁u到其他網頁的權重,每條記錄<u,W(u)>變成列表<vi,W(u)/count(vi∈Su)> (i = 1,2,...)

      Reduce的時候,由vi為key,加和所有的權重,即得到網頁v的權重。(v跟u不是同一個網頁)

      一次計算得到的權重不是最終結果,PageRank要經過多次迭代計算得到最終各個網頁的權重[7]。

      這是一個馬爾科夫隨機過程。為了避免該過程不收斂,可以加入虛擬的節點,以及一個概率點擊系數,表示從任意節點可能跳轉到該節點的概率。

       

      References

      [1] Dean J ,  Ghemawat S . MapReduce: Simplified Data Processing on Large Clusters[C]// Proceedings of the 6th conference on Symposium on Opearting Systems Design & Implementation - Volume 6. USENIX Association, 2004.

      [2] MapReduce-MPI Library, https://mapreduce.sandia.gov/

      [3] Hadoop Map/Reduce教程, http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html

      [4] MapReduce Shuffle 和 Spark Shuffle, https://cloud.tencent.com/developer/article/1651735, 2020

      [5] Carbone P , Katsifodimos A , ? Kth, et al. Apache flink : Stream and batch processing in a single engine.  2015.

      [6] Data Streaming Fault Tolerance, https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html#checkpointing

      [7] PageRank in Map Reduce, https://medium.com/swlh/pagerank-on-mapreduce-55bcb76d1c99

       

      posted @ 2021-04-18 21:16  stackupdown  閱讀(599)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲国产成人无码影院| 亚洲成a人无码av波多野| 亚洲老熟女一区二区三区| 久久久精品午夜免费不卡| 丰满高跟丝袜老熟女久久| 隆安县| 亚洲精品色哟哟一区二区| 亚洲国模精品一区二区| 伊人色综合一区二区三区影院视频 | 免费无遮挡毛片中文字幕| 丰满熟妇人妻av无码区| 日韩理伦片一区二区三区| 亚洲中文精品一区二区| 国产成人久久综合一区| 日本无产久久99精品久久| 久久人妻无码一区二区三区av| 91久久国产成人免费观看| 亚洲精品美女久久7777777| 日日噜久久人妻一区二区| 久久99久国产麻精品66| 在线亚洲妇色中文色综合| 天天摸天天做天天添欧美| 日本va欧美va精品发布| 日韩成人一区二区二十六区| 亚洲国产精品成人无码区| 福利一区二区在线视频| 亚洲熟女乱色综合亚洲图片| 国产女同疯狂作爱系列| 99在线精品视频观看免费| 黑人巨大无码中文字幕无码| 亚洲成人av综合一区| 国产日产免费高清欧美一区| 欧美精品日韩精品一卡| 日本一卡2卡3卡四卡精品网站| 无码伊人久久大杳蕉中文无码 | 泊头市| 国产亚洲精品成人av久| 久久人与动人物a级毛片| 亚洲人成伊人成综合网小说| 亚洲一区二区国产av| 精品国产大片中文字幕|