<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Spark入門實戰系列--7.Spark Streaming(下)--實時流計算Spark Streaming實戰

      【注】該系列文章以及使用到安裝包/測試數據 可以在《傾情大奉送--Spark入門實戰系列》獲取

      1實例演示

      1.1 流數據模擬器

      1.1.1 流數據說明

      在實例演示中模擬實際情況,需要源源不斷地接入流數據,為了在演示過程中更接近真實環境將定義流數據模擬器。該模擬器主要功能:通過Socket方式監聽指定的端口號,當外部程序通過該端口連接并請求數據時,模擬器將定時將指定的文件數據隨機獲取發送給外部程序。

      1.1.2 模擬器代碼

      import java.io.{PrintWriter}

      import java.net.ServerSocket

      import scala.io.Source

       

      object StreamingSimulation {

        // 定義隨機獲取整數的方法

        def index(length: Int) = {

          import java.util.Random

          val rdm = new Random

          rdm.nextInt(length)

        }

       

        def main(args: Array[String]) {

          // 調用該模擬器需要三個參數,分為為文件路徑、端口號和間隔時間(單位:毫秒)

          if (args.length != 3) {

            System.err.println("Usage: <filename> <port> <millisecond>")

            System.exit(1)

          }

       

          // 獲取指定文件總的行數

          val filename = args(0)

          val lines = Source.fromFile(filename).getLines.toList

          val filerow = lines.length

       

          // 指定監聽某端口,當外部程序請求時建立連接

          val listener = new ServerSocket(args(1).toInt)

          while (true) {

            val socket = listener.accept()

            new Thread() {

              override def run = {

                println("Got client connected from: " + socket.getInetAddress)

                val out = new PrintWriter(socket.getOutputStream(), true)

                while (true) {

                  Thread.sleep(args(2).toLong)

                  // 當該端口接受請求時,隨機獲取某行數據發送給對方

                  val content = lines(index(filerow))

                  println(content)

                  out.write(content + '\n')

                  out.flush()

                }

                socket.close()

              }

            }.start()

          }

        }

      }

      clip_image002

      1.1.3 生成打包文件

      【注】可以參見第3課《Spark編程模型(下)--IDEA搭建及實戰》進行打包

      clip_image004

      在打包配置界面中,需要在Class Path加入:/app/scala-2.10.4/lib/scala-swing.jar /app/scala-2.10.4/lib/scala-library.jar /app/scala-2.10.4/lib/scala-actors.jar ,各個jar包之間用空格分開,

      點擊菜單Build->Build Artifacts,彈出選擇動作,選擇Build或者Rebuild動作,使用如下命令復制打包文件到Spark根目錄下

      cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar

      cp LearnSpark.jar /app/hadoop/spark-1.1.0/

      ll /app/hadoop/spark-1.1.0/

      clip_image006

      1.2 實例1:讀取文件演示

      1.2.1 演示說明

      在該實例中Spark Streaming將監控某目錄中的文件,獲取在間隔時間段內變化的數據,然后通過Spark Streaming計算出改時間段內單詞統計數。

      1.2.2 演示代碼

      import org.apache.spark.SparkConf

      import org.apache.spark.streaming.{Seconds, StreamingContext}

      import org.apache.spark.streaming.StreamingContext._

       

      object FileWordCount {

        def main(args: Array[String]) {

          val sparkConf = new SparkConf().setAppName("FileWordCount").setMaster("local[2]")

       

          // 創建Streaming的上下文,包括Spark的配置和時間間隔,這里時間為間隔20

          val ssc = new StreamingContext(sparkConf, Seconds(20))

       

          // 指定監控的目錄,在這里為/home/hadoop/temp/

          val lines = ssc.textFileStream("/home/hadoop/temp/")

       

          // 對指定文件夾變化的數據進行單詞統計并且打印

          val words = lines.flatMap(_.split(" "))

          val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

          wordCounts.print()

       

             // 啟動Streaming

          ssc.start()

          ssc.awaitTermination()

        }

      }

      clip_image008

      1.2.3 運行代碼

      第一步   創建Streaming監控目錄

      創建/home/hadoop/tempSpark Streaming監控的目錄,通過在該目錄中定時添加文件內容,然后由Spark Streaming統計出單詞個數

      clip_image010

      第二步   使用如下命令啟動Spark集群

      $cd /app/hadoop/spark-1.1.0

      $sbin/start-all.sh

      第三步   IDEA中運行Streaming程序

      IDEA中運行該實例,由于該實例沒有輸入參數故不需要配置參數,在運行日志中將定時打印時間戳。如果在監控目錄中加入文件內容,將輸出時間戳的同時將輸出單詞統計個數。

      clip_image012

      1.2.4 添加文本及內容

      clip_image014

      clip_image016

      1.2.5 查看結果

      第一步   查看IDEA中運行情況

      IDEA的運行日志窗口中,可以觀察到輸出時間戳的同時將輸出單詞統計個數

      clip_image018

      第二步   通過webUI監控運行情況

      http://hadoop1:4040監控Spark Streaming運行情況,可以觀察到每20秒運行一次作業

      clip_image020

      并且與其他運行作業相比在監控菜單增加了"Streaming"項目,點擊可以看到監控內容:

      clip_image022

      1.3 實例2:網絡數據演示

      1.3.1 演示說明

      在該實例中將由4.1流數據模擬以1秒的頻度發送模擬數據,Spark Streaming通過Socket接收流數據并每20秒運行一次用來處理接收到數據,處理完畢后打印該時間段內數據出現的頻度,即在各處理段時間之間狀態并無關系。

      1.3.2 演示代碼

      import org.apache.spark.{SparkContext, SparkConf}

      import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

      import org.apache.spark.streaming.StreamingContext._

      import org.apache.spark.storage.StorageLevel

       

      object NetworkWordCount {

        def main(args: Array[String]) {

          val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")

          val sc = new SparkContext(conf)

          val ssc = new StreamingContext(sc, Seconds(20))

       

          // 通過Socket獲取數據,該處需要提供Socket的主機名和端口號,數據保存在內存和硬盤中

          val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

       

          // 對讀入的數據進行分割、計數

          val words = lines.flatMap(_.split(","))

          val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

       

          wordCounts.print()

          ssc.start()

          ssc.awaitTermination()

        }

      }

      clip_image024

      1.3.3 運行代碼

      第一步   啟動流數據模擬器

      啟動4.1打包好的流數據模擬器,在該實例中將定時發送/home/hadoop/upload/class7目錄下的people.txt數據文件(該文件可以在本系列配套資源目錄/data/class7中找到),其中people.txt數據內容如下:

      clip_image026

      模擬器Socket端口號為9999,頻度為1秒,

      $cd /app/hadoop/spark-1.1.0

      $java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

      clip_image028

      在沒有程序連接時,該程序處于阻塞狀態

      第二步   IDEA中運行Streaming程序

      IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1和端口號為9999

      clip_image030

      1.3.4 查看結果

      第一步   觀察模擬器發送情況

      IDEA中的Spark Streaming程序運行與模擬器建立連接,當模擬器檢測到外部連接時開始發送測試數據,數據是隨機的在指定的文件中獲取一行數據并發送,時間間隔為1

      clip_image032

      第二步   在監控頁面觀察執行情況

      webUI上監控作業運行情況,可以觀察到每20秒運行一次作業

      clip_image034

      第三步   IDEA運行情況

      IDEA的運行窗口中,可以觀測到的統計結果,通過分析在Spark Streaming每段時間內單詞數為20,正好是20秒內每秒發送總數。

      clip_image036

      1.4 實例3:銷售數據統計演示

      1.4.1 演示說明

      在該實例中將由4.1流數據模擬器以1秒的頻度發送模擬數據(銷售數據),Spark Streaming通過Socket接收流數據并每5秒運行一次用來處理接收到數據,處理完畢后打印該時間段內銷售數據總和,需要注意的是各處理段時間之間狀態并無關系。

      1.4.2 演示代碼

      import org.apache.log4j.{Level, Logger}

      import org.apache.spark.{SparkContext, SparkConf}

      import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

      import org.apache.spark.streaming.StreamingContext._

      import org.apache.spark.storage.StorageLevel

       

      object SaleAmount {

        def main(args: Array[String]) {

          if (args.length != 2) {

            System.err.println("Usage: SaleAmount <hostname> <port> ")

            System.exit(1)

          }

          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

       

          val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")

          val sc = new SparkContext(conf)

          val ssc = new StreamingContext(sc, Seconds(5))

       

         // 通過Socket獲取數據,該處需要提供Socket的主機名和端口號,數據保存在內存和硬盤中

          val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

          val words = lines.map(_.split(",")).filter(_.length == 6)

          val wordCounts = words.map(x=>(1, x(5).toDouble)).reduceByKey(_ + _)

       

          wordCounts.print()

          ssc.start()

          ssc.awaitTermination()

        }

      }

      clip_image038

      1.4.3 運行代碼

      第一步   啟動流數據模擬器

      啟動4.1打包好的流數據模擬器,在該實例中將定時發送第五課/home/hadoop/upload/class5/saledata目錄下的tbStockDetail.txt數據文件(參見第五課《5.Hive(下)--Hive實戰》中2.1.2數據描述,該文件可以在本系列配套資源目錄/data/class5/saledata中找到),其中表tbStockDetail字段分別為訂單號、行號、貨品、數量、金額,數據內容如下:

      clip_image040

      模擬器Socket端口號為9999,頻度為1

      $cd /app/hadoop/spark-1.1.0

      $java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class5/saledata/tbStockDetail.txt 9999 1000

      clip_image042

      IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1和端口號為9999

      clip_image044

      1.4.4 查看結果

      第一步   觀察模擬器發送情況

      IDEA中的Spark Streaming程序運行與模擬器建立連接,當模擬器檢測到外部連接時開始發送銷售數據,時間間隔為1

      clip_image046

       

      第二步   IDEA運行情況

      IDEA的運行窗口中,可以觀察到每5秒運行一次作業(兩次運行間隔為5000毫秒),運行完畢后打印該時間段內銷售數據總和。

      clip_image048

      第三步   在監控頁面觀察執行情況

      webUI上監控作業運行情況,可以觀察到每5秒運行一次作業

      clip_image050

      1.5 實例4Stateful演示

      1.5.1 演示說明

      該實例為Spark Streaming狀態操作,模擬數據由4.1流數據模擬以1秒的頻度發送,Spark Streaming通過Socket接收流數據并每5秒運行一次用來處理接收到數據,處理完畢后打印程序啟動后單詞出現的頻度,相比較前面4.3實例在該實例中各時間段之間狀態是相關的。

      1.5.2 演示代碼

      import org.apache.log4j.{Level, Logger}

      import org.apache.spark.{SparkContext, SparkConf}

      import org.apache.spark.streaming.{Seconds, StreamingContext}

      import org.apache.spark.streaming.StreamingContext._

       

      object StatefulWordCount {

        def main(args: Array[String]) {

          if (args.length != 2) {

            System.err.println("Usage: StatefulWordCount <filename> <port> ")

            System.exit(1)

          }

          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

       

          // 定義更新狀態方法,參數values為當前批次單詞頻度,state為以往批次單詞頻度

          val updateFunc = (values: Seq[Int], state: Option[Int]) => {

            val currentCount = values.foldLeft(0)(_ + _)

            val previousCount = state.getOrElse(0)

            Some(currentCount + previousCount)

          }

       

          val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")

          val sc = new SparkContext(conf)

       

          // 創建StreamingContextSpark Steaming運行時間間隔為5

          val ssc = new StreamingContext(sc, Seconds(5))

          // 定義checkpoint目錄為當前目錄

          ssc.checkpoint(".")

       

          // 獲取從Socket發送過來數據

          val lines = ssc.socketTextStream(args(0), args(1).toInt)

          val words = lines.flatMap(_.split(","))

          val wordCounts = words.map(x => (x, 1))

       

          // 使用updateStateByKey來更新狀態,統計從運行開始以來單詞總的次數

          val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)

          stateDstream.print()

          ssc.start()

          ssc.awaitTermination()

        }

      }

      clip_image052

      1.5.3 運行代碼

      第一步   啟動流數據模擬器

      啟動4.1打包好的流數據模擬器,在該實例中將定時發送/home/hadoop/upload/class7目錄下的people.txt數據文件(該文件可以在本系列配套資源目錄/data/class7中找到),其中people.txt數據內容如下:

      clip_image026[1]

      模擬器Socket端口號為9999,頻度為1

      $cd /app/hadoop/spark-1.1.0

      $java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

      clip_image028[1]

      在沒有程序連接時,該程序處于阻塞狀態,IDEA中運行Streaming程序

      IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1和端口號為9999

      clip_image054

      1.5.4 查看結果

      第一步   IDEA運行情況

      IDEA的運行窗口中,可以觀察到第一次運行統計單詞總數為1,第二次為6,第N次為5(N-1)+1,即統計單詞的總數為程序運行單詞數總和。

      clip_image056

      第二步   在監控頁面觀察執行情況

      webUI上監控作業運行情況,可以觀察到每5秒運行一次作業

      clip_image058

      第三步   查看CheckPoint情況

      在項目根目錄下可以看到checkpoint文件

      clip_image060

      1.6 實例5Window演示

      1.6.1 演示說明

      該實例為Spark Streaming窗口操作,模擬數據由4.1流數據模擬以1秒的頻度發送,Spark Streaming通過Socket接收流數據并每10秒運行一次用來處理接收到數據,處理完畢后打印程序啟動后單詞出現的頻度。相比前面的實例,Spark Streaming窗口統計是通過reduceByKeyAndWindow()方法實現的,在該方法中需要指定窗口時間長度和滑動時間間隔。

      1.6.2 演示代碼

      import org.apache.log4j.{Level, Logger}

      import org.apache.spark.{SparkContext, SparkConf}

      import org.apache.spark.storage.StorageLevel

      import org.apache.spark.streaming._

      import org.apache.spark.streaming.StreamingContext._

       

      object WindowWordCount {

        def main(args: Array[String]) {

          if (args.length != 4) {

            System.err.println("Usage: WindowWorldCount <filename> <port> <windowDuration> <slideDuration>")

            System.exit(1)

          }

          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

       

          val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")

          val sc = new SparkContext(conf)

       

           // 創建StreamingContext

          val ssc = new StreamingContext(sc, Seconds(5))

           // 定義checkpoint目錄為當前目錄

          ssc.checkpoint(".")

       

          // 通過Socket獲取數據,該處需要提供Socket的主機名和端口號,數據保存在內存和硬盤中

          val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)

          val words = lines.flatMap(_.split(","))

       

          // windows操作,第一種方式為疊加處理,第二種方式為增量處理

          val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))

      //val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow(_+_, _-_,Seconds(args(2).toInt), Seconds(args(3).toInt))

       

          wordCounts.print()

          ssc.start()

          ssc.awaitTermination()

        }

      }

      clip_image062

      1.6.3 運行代碼

      第一步   啟動流數據模擬器

      啟動4.1打包好的流數據模擬器,在該實例中將定時發送/home/hadoop/upload/class7目錄下的people.txt數據文件(該文件可以在本系列配套資源目錄/data/class7中找到),其中people.txt數據內容如下:

      clip_image026[2]

      模擬器Socket端口號為9999,頻度為1

      $cd /app/hadoop/spark-1.1.0

      $java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

      clip_image028[2]

      在沒有程序連接時,該程序處于阻塞狀態,IDEA中運行Streaming程序

      IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1、端口號為9999、時間窗口為30秒和滑動時間間隔10

      clip_image064

      1.6.4 查看結果

      第一步   IDEA運行情況

      IDEA的運行窗口中,可以觀察到第一次運行統計單詞總數為4,第二次為14,第N次為10(N-1)+4,即統計單詞的總數為程序運行單詞數總和。

      clip_image066

      第二步   在監控頁面觀察執行情況

      webUI上監控作業運行情況,可以觀察到每10秒運行一次作業

      clip_image068

       

      posted @ 2015-09-07 07:01  shishanyuan  閱讀(27666)  評論(14)    收藏  舉報
      主站蜘蛛池模板: 国产成人无码免费视频在线| 丰满大爆乳波霸奶| 美女黄网站人色视频免费国产| 玩两个丰满老熟女久久网 | 日韩激情无码免费毛片| 成人区人妻精品一区二区| 日韩人妻无码精品久久| 18禁国产一区二区三区| 另类 专区 欧美 制服| 亚洲区色欧美另类图片| 国产一区二区黄色激情片| 国内少妇偷人精品免费| 成人无码视频97免费| 在线播放国产女同闺蜜| 奇米四色7777中文字幕| 午夜免费啪视频| 特级精品毛片免费观看| 国产精品欧美福利久久| 亚洲欧美牲交| 好紧好滑好湿好爽免费视频| 香港特级三A毛片免费观看| 在线观看亚洲精品国产| 国产在线中文字幕精品| 日韩内射美女人妻一区二区三区 | 噜噜综合亚洲av中文无码| 久草热大美女黄色片免费看| 亚洲综合无码一区二区| 欧美牲交a欧美牲交aⅴ免费| 国产精品天干天干综合网| 男女做aj视频免费的网站| 被灌满精子的少妇视频| 99在线精品免费视频九九视| 久久久噜噜噜久久| 少妇人妻偷人精品视频| 亚洲一区二区三区水蜜桃 | 婺源县| 天堂а√在线地址中文在线| 国产特级毛片AAAAAA视频| 蜜臀精品视频一区二区三区| 99精品人妻少妇一区| 国产午夜精品福利免费看|