<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Spark入門(mén)實(shí)戰(zhàn)系列--7.Spark Streaming(上)--實(shí)時(shí)流計(jì)算Spark Streaming原理介紹

      【注】該系列文章以及使用到安裝包/測(cè)試數(shù)據(jù) 可以在《傾情大奉送--Spark入門(mén)實(shí)戰(zhàn)系列》獲取

      1Spark Streaming簡(jiǎn)介

      1.1 概述

      Spark Streaming Spark核心API的一個(gè)擴(kuò)展,可以實(shí)現(xiàn)高吞吐量的、具備容錯(cuò)機(jī)制的實(shí)時(shí)流數(shù)據(jù)的處理。支持從多種數(shù)據(jù)源獲取數(shù)據(jù),包括KafkFlumeTwitterZeroMQKinesis 以及TCP sockets,從數(shù)據(jù)源獲取數(shù)據(jù)之后,可以使用諸如mapreducejoinwindow等高級(jí)函數(shù)進(jìn)行復(fù)雜算法的處理。最后還可以將處理結(jié)果存儲(chǔ)到文件系統(tǒng),數(shù)據(jù)庫(kù)和現(xiàn)場(chǎng)儀表盤(pán)。在“One Stack rule them all”的基礎(chǔ)上,還可以使用Spark的其他子框架,如集群學(xué)習(xí)、圖計(jì)算等,對(duì)流數(shù)據(jù)進(jìn)行處理。

      Spark Streaming處理的數(shù)據(jù)流圖:

      clip_image002

      Spark的各個(gè)子框架,都是基于核心Spark的,Spark Streaming在內(nèi)部的處理機(jī)制是,接收實(shí)時(shí)流的數(shù)據(jù),并根據(jù)一定的時(shí)間間隔拆分成一批批的數(shù)據(jù),然后通過(guò)Spark Engine處理這些批數(shù)據(jù),最終得到處理后的一批批結(jié)果數(shù)據(jù)。

      對(duì)應(yīng)的批數(shù)據(jù),在Spark內(nèi)核對(duì)應(yīng)一個(gè)RDD實(shí)例,因此,對(duì)應(yīng)流數(shù)據(jù)的DStream可以看成是一組RDDs,即RDD的一個(gè)序列。通俗點(diǎn)理解的話(huà),在流數(shù)據(jù)分成一批一批后,通過(guò)一個(gè)先進(jìn)先出的隊(duì)列,然后 Spark Engine從該隊(duì)列中依次取出一個(gè)個(gè)批數(shù)據(jù),把批數(shù)據(jù)封裝成一個(gè)RDD,然后進(jìn)行處理,這是一個(gè)典型的生產(chǎn)者消費(fèi)者模型,對(duì)應(yīng)的就有生產(chǎn)者消費(fèi)者模型的問(wèn)題,即如何協(xié)調(diào)生產(chǎn)速率和消費(fèi)速率。

      1.2 術(shù)語(yǔ)定義

      l離散流(discretized stream)或DStream:這是Spark Streaming對(duì)內(nèi)部持續(xù)的實(shí)時(shí)數(shù)據(jù)流的抽象描述,即我們處理的一個(gè)實(shí)時(shí)數(shù)據(jù)流,在Spark Streaming中對(duì)應(yīng)于一個(gè)DStream 實(shí)例。

      l批數(shù)據(jù)(batch data:這是化整為零的第一步,將實(shí)時(shí)流數(shù)據(jù)以時(shí)間片為單位進(jìn)行分批,將流處理轉(zhuǎn)化為時(shí)間片數(shù)據(jù)的批處理。隨著持續(xù)時(shí)間的推移,這些處理結(jié)果就形成了對(duì)應(yīng)的結(jié)果數(shù)據(jù)流了。

      l時(shí)間片或批處理時(shí)間間隔( batch interval:這是人為地對(duì)流數(shù)據(jù)進(jìn)行定量的標(biāo)準(zhǔn),以時(shí)間片作為我們拆分流數(shù)據(jù)的依據(jù)。一個(gè)時(shí)間片的數(shù)據(jù)對(duì)應(yīng)一個(gè)RDD實(shí)例。

      l窗口長(zhǎng)度(window length:一個(gè)窗口覆蓋的流數(shù)據(jù)的時(shí)間長(zhǎng)度。必須是批處理時(shí)間間隔的倍數(shù),

      l滑動(dòng)時(shí)間間隔:前一個(gè)窗口到后一個(gè)窗口所經(jīng)過(guò)的時(shí)間長(zhǎng)度。必須是批處理時(shí)間間隔的倍數(shù)

      lInput DStream :一個(gè)input DStream是一個(gè)特殊的DStream,將Spark Streaming連接到一個(gè)外部數(shù)據(jù)源來(lái)讀取數(shù)據(jù)。

      1.3 StormSpark Streming比較

      l處理模型以及延遲

      雖然兩框架都提供了可擴(kuò)展性(scalability)和可容錯(cuò)性(fault tolerance),但是它們的處理模型從根本上說(shuō)是不一樣的。Storm可以實(shí)現(xiàn)亞秒級(jí)時(shí)延的處理,而每次只處理一條event,而Spark Streaming可以在一個(gè)短暫的時(shí)間窗口里面處理多條(batches)Event。所以說(shuō)Storm可以實(shí)現(xiàn)亞秒級(jí)時(shí)延的處理,而Spark Streaming則有一定的時(shí)延。

      l容錯(cuò)和數(shù)據(jù)保證

      然而兩者的代價(jià)都是容錯(cuò)時(shí)候的數(shù)據(jù)保證,Spark Streaming的容錯(cuò)為有狀態(tài)的計(jì)算提供了更好的支持。在Storm中,每條記錄在系統(tǒng)的移動(dòng)過(guò)程中都需要被標(biāo)記跟蹤,所以Storm只能保證每條記錄最少被處理一次,但是允許從錯(cuò)誤狀態(tài)恢復(fù)時(shí)被處理多次。這就意味著可變更的狀態(tài)可能被更新兩次從而導(dǎo)致結(jié)果不正確。

      任一方面,Spark Streaming僅僅需要在批處理級(jí)別對(duì)記錄進(jìn)行追蹤,所以他能保證每個(gè)批處理記錄僅僅被處理一次,即使是node節(jié)點(diǎn)掛掉。雖然說(shuō)Storm Trident library可以保證一條記錄被處理一次,但是它依賴(lài)于事務(wù)更新?tīng)顟B(tài),而這個(gè)過(guò)程是很慢的,并且需要由用戶(hù)去實(shí)現(xiàn)。

      l實(shí)現(xiàn)和編程API

      Storm主要是由Clojure語(yǔ)言實(shí)現(xiàn),Spark Streaming是由Scala實(shí)現(xiàn)。如果你想看看這兩個(gè)框架是如何實(shí)現(xiàn)的或者你想自定義一些東西你就得記住這一點(diǎn)。Storm是由BackType Twitter開(kāi)發(fā),而Spark Streaming是在UC Berkeley開(kāi)發(fā)的。

      Storm提供了Java API,同時(shí)也支持其他語(yǔ)言的API Spark Streaming支持ScalaJava語(yǔ)言(其實(shí)也支持Python)

      l批處理框架集成

      Spark Streaming的一個(gè)很棒的特性就是它是在Spark框架上運(yùn)行的。這樣你就可以想使用其他批處理代碼一樣來(lái)寫(xiě)Spark Streaming程序,或者是在Spark中交互查詢(xún)。這就減少了單獨(dú)編寫(xiě)流批量處理程序和歷史數(shù)據(jù)處理程序。

      l生產(chǎn)支持

      Storm已經(jīng)出現(xiàn)好多年了,而且自從2011年開(kāi)始就在Twitter內(nèi)部生產(chǎn)環(huán)境中使用,還有其他一些公司。而Spark Streaming是一個(gè)新的項(xiàng)目,并且在2013年僅僅被Sharethrough使用(據(jù)作者了解)

      Storm Hortonworks Hadoop數(shù)據(jù)平臺(tái)中流處理的解決方案,而Spark Streaming出現(xiàn)在 MapR的分布式平臺(tái)和Cloudera的企業(yè)數(shù)據(jù)平臺(tái)中。除此之外,Databricks是為Spark提供技術(shù)支持的公司,包括了Spark Streaming

      雖然說(shuō)兩者都可以在各自的集群框架中運(yùn)行,但是Storm可以在Mesos上運(yùn)行, Spark Streaming可以在YARNMesos上運(yùn)行。

      2運(yùn)行原理

      2.1 Streaming架構(gòu)

      SparkStreaming是一個(gè)對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行高通量、容錯(cuò)處理的流式處理系統(tǒng),可以對(duì)多種數(shù)據(jù)源(如KdfkaFlumeTwitterZeroTCP 套接字)進(jìn)行類(lèi)似MapReduceJoin等復(fù)雜操作,并將結(jié)果保存到外部文件系統(tǒng)、數(shù)據(jù)庫(kù)或應(yīng)用到實(shí)時(shí)儀表盤(pán)。

      l計(jì)算流程Spark Streaming是將流式計(jì)算分解成一系列短小的批處理作業(yè)。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數(shù)據(jù)按照batch size(如1秒)分成一段一段的數(shù)據(jù)(Discretized Stream),每一段數(shù)據(jù)都轉(zhuǎn)換成Spark中的RDDResilient Distributed Dataset),然后將Spark Streaming中對(duì)DStreamTransformation操作變?yōu)獒槍?duì)Spark中對(duì)RDDTransformation操作,將RDD經(jīng)過(guò)操作變成中間結(jié)果保存在內(nèi)存中。整個(gè)流式計(jì)算根據(jù)業(yè)務(wù)的需求可以對(duì)中間的結(jié)果進(jìn)行疊加或者存儲(chǔ)到外部設(shè)備。下圖顯示了Spark Streaming的整個(gè)流程。

      clip_image004

      Spark Streaming構(gòu)架

      l容錯(cuò)性:對(duì)于流式計(jì)算來(lái)說(shuō),容錯(cuò)性至關(guān)重要。首先我們要明確一下SparkRDD的容錯(cuò)機(jī)制。每一個(gè)RDD都是一個(gè)不可變的分布式可重算的數(shù)據(jù)集,其記錄著確定性的操作繼承關(guān)系(lineage),所以只要輸入數(shù)據(jù)是可容錯(cuò)的,那么任意一個(gè)RDD的分區(qū)(Partition)出錯(cuò)或不可用,都是可以利用原始輸入數(shù)據(jù)通過(guò)轉(zhuǎn)換操作而重新算出的。  

      對(duì)于Spark Streaming來(lái)說(shuō),其RDD的傳承關(guān)系如下圖所示,圖中的每一個(gè)橢圓形表示一個(gè)RDD,橢圓形中的每個(gè)圓形代表一個(gè)RDD中的一個(gè)Partition,圖中的每一列的多個(gè)RDD表示一個(gè)DStream(圖中有三個(gè)DStream),而每一行最后一個(gè)RDD則表示每一個(gè)Batch Size所產(chǎn)生的中間結(jié)果RDD。我們可以看到圖中的每一個(gè)RDD都是通過(guò)lineage相連接的,由于Spark Streaming輸入數(shù)據(jù)可以來(lái)自于磁盤(pán),例如HDFS(多份拷貝)或是來(lái)自于網(wǎng)絡(luò)的數(shù)據(jù)流(Spark Streaming會(huì)將網(wǎng)絡(luò)輸入數(shù)據(jù)的每一個(gè)數(shù)據(jù)流拷貝兩份到其他的機(jī)器)都能保證容錯(cuò)性,所以RDD中任意的Partition出錯(cuò),都可以并行地在其他機(jī)器上將缺失的Partition計(jì)算出來(lái)。這個(gè)容錯(cuò)恢復(fù)方式比連續(xù)計(jì)算模型(如Storm)的效率更高。

      clip_image006

      Spark StreamingRDDlineage關(guān)系圖

      l實(shí)時(shí)性:對(duì)于實(shí)時(shí)性的討論,會(huì)牽涉到流式處理框架的應(yīng)用場(chǎng)景。Spark Streaming將流式計(jì)算分解成多個(gè)Spark Job,對(duì)于每一段數(shù)據(jù)的處理都會(huì)經(jīng)過(guò)Spark DAG圖分解以及Spark的任務(wù)集的調(diào)度過(guò)程。對(duì)于目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿(mǎn)足除對(duì)實(shí)時(shí)性要求非常高(如高頻實(shí)時(shí)交易)之外的所有流式準(zhǔn)實(shí)時(shí)計(jì)算場(chǎng)景。

      l擴(kuò)展性與吞吐量Spark目前在EC2上已能夠線性擴(kuò)展到100個(gè)節(jié)點(diǎn)(每個(gè)節(jié)點(diǎn)4Core),可以以數(shù)秒的延遲處理6GB/s的數(shù)據(jù)量(60M records/s),其吞吐量也比流行的Storm25倍,圖4Berkeley利用WordCountGrep兩個(gè)用例所做的測(cè)試,在Grep這個(gè)測(cè)試中,Spark Streaming中的每個(gè)節(jié)點(diǎn)的吞吐量是670k records/s,而Storm115k records/s

      clip_image008

      Spark StreamingStorm吞吐量比較圖

      2.2 編程模型

      DStreamDiscretized Stream)作為Spark Streaming的基礎(chǔ)抽象,它代表持續(xù)性的數(shù)據(jù)流。這些數(shù)據(jù)流既可以通過(guò)外部輸入源賴(lài)獲取,也可以通過(guò)現(xiàn)有的Dstreamtransformation操作來(lái)獲得。在內(nèi)部實(shí)現(xiàn)上,DStream由一組時(shí)間序列上連續(xù)的RDD來(lái)表示。每個(gè)RDD都包含了自己特定時(shí)間間隔內(nèi)的數(shù)據(jù)流。如圖7-3所示。

       clip_image010

      7-3   DStream中在時(shí)間軸下生成離散的RDD序列

      clip_image012

      對(duì)DStream中數(shù)據(jù)的各種操作也是映射到內(nèi)部的RDD上來(lái)進(jìn)行的,如圖7-4所示,對(duì)Dtream的操作可以通過(guò)RDDtransformation生成新的DStream。這里的執(zhí)行引擎是Spark

      2.2.1 如何使用Spark Streaming

      作為構(gòu)建于Spark之上的應(yīng)用框架,Spark Streaming承襲了Spark的編程風(fēng)格,對(duì)于已經(jīng)了解Spark的用戶(hù)來(lái)說(shuō)能夠快速地上手。接下來(lái)以Spark Streaming官方提供的WordCount代碼為例來(lái)介紹Spark Streaming的使用方式。

      import org.apache.spark._

      import org.apache.spark.streaming._

      import org.apache.spark.streaming.StreamingContext._

       

      // Create a local StreamingContext with two working thread and batch interval of 1 second.

      // The master requires 2 cores to prevent from a starvation scenario.

      val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

      val ssc = new StreamingContext(conf, Seconds(1))

       

      // Create a DStream that will connect to hostname:port, like localhost:9999

      val lines = ssc.socketTextStream("localhost", 9999)

       

      // Split each line into words

      val words = lines.flatMap(_.split(" "))

      import org.apache.spark.streaming.StreamingContext._

      // Count each word in each batch

      val pairs = words.map(word => (word, 1))

      val wordCounts = pairs.reduceByKey(_ + _)

       

      // Print the first ten elements of each RDD generated in this DStream to the console

      wordCounts.print()

      ssc.start()              // Start the computation

      ssc.awaitTermination()  // Wait for the computation to terminate

      1.創(chuàng)建StreamingContext對(duì)象Spark初始化需要?jiǎng)?chuàng)建SparkContext對(duì)象一樣,使用Spark Streaming就需要?jiǎng)?chuàng)建StreamingContext對(duì)象。創(chuàng)建StreamingContext對(duì)象所需的參數(shù)與SparkContext基本一致,包括指明Master,設(shè)定名稱(chēng)(NetworkWordCount)。需要注意的是參數(shù)Seconds(1)Spark Streaming需要指定處理數(shù)據(jù)的時(shí)間間隔,如上例所示的1s,那么Spark Streaming會(huì)以1s為時(shí)間窗口進(jìn)行數(shù)據(jù)處理。此參數(shù)需要根據(jù)用戶(hù)的需求和集群的處理能力進(jìn)行適當(dāng)?shù)脑O(shè)置;

      2.創(chuàng)建InputDStream如同StormSpoutSpark Streaming需要指明數(shù)據(jù)源。如上例所示的socketTextStreamSpark Streamingsocket連接作為數(shù)據(jù)源讀取數(shù)據(jù)。當(dāng)然Spark Streaming支持多種不同的數(shù)據(jù)源,包括Kafka FlumeHDFS/S3KinesisTwitter等數(shù)據(jù)源;

      3.操作DStream對(duì)于從數(shù)據(jù)源得到的DStream,用戶(hù)可以在其基礎(chǔ)上進(jìn)行各種操作,如上例所示的操作就是一個(gè)典型的WordCount執(zhí)行流程:對(duì)于當(dāng)前時(shí)間窗口內(nèi)從數(shù)據(jù)源得到的數(shù)據(jù)首先進(jìn)行分割,然后利用MapReduceByKey方法進(jìn)行計(jì)算,當(dāng)然最后還有使用print()方法輸出結(jié)果;

      4.啟動(dòng)Spark Streaming之前所作的所有步驟只是創(chuàng)建了執(zhí)行流程,程序沒(méi)有真正連接上數(shù)據(jù)源,也沒(méi)有對(duì)數(shù)據(jù)進(jìn)行任何操作,只是設(shè)定好了所有的執(zhí)行計(jì)劃,當(dāng)ssc.start()啟動(dòng)后程序才真正進(jìn)行所有預(yù)期的操作。

      至此對(duì)于Spark Streaming的如何使用有了一個(gè)大概的印象,在后面的章節(jié)我們會(huì)通過(guò)源代碼深入探究一下Spark Streaming的執(zhí)行流程。

      2.2.2 DStream的輸入源

      Spark Streaming中所有的操作都是基于流的,而輸入源是這一系列操作的起點(diǎn)。輸入 DStreams DStreams 接收的流都代表輸入數(shù)據(jù)流的來(lái)源,在Spark Streaming 提供兩種內(nèi)置數(shù)據(jù)流來(lái)源:

      l  基礎(chǔ)來(lái)源 StreamingContext API 中直接可用的來(lái)源。例如:文件系統(tǒng)、Socket(套接字)連接和 Akka actors

      l  高級(jí)來(lái)源 KafkaFlumeKinesisTwitter 等,可以通過(guò)額外的實(shí)用工具類(lèi)創(chuàng)建。

      2.2.2.1 基礎(chǔ)來(lái)源

      在前面分析怎樣使用Spark Streaming的例子中我們已看到ssc.socketTextStream()方法,可以通過(guò) TCP 套接字連接,從從文本數(shù)據(jù)中創(chuàng)建了一個(gè) DStream。除了套接字,StreamingContext API還提供了方法從文件和 Akka actors 中創(chuàng)建 DStreams作為輸入源。

      Spark Streaming提供了streamingContext.fileStream(dataDirectory)方法可以從任何文件系統(tǒng)(如:HDFSS3NFS 等)的文件中讀取數(shù)據(jù),然后創(chuàng)建一個(gè)DStreamSpark Streaming 監(jiān)控 dataDirectory 目錄和在該目錄下任何文件被創(chuàng)建處理(不支持在嵌套目錄下寫(xiě)文件)。需要注意的是:讀取的必須是具有相同的數(shù)據(jù)格式的文件;創(chuàng)建的文件必須在 dataDirectory 目錄下,并通過(guò)自動(dòng)移動(dòng)或重命名成數(shù)據(jù)目錄;文件一旦移動(dòng)就不能被改變,如果文件被不斷追加,新的數(shù)據(jù)將不會(huì)被閱讀。對(duì)于簡(jiǎn)單的文本文,可以使用一個(gè)簡(jiǎn)單的方法streamingContext.textFileStream(dataDirectory)來(lái)讀取數(shù)據(jù)。

      Spark Streaming也可以基于自定義 Actors 的流創(chuàng)建DStream ,通過(guò) Akka actors 接受數(shù)據(jù)流,使用方法streamingContext.actorStream(actorProps, actor-name)Spark Streaming使用 streamingContext.queueStream(queueOfRDDs)方法可以創(chuàng)建基于 RDD 隊(duì)列的DStream,每個(gè)RDD 隊(duì)列將被視為 DStream 中一塊數(shù)據(jù)流進(jìn)行加工處理。

      2.2.2.2 高級(jí)來(lái)源

      這一類(lèi)的來(lái)源需要外部 non-Spark 庫(kù)的接口,其中一些有復(fù)雜的依賴(lài)關(guān)系( KafkaFlume)。因此通過(guò)這些來(lái)源創(chuàng)建 DStreams 需要明確其依賴(lài)。例如,如果想創(chuàng)建一個(gè)使用 Twitter tweets 的數(shù)據(jù)的DStream 流,必須按以下步驟來(lái)做:

      1)在 SBT Maven工程里添加 spark-streaming-twitter_2.10 依賴(lài)。

      2)開(kāi)發(fā):導(dǎo)入 TwitterUtils 包,通過(guò) TwitterUtils.createStream 方法創(chuàng)建一個(gè)DStream

      3)部署:添加所有依賴(lài)的 jar (包括依賴(lài)的spark-streaming-twitter_2.10 及其依賴(lài)),然后部署應(yīng)用程序。

      需要注意的是,這些高級(jí)的來(lái)源一般在Spark Shell中不可用,因此基于這些高級(jí)來(lái)源的應(yīng)用不能在Spark Shell中進(jìn)行測(cè)試。如果你必須在Spark shell中使用它們,你需要下載相應(yīng)的Maven工程的Jar依賴(lài)并添加到類(lèi)路徑中。

      其中一些高級(jí)來(lái)源如下:

      lTwitter Spark StreamingTwitterUtils工具類(lèi)使用Twitter4jTwitter4J 庫(kù)支持通過(guò)任何方法提供身份驗(yàn)證信息,你可以得到公眾的流,或得到基于關(guān)鍵詞過(guò)濾流。

      lFlume Spark Streaming可以從Flume中接受數(shù)據(jù)。

      lKafka Spark Streaming可以從Kafka中接受數(shù)據(jù)。

      lKinesis Spark Streaming可以從Kinesis中接受數(shù)據(jù)。

      需要重申的一點(diǎn)是在開(kāi)始編寫(xiě)自己的 SparkStreaming 程序之前,一定要將高級(jí)來(lái)源依賴(lài)的Jar添加到SBT Maven 項(xiàng)目相應(yīng)的artifact中。常見(jiàn)的輸入源和其對(duì)應(yīng)的Jar包如下圖所示。

       clip_image014

      另外,輸入DStream也可以創(chuàng)建自定義的數(shù)據(jù)源,需要做的就是實(shí)現(xiàn)一個(gè)用戶(hù)定義的接收器。

      2.2.3 DStream的操作

      RDD類(lèi)似,DStream也提供了自己的一系列操作方法,這些操作可以分成三類(lèi):普通的轉(zhuǎn)換操作、窗口轉(zhuǎn)換操作和輸出操作。

      2.2.3.1 普通的轉(zhuǎn)換操作

      普通的轉(zhuǎn)換操作如下表所示:

      轉(zhuǎn)換

      描述

      map(func)

      DStream的每個(gè)元素通過(guò)函數(shù)func返回一個(gè)新的DStream

      flatMap(func)

      類(lèi)似與map操作,不同的是每個(gè)輸入元素可以被映射出0或者更多的輸出元素。

      filter(func)

      在源DSTREAM上選擇Func函數(shù)返回僅為true的元素,最終返回一個(gè)新的DSTREAM

      repartition(numPartitions)

      通過(guò)輸入的參數(shù)numPartitions的值來(lái)改變DStream的分區(qū)大小。

      union(otherStream)

      返回一個(gè)包含源DStream與其他 DStream的元素合并后的新DSTREAM

      count()

      對(duì)源DStream內(nèi)部的所含有的RDD的元素?cái)?shù)量進(jìn)行計(jì)數(shù),返回一個(gè)內(nèi)部的RDD只包含一個(gè)元素的DStreaam

      reduce(func)

      使用函數(shù)func(有兩個(gè)參數(shù)并返回一個(gè)結(jié)果)將源DStream 中每個(gè)RDD的元素進(jìn)行聚 合操作,返回一個(gè)內(nèi)部所包含的RDD只有一個(gè)元素的新DStream

      countByValue()

      計(jì)算DStream中每個(gè)RDD內(nèi)的元素出現(xiàn)的頻次并返回新的DStream[(K,Long)],其中KRDD中元素的類(lèi)型,Long是元素出現(xiàn)的頻次。

      reduceByKey(func, [numTasks])

      當(dāng)一個(gè)類(lèi)型為(KV)鍵值對(duì)的DStream被調(diào)用的時(shí)候,返回類(lèi)型為類(lèi)型為(KV)鍵值對(duì)的新 DStream,其中每個(gè)鍵的值V都是使用聚合函數(shù)func匯總。注意:默認(rèn)情況下,使用 Spark的默認(rèn)并行度提交任務(wù)(本地模式下并行度為2,集群模式下位8),可以通過(guò)配置numTasks設(shè)置不同的并行任務(wù)數(shù)。

      join(otherStream, [numTasks])

      當(dāng)被調(diào)用類(lèi)型分別為(KV)和(KW)鍵值對(duì)的2個(gè)DStream 時(shí),返回類(lèi)型為(K,(VW))鍵值對(duì)的一個(gè)新 DSTREAM

      cogroup(otherStream, [numTasks])

      當(dāng)被調(diào)用的兩個(gè)DStream分別含有(K, V) (K, W)鍵值對(duì)時(shí),返回一個(gè)(K, Seq[V], Seq[W])類(lèi)型的新的DStream

      transform(func)

      通過(guò)對(duì)源DStream的每RDD應(yīng)用RDD-to-RDD函數(shù)返回一個(gè)新的DStream,這可以用來(lái)在DStream做任意RDD操作。

      updateStateByKey(func)

      返回一個(gè)新?tīng)顟B(tài)的DStream,其中每個(gè)鍵的狀態(tài)是根據(jù)鍵的前一個(gè)狀態(tài)和鍵的新值應(yīng)用給定函數(shù)func后的更新。這個(gè)方法可以被用來(lái)維持每個(gè)鍵的任何狀態(tài)數(shù)據(jù)。

      在上面列出的這些操作中,transform()方法和updateStateByKey()方法值得我們深入的探討一下:

      l  transform(func)操作

      transform操作(轉(zhuǎn)換操作)連同其其類(lèi)似的 transformWith操作允許DStream 上應(yīng)用任意RDD-to-RDD函數(shù)。它可以被應(yīng)用于未在 DStream API 中暴露任何的RDD操作。例如,在每批次的數(shù)據(jù)流與另一數(shù)據(jù)集的連接功能不直接暴露在DStream API 中,但可以輕松地使用transform操作來(lái)做到這一點(diǎn),這使得DStream的功能非常強(qiáng)大。例如,你可以通過(guò)連接預(yù)先計(jì)算的垃圾郵件信息的輸入數(shù)據(jù)流(可能也有Spark生成的),然后基于此做實(shí)時(shí)數(shù)據(jù)清理的篩選,如下面官方提供的偽代碼所示。事實(shí)上,也可以在transform方法中使用機(jī)器學(xué)習(xí)和圖形計(jì)算的算法。

      l  updateStateByKey操作

      updateStateByKey 操作可以讓你保持任意狀態(tài),同時(shí)不斷有新的信息進(jìn)行更新。要使用此功能,必須進(jìn)行兩個(gè)步驟 :

      (1)  定義狀態(tài) - 狀態(tài)可以是任意的數(shù)據(jù)類(lèi)型。

      (2)  定義狀態(tài)更新函數(shù) - 用一個(gè)函數(shù)指定如何使用先前的狀態(tài)和從輸入流中獲取的新值 更新?tīng)顟B(tài)。

      讓我們用一個(gè)例子來(lái)說(shuō)明,假設(shè)你要進(jìn)行文本數(shù)據(jù)流中單詞計(jì)數(shù)。在這里,正在運(yùn)行的計(jì)數(shù)是狀態(tài)而且它是一個(gè)整數(shù)。我們定義了更新功能如下:

       clip_image016

      此函數(shù)應(yīng)用于含有鍵值對(duì)的DStream中(如前面的示例中,在DStream中含有(word1)鍵值對(duì))。它會(huì)針對(duì)里面的每個(gè)元素(如wordCount中的word)調(diào)用一下更新函數(shù),newValues是最新的值,runningCount是之前的值。

      clip_image018

      2.2.3.2 窗口轉(zhuǎn)換操作

      Spark Streaming 還提供了窗口的計(jì)算,它允許你通過(guò)滑動(dòng)窗口對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換,窗口轉(zhuǎn)換操作如下:

      轉(zhuǎn)換

      描述

      window(windowLength, slideInterval)

      返回一個(gè)基于源DStream的窗口批次計(jì)算后得到新的DStream

      countByWindow(windowLength,slideInterval)

      返回基于滑動(dòng)窗口的DStream中的元素的數(shù)量。

      reduceByWindow(func, windowLength,slideInterval)

      基于滑動(dòng)窗口對(duì)源DStream中的元素進(jìn)行聚合操作,得到一個(gè)新的DStream

      reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])

      基于滑動(dòng)窗口對(duì)(KV)鍵值對(duì)類(lèi)型的DStream中的值按K使用聚合函數(shù)func進(jìn)行聚合操作,得到一個(gè)新的DStream

      reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])

      一個(gè)更高效的reduceByKkeyAndWindow()的實(shí)現(xiàn)版本,先對(duì)滑動(dòng)窗口中新的時(shí)間間隔內(nèi)數(shù)據(jù)增量聚合并移去最早的與新增數(shù)據(jù)量的時(shí)間間隔內(nèi)的數(shù)據(jù)統(tǒng)計(jì)量。例如,計(jì)算t+4秒這個(gè)時(shí)刻過(guò)去5秒窗口的WordCount,那么我們可以將t+3時(shí)刻過(guò)去5秒的統(tǒng)計(jì)量加上[t+3t+4]的統(tǒng)計(jì)量,在減去[t-2t-1]的統(tǒng)計(jì)量,這種方法可以復(fù)用中間三秒的統(tǒng)計(jì)量,提高統(tǒng)計(jì)的效率。

      countByValueAndWindow(windowLength,slideInterval, [numTasks])

      基于滑動(dòng)窗口計(jì)算源DStream中每個(gè)RDD內(nèi)每個(gè)元素出現(xiàn)的頻次并返回DStream[(K,Long)],其中KRDD中元素的類(lèi)型,Long是元素頻次。與countByValue一樣,reduce任務(wù)的數(shù)量可以通過(guò)一個(gè)可選參數(shù)進(jìn)行配置。

      clip_image020

      批處理間隔示意圖

      Spark Streaming中,數(shù)據(jù)處理是按批進(jìn)行的,而數(shù)據(jù)采集是逐條進(jìn)行的,因此在Spark Streaming中會(huì)先設(shè)置好批處理間隔(batch duration),當(dāng)超過(guò)批處理間隔的時(shí)候就會(huì)把采集到的數(shù)據(jù)匯總起來(lái)成為一批數(shù)據(jù)交給系統(tǒng)去處理。

      對(duì)于窗口操作而言,在其窗口內(nèi)部會(huì)有N個(gè)批處理數(shù)據(jù),批處理數(shù)據(jù)的大小由窗口間隔(window duration)決定,而窗口間隔指的就是窗口的持續(xù)時(shí)間,在窗口操作中,只有窗口的長(zhǎng)度滿(mǎn)足了才會(huì)觸發(fā)批數(shù)據(jù)的處理。除了窗口的長(zhǎng)度,窗口操作還有另一個(gè)重要的參數(shù)就是滑動(dòng)間隔(slide duration),它指的是經(jīng)過(guò)多長(zhǎng)時(shí)間窗口滑動(dòng)一次形成新的窗口,滑動(dòng)窗口默認(rèn)情況下和批次間隔的相同,而窗口間隔一般設(shè)置的要比它們兩個(gè)大。在這里必須注意的一點(diǎn)是滑動(dòng)間隔和窗口間隔的大小一定得設(shè)置為批處理間隔的整數(shù)倍。

      如批處理間隔示意圖所示,批處理間隔是1個(gè)時(shí)間單位,窗口間隔是3個(gè)時(shí)間單位,滑動(dòng)間隔是2個(gè)時(shí)間單位。對(duì)于初始的窗口time 1-time 3,只有窗口間隔滿(mǎn)足了才觸發(fā)數(shù)據(jù)的處理。這里需要注意的一點(diǎn)是,初始的窗口有可能流入的數(shù)據(jù)沒(méi)有撐滿(mǎn),但是隨著時(shí)間的推進(jìn),窗口最終會(huì)被撐滿(mǎn)。當(dāng)每個(gè)2個(gè)時(shí)間單位,窗口滑動(dòng)一次后,會(huì)有新的數(shù)據(jù)流入窗口,這時(shí)窗口會(huì)移去最早的兩個(gè)時(shí)間單位的數(shù)據(jù),而與最新的兩個(gè)時(shí)間單位的數(shù)據(jù)進(jìn)行匯總形成新的窗口(time3-time5)。

      對(duì)于窗口操作,批處理間隔、窗口間隔和滑動(dòng)間隔是非常重要的三個(gè)時(shí)間概念,是理解窗口操作的關(guān)鍵所在。

      2.2.3.3 輸出操作

      Spark Streaming允許DStream的數(shù)據(jù)被輸出到外部系統(tǒng),如數(shù)據(jù)庫(kù)或文件系統(tǒng)。由于輸出操作實(shí)際上使transformation操作后的數(shù)據(jù)可以通過(guò)外部系統(tǒng)被使用,同時(shí)輸出操作觸發(fā)所有DStreamtransformation操作的實(shí)際執(zhí)行(類(lèi)似于RDD操作)。以下表列出了目前主要的輸出操作:

      轉(zhuǎn)換

      描述

      print()

      Driver中打印出DStream中數(shù)據(jù)的前10個(gè)元素。

      saveAsTextFiles(prefix, [suffix])

      DStream中的內(nèi)容以文本的形式保存為文本文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

      saveAsObjectFiles(prefix, [suffix])

      DStream中的內(nèi)容按對(duì)象序列化并且以SequenceFile的格式保存。其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

      saveAsHadoopFiles(prefix, [suffix])

      DStream中的內(nèi)容以文本的形式保存為Hadoop文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

      foreachRDD(func)

      最基本的輸出操作,將func函數(shù)應(yīng)用于DStream中的RDD上,這個(gè)操作會(huì)輸出數(shù)據(jù)到外部系統(tǒng),比如保存RDD到文件或者網(wǎng)絡(luò)數(shù)據(jù)庫(kù)等。需要注意的是func函數(shù)是在運(yùn)行該streaming應(yīng)用的Driver進(jìn)程里執(zhí)行的。

      dstream.foreachRDD是一個(gè)非常強(qiáng)大的輸出操作,它允將許數(shù)據(jù)輸出到外部系統(tǒng)。但是 ,如何正確高效地使用這個(gè)操作是很重要的,下面展示了如何去避免一些常見(jiàn)的錯(cuò)誤。

      通常將數(shù)據(jù)寫(xiě)入到外部系統(tǒng)需要?jiǎng)?chuàng)建一個(gè)連接對(duì)象(如 TCP連接到遠(yuǎn)程服務(wù)器),并用它來(lái)發(fā)送數(shù)據(jù)到遠(yuǎn)程系統(tǒng)。出于這個(gè)目的,開(kāi)發(fā)者可能在不經(jīng)意間在Spark driver端創(chuàng)建了連接對(duì)象,并嘗試使用它保存RDD中的記錄到Spark worker上,如下面代碼:

      clip_image022

      這是不正確的,這需要連接對(duì)象進(jìn)行序列化并從Driver端發(fā)送到Worker上。連接對(duì)象很少在不同機(jī)器間進(jìn)行這種操作,此錯(cuò)誤可能表現(xiàn)為序列化錯(cuò)誤(連接對(duì)不可序列化),初始化錯(cuò)誤(連接對(duì)象在需要在Worker 上進(jìn)行需要初始化) 等等,正確的解決辦法是在 worker上創(chuàng)建的連接對(duì)象。

      通常情況下,創(chuàng)建一個(gè)連接對(duì)象有時(shí)間和資源開(kāi)銷(xiāo)。因此,創(chuàng)建和銷(xiāo)毀的每條記錄的連接對(duì)象可能招致不必要的資源開(kāi)銷(xiāo),并顯著降低系統(tǒng)整體的吞吐量 。一個(gè)更好的解決方案是使用rdd.foreachPartition方法創(chuàng)建一個(gè)單獨(dú)的連接對(duì)象,然后使用該連接對(duì)象輸出的所有RDD分區(qū)中的數(shù)據(jù)到外部系統(tǒng)。

       這緩解了創(chuàng)建多條記錄連接的開(kāi)銷(xiāo)。最后,還可以進(jìn)一步通過(guò)在多個(gè)RDDs/ batches上重用連接對(duì)象進(jìn)行優(yōu)化。一個(gè)保持連接對(duì)象的靜態(tài)池可以重用在多個(gè)批處理的RDD上將其輸出到外部系統(tǒng),從而進(jìn)一步降低了開(kāi)銷(xiāo)。

       需要注意的是,在靜態(tài)池中的連接應(yīng)該按需延遲創(chuàng)建,這樣可以更有效地把數(shù)據(jù)發(fā)送到外部系統(tǒng)。另外需要要注意的是:DStreams延遲執(zhí)行的,就像RDD的操作是由actions觸發(fā)一樣。默認(rèn)情況下,輸出操作會(huì)按照它們?cè)?span lang="EN-US">Streaming應(yīng)用程序中定義的順序一個(gè)個(gè)執(zhí)行。

      2.3  容錯(cuò)、持久化和性能調(diào)優(yōu)

      2.3.1 容錯(cuò)

      DStream基于RDD組成,RDD的容錯(cuò)性依舊有效,我們首先回憶一下SparkRDD的基本特性。

      lRDD是一個(gè)不可變的、確定性的可重復(fù)計(jì)算的分布式數(shù)據(jù)集。RDD的某些partition丟失了,可以通過(guò)血統(tǒng)(lineage)信息重新計(jì)算恢復(fù);

      l如果RDD任何分區(qū)因worker節(jié)點(diǎn)故障而丟失,那么這個(gè)分區(qū)可以從原來(lái)依賴(lài)的容錯(cuò)數(shù)據(jù)集中恢復(fù);

      l由于Spark中所有的數(shù)據(jù)的轉(zhuǎn)換操作都是基于RDD的,即使集群出現(xiàn)故障,只要輸入數(shù)據(jù)集存在,所有的中間結(jié)果都是可以被計(jì)算的。

      Spark Streaming是可以從HDFSS3這樣的文件系統(tǒng)讀取數(shù)據(jù)的,這種情況下所有的數(shù)據(jù)都可以被重新計(jì)算,不用擔(dān)心數(shù)據(jù)的丟失。但是在大多數(shù)情況下,Spark Streaming是基于網(wǎng)絡(luò)來(lái)接受數(shù)據(jù)的,此時(shí)為了實(shí)現(xiàn)相同的容錯(cuò)處理,在接受網(wǎng)絡(luò)的數(shù)據(jù)時(shí)會(huì)在集群的多個(gè)Worker節(jié)點(diǎn)間進(jìn)行數(shù)據(jù)的復(fù)制(默認(rèn)的復(fù)制數(shù)是2),這導(dǎo)致產(chǎn)生在出現(xiàn)故障時(shí)被處理的兩種類(lèi)型的數(shù)據(jù):

      1Data received and replicated :一旦一個(gè)Worker節(jié)點(diǎn)失效,系統(tǒng)會(huì)從另一份還存在的數(shù)據(jù)中重新計(jì)算。

      2Data received but buffered for replication :一旦數(shù)據(jù)丟失,可以通過(guò)RDD之間的依賴(lài)關(guān)系,從HDFS這樣的外部文件系統(tǒng)讀取數(shù)據(jù)。

      此外,有兩種故障,我們應(yīng)該關(guān)心:

      1Worker節(jié)點(diǎn)失效:通過(guò)上面的講解我們知道,這時(shí)系統(tǒng)會(huì)根據(jù)出現(xiàn)故障的數(shù)據(jù)的類(lèi)型,選擇是從另一個(gè)有復(fù)制過(guò)數(shù)據(jù)的工作節(jié)點(diǎn)上重新計(jì)算,還是直接從從外部文件系統(tǒng)讀取數(shù)據(jù)。

      2Driver(驅(qū)動(dòng)節(jié)點(diǎn))失效 :如果運(yùn)行 Spark Streaming應(yīng)用時(shí)驅(qū)動(dòng)節(jié)點(diǎn)出現(xiàn)故障,那么很明顯的StreamingContext已經(jīng)丟失,同時(shí)在內(nèi)存中的數(shù)據(jù)全部丟失。對(duì)于這種情況,Spark Streaming應(yīng)用程序在計(jì)算上有一個(gè)內(nèi)在的結(jié)構(gòu)——在每段micro-batch數(shù)據(jù)周期性地執(zhí)行同樣的Spark計(jì)算。這種結(jié)構(gòu)允許把應(yīng)用的狀態(tài)(亦稱(chēng)checkpoint)周期性地保存到可靠的存儲(chǔ)空間中,并在driver重新啟動(dòng)時(shí)恢復(fù)該狀態(tài)。具體做法是在ssc.checkpoint(<checkpoint directory>)函數(shù)中進(jìn)行設(shè)置,Spark Streaming就會(huì)定期把DStream的元信息寫(xiě)入到HDFS中,一旦驅(qū)動(dòng)節(jié)點(diǎn)失效,丟失的StreamingContext會(huì)通過(guò)已經(jīng)保存的檢查點(diǎn)信息進(jìn)行恢復(fù)。

      最后我們談一下Spark Stream的容錯(cuò)在Spark 1.2版本的一些改進(jìn):

      實(shí)時(shí)流處理系統(tǒng)必須要能在24/7時(shí)間內(nèi)工作,因此它需要具備從各種系統(tǒng)故障中恢復(fù)過(guò)來(lái)的能力。最開(kāi)始,SparkStreaming就支持從driverworker故障恢復(fù)的能力。然而有些數(shù)據(jù)源的輸入可能在故障恢復(fù)以后丟失數(shù)據(jù)。在Spark1.2版本中,Spark已經(jīng)在SparkStreaming中對(duì)預(yù)寫(xiě)日志(也被稱(chēng)為journaling)作了初步支持,改進(jìn)了恢復(fù)機(jī)制,并使更多數(shù)據(jù)源的零數(shù)據(jù)丟失有了可靠。

      對(duì)于文件這樣的源數(shù)據(jù),driver恢復(fù)機(jī)制足以做到零數(shù)據(jù)丟失,因?yàn)樗械臄?shù)據(jù)都保存在了像HDFSS3這樣的容錯(cuò)文件系統(tǒng)中了。但對(duì)于像KafkaFlume等其它數(shù)據(jù)源,有些接收到的數(shù)據(jù)還只緩存在內(nèi)存中,尚未被處理,它們就有可能會(huì)丟失。這是由于Spark應(yīng)用的分布操作方式引起的。當(dāng)driver進(jìn)程失敗時(shí),所有在standalone/yarn/mesos集群運(yùn)行的executor,連同它們?cè)趦?nèi)存中的所有數(shù)據(jù),也同時(shí)被終止。對(duì)于Spark Streaming來(lái)說(shuō),從諸如KafkaFlume的數(shù)據(jù)源接收到的所有數(shù)據(jù),在它們處理完成之前,一直都緩存在executor的內(nèi)存中。縱然driver重新啟動(dòng),這些緩存的數(shù)據(jù)也不能被恢復(fù)。為了避免這種數(shù)據(jù)損失,在Spark1.2發(fā)布版本中引進(jìn)了預(yù)寫(xiě)日志(WriteAheadLogs)功能。

      預(yù)寫(xiě)日志功能的流程是:1)一個(gè)SparkStreaming應(yīng)用開(kāi)始時(shí)(也就是driver開(kāi)始時(shí)),相關(guān)的StreamingContext使用SparkContext啟動(dòng)接收器成為長(zhǎng)駐運(yùn)行任務(wù)。這些接收器接收并保存流數(shù)據(jù)到Spark內(nèi)存中以供處理。2)接收器通知driver3)接收塊中的元數(shù)據(jù)(metadata)被發(fā)送到driverStreamingContext。這個(gè)元數(shù)據(jù)包括:(a)定位其在executor內(nèi)存中數(shù)據(jù)的塊referenceid,(b)塊數(shù)據(jù)在日志中的偏移信息(如果啟用了)。

      用戶(hù)傳送數(shù)據(jù)的生命周期如下圖所示。

       clip_image024

      類(lèi)似Kafka這樣的系統(tǒng)可以通過(guò)復(fù)制數(shù)據(jù)保持可靠性。允許預(yù)寫(xiě)日志兩次高效地復(fù)制同樣的數(shù)據(jù):一次由Kafka,而另一次由SparkStreamingSpark未來(lái)版本將包含Kafka容錯(cuò)機(jī)制的原生支持,從而避免第二個(gè)日志。

      2.3.2 持久化

      RDD一樣,DStream同樣也能通過(guò)persist()方法將數(shù)據(jù)流存放在內(nèi)存中,默認(rèn)的持久化方式是MEMORY_ONLY_SER,也就是在內(nèi)存中存放數(shù)據(jù)同時(shí)序列化的方式,這樣做的好處是遇到需要多次迭代計(jì)算的程序時(shí),速度優(yōu)勢(shì)十分的明顯。而對(duì)于一些基于窗口的操作,如reduceByWindowreduceByKeyAndWindow,以及基于狀態(tài)的操作,如updateStateBykey,其默認(rèn)的持久化策略就是保存在內(nèi)存中。

      對(duì)于來(lái)自網(wǎng)絡(luò)的數(shù)據(jù)源(KafkaFlumesockets等),默認(rèn)的持久化策略是將數(shù)據(jù)保存在兩臺(tái)機(jī)器上,這也是為了容錯(cuò)性而設(shè)計(jì)的。

      另外,對(duì)于窗口和有狀態(tài)的操作必須checkpoint,通過(guò)StreamingContextcheckpoint來(lái)指定目錄,通過(guò) Dtreamcheckpoint指定間隔時(shí)間,間隔必須是滑動(dòng)間隔(slide interval)的倍數(shù)。

      2.3.3 性能調(diào)優(yōu)

      1.  優(yōu)化運(yùn)行時(shí)間

      l 增加并行度 確保使用整個(gè)集群的資源,而不是把任務(wù)集中在幾個(gè)特定的節(jié)點(diǎn)上。對(duì)于包含shuffle的操作,增加其并行度以確保更為充分地使用集群資源;

      l 減少數(shù)據(jù)序列化,反序列化的負(fù)擔(dān) Spark Streaming默認(rèn)將接受到的數(shù)據(jù)序列化后存儲(chǔ),以減少內(nèi)存的使用。但是序列化和反序列話(huà)需要更多的CPU時(shí)間,因此更加高效的序列化方式(Kryo)和自定義的系列化接口可以更高效地使用CPU

      l 設(shè)置合理的batch duration(批處理時(shí)間間)Spark Streaming中,Job之間有可能存在依賴(lài)關(guān)系,后面的Job必須確保前面的作業(yè)執(zhí)行結(jié)束后才能提交。若前面的Job執(zhí)行的時(shí)間超出了批處理時(shí)間間隔,那么后面的Job就無(wú)法按時(shí)提交,這樣就會(huì)進(jìn)一步拖延接下來(lái)的Job,造成后續(xù)Job的阻塞。因此設(shè)置一個(gè)合理的批處理間隔以確保作業(yè)能夠在這個(gè)批處理間隔內(nèi)結(jié)束時(shí)必須的;

      l  減少因任務(wù)提交和分發(fā)所帶來(lái)的負(fù)擔(dān) 通常情況下,Akka框架能夠高效地確保任務(wù)及時(shí)分發(fā),但是當(dāng)批處理間隔非常小(500ms)時(shí),提交和分發(fā)任務(wù)的延遲就變得不可接受了。使用StandaloneCoarse-grained Mesos模式通常會(huì)比使用Fine-grained Mesos模式有更小的延遲。

      2.  優(yōu)化內(nèi)存使用

      l控制batch size(批處理間隔內(nèi)的數(shù)據(jù)量) Spark Streaming會(huì)把批處理間隔內(nèi)接收到的所有數(shù)據(jù)存放在Spark內(nèi)部的可用內(nèi)存區(qū)域中,因此必須確保當(dāng)前節(jié)點(diǎn)Spark的可用內(nèi)存中少能容納這個(gè)批處理時(shí)間間隔內(nèi)的所有數(shù)據(jù),否則必須增加新的資源以提高集群的處理能力;

      l及時(shí)清理不再使用的數(shù)據(jù) 前面講到Spark Streaming會(huì)將接受的數(shù)據(jù)全部存儲(chǔ)到內(nèi)部可用內(nèi)存區(qū)域中,因此對(duì)于處理過(guò)的不再需要的數(shù)據(jù)應(yīng)及時(shí)清理,以確保Spark Streaming有富余的可用內(nèi)存空間。通過(guò)設(shè)置合理的spark.cleaner.ttl時(shí)長(zhǎng)來(lái)及時(shí)清理超時(shí)的無(wú)用數(shù)據(jù),這個(gè)參數(shù)需要小心設(shè)置以免后續(xù)操作中所需要的數(shù)據(jù)被超時(shí)錯(cuò)誤處理;

      l觀察及適當(dāng)調(diào)整GC策略 GC會(huì)影響Job的正常運(yùn)行,可能延長(zhǎng)Job的執(zhí)行時(shí)間,引起一系列不可預(yù)料的問(wèn)題。觀察GC的運(yùn)行情況,采用不同的GC策略以進(jìn)一步減小內(nèi)存回收對(duì)Job運(yùn)行的影響。

       

      參考資料:

      (1)Spark Streaminghttp://blog.debugo.com/spark-streaming/

      posted @ 2015-09-01 06:51  shishanyuan  閱讀(107354)  評(píng)論(7)    收藏  舉報(bào)
      主站蜘蛛池模板: 久久国内精品自在自线91| 亚洲人成网站在线观看播放不卡| 精品国产免费一区二区三区香蕉| 虎白女粉嫩尤物福利视频| 国产成人a在线观看视频免费| 久久久久久久无码高潮| 国产女同一区二区在线| 亚洲日韩精品一区二区三区无码| 亚洲精品久久久久久无码色欲四季 | 成人福利国产午夜AV免费不卡在线 | 久本草在线中文字幕亚洲| 国内在线视频一区二区三区| 偷拍美女厕所尿尿嘘嘘小便| 欧美亚洲另类制服卡通动漫| 中文字幕在线视频不卡一区二区| 人妻少妇偷人无码视频| 免费观看日本污污ww网站69| 亚洲国产精品成人一区二区在线| 好深好湿好硬顶到了好爽| 欧美丰满熟妇性xxxx| 欧美特级午夜一区二区三区| 伊人久久大香线蕉网av| 我要看亚洲黄色太黄一级黄| 成人国产精品日本在线观看| 亚洲色av天天天天天天| 无套内谢少妇一二三四| 国产在线视频www色| 成人av专区精品无码国产| 午夜射精日本三级| 欧美性受xxxx黑人猛交| 国产成人卡2卡3卡4乱码| 国产在线精品一区二区三区不卡| 国产仑乱无码内谢| 国内精品伊人久久久久777| 九九热精品免费在线视频| 五月婷久久麻豆国产| 中文字幕无线码免费人妻| 东京热大乱系列无码| 在线观看国产一区亚洲bd| 91久久国产成人免费观看| 日韩成av在线免费观看|