<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Spark入門實戰系列--6.SparkSQL(中)--深入了解SparkSQL運行計劃及調優

      【注】該系列文章以及使用到安裝包/測試數據 可以在《傾情大奉送--Spark入門實戰系列》獲取

      1.1  運行環境說明

      1.1.1 硬軟件環境

      l  主機操作系統:Windows 64位,雙核4線程,主頻2.2G10G內存

      l  虛擬軟件:VMware? Workstation 9.0.0 build-812388

      l  虛擬機操作系統:CentOS6.5 64位,單核

      l  虛擬機運行環境:

      ?  JDK1.7.0_55 64

      ?  Hadoop2.2.0(需要編譯為64位)

      ?  Scala2.10.4

      ?  Spark1.1.0(需要編譯)

      ?  Hive0.13.1(源代碼編譯,參見1.2

      1.1.2 集群網絡環境

      本次實驗環境只需要hadoop1一臺機器即可,網絡環境配置如下:

      序號

      IP地址

      機器名

      類型

      用戶名

      目錄

      1

      192.168.0.61

      hadoop1

      NN/DN

      hadoop

      /app 程序所在路徑

      /app/scala-...

      /app/hadoop

      /app/complied

      1.2 編譯Hive

      1.2.1 下載Hive源代碼包

      這里選擇下載的版本為hive-0.13.1,這個版本需要到apache的歸檔服務器下載,下載地址:http://archive.apache.org/dist/hive/hive-0.13.1/,選擇apache-hive-0.13.1-src.tar.gz文件進行下載:

      clip_image002

      1.2.2 上傳Hive源代碼包

      把下載的hive-0.13.0.tar.gz安裝包,使用SSH Secure File Transfer工具(參見第2課《Spark編譯與部署(上)--基礎環境搭建》1.3.1介紹)上傳到/home/hadoop/upload 目錄下。

      1.2.3 解壓縮并移動到編譯目錄

      到上傳目錄下,用如下命令解壓縮hive安裝文件:

      $cd /home/hadoop/upload

      $tar -zxf apache-hive-0.13.1-src.tar.gz

      改名并移動到/app/complied目錄下:

      $sudo mv apache-hive-0.13.1-src /app/complied/hive-0.13.1-src

      $ll /app/complied

      1.2.4 編譯Hive

      編譯Hive源代碼的時候,需要從網上下載依賴包,所以整個編譯過程機器必須保證在聯網狀態。編譯執行如下腳本:

      $cd /app/complied/hive-0.13.1-src/

      $export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

      $mvn -Phadoop-2,dist -Dmaven.test.skip=true clean package

      clip_image004

      在編譯過程中可能出現速度慢或者中斷,可以再次啟動編譯,編譯程序會在上次的編譯中斷處繼續進行編譯,整個編譯過程耗時與網速緊密相關,網速較快的情況需要1個小時左右(上圖的時間是多次編譯后最后成功的界面)。最終編譯的結果為$HIVE_HOME/packaging/target/apache-hive-0.13.1-bin.tar.gz

      通過如下命令查看最終編譯完成整個目錄大小,可以看到大小為353.6M左右

      $du -s /app/complied/hive-0.13.1-src

      clip_image006

      【注】已經編譯好的Hive包在本系列配套資源/install/6.hive-0.13.1-src.tar.gz,讀者直接使用

      1.3 首次運行hive-console

      1.3.1 獲取Spark源代碼

      由于首次運行hive-console需要在Spark源代碼進行編譯,關于Spark源代碼的獲取可以參考第二課《Spark編譯與部署(下)--Spark編譯安裝》方式進行獲取,連接地址為 http://spark.apache.org/downloads.html,獲取源代碼后把Spark源代碼移動到/app/complied目錄,并命名為spark-1.1.0-hive

      1.3.2 配置/etc/profile環境變量

      第一步   使用如下命令打開/etc/profile文件:

      $sudo vi /etc/profile

      第二步   設置如下參數:

      export HADOOP_HOME=/app/hadoop/hadoop-2.2.0

      export HIVE_HOME=/app/complied/hive-0.13.1-src

      export HIVE_DEV_HOME=/app/complied/hive-0.13.1-src

      clip_image008

      第三步   生效配置并驗證

      $sudo vi /etc/profile

      $echo $HIVE_DEV_HOME

      1.3.3 運行sbt進行編譯

      運行hive/console不需要啟動Spark,需要進入到Spark根目錄下使用sbt/sbt hive/console進行首次運行編譯,編譯以后下次可以直接啟動。編譯Spark源代碼的時候,需要從網上下載依賴包,所以整個編譯過程機器必須保證在聯網狀態。編譯命令如下:

      $cd /app/complied/spark-1.1.0-hive

      $sbt/sbt hive/console

      clip_image010

      編譯時間會很長,在編譯過程中可能出現速度慢或者中斷,可以再次啟動編譯,編譯程序會在上次的編譯中斷處繼續進行編譯,整個編譯過程耗時與網速緊密相關。

      clip_image012

      通過如下命令查看最終編譯完成整個目錄大小,可以看到大小為267.9M左右

      $du -s /app/complied/spark-1.1.0-hive

      clip_image014

      【注】已經編譯好的Spark for hive-console包在本系列配套資源/install/6.spark-1.1.0-hive.tar.gz,可直接使用

      1.4 使用hive-console

      1.4.1 啟動hive-console

      進入到spark根目錄下,使用如下命令啟動hive-console

      $cd /app/complied/spark-1.1.0-hive

      $sbt/sbt hive/console

      clip_image016

      1.4.2 輔助命令HelpTab

      可以使用:help查看幫助內容

      scala>:help

      clip_image018

      可以使用tab鍵查看所有可使用命令、函數

      clip_image020

      1.4.3 常用操作

      首先定義Person類,在該類中定義nameagestate三個列,然后把該類注冊為people表并裝載數據,最后通過查詢到數據存放到query

      scala>case class Person(name:String, age:Int, state:String)

      scala>sparkContext.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).registerTempTable("people")

      clip_image022

      scala>val query= sql("select * from people")

      clip_image024

      1.4.3.1 查看查詢的schema

      scala>query.printSchema

      scala>query.collect()

      clip_image026

      1.4.3.2 查看查詢的整個運行計劃

      scala>query.queryExecution

      clip_image028

      1.4.4 查看查詢的Unresolved LogicalPlan

      scala>query.queryExecution.logical

      clip_image030

      1.4.4.1 查看查詢的Analyzed LogicalPlan

      scala>query.queryExecution.analyzed

      clip_image032

      1.4.4.2 查看優化后的LogicalPlan

      scala>query.queryExecution.optimizedPlan

      clip_image034

      1.4.4.3 查看物理計劃

      scala>query.queryExecution.sparkPlan

      clip_image036

      1.4.4.4 查看RDD的轉換過程

      scala>query.toDebugString

      clip_image038

      1.4.5 不同數據源的運行計劃

      上面常用操作里介紹了源自RDD的數據, SparkSQL也可以源自多個數據源:jsonFileparquetFileHive等。

      1.4.5.1 讀取Json格式數據

      第一步   Json測試數據

      Json文件支持嵌套表,SparkSQL也可以讀入嵌套表,如下面形式的Json數據,可以使用jsonFile讀入SparkSQL。該文件可以在配套資源/data/class6中找到,在以下測試中把文件放到 /home/hadoop/upload/class6 路徑中

      { 

         "fullname": "Sean Kelly",    

         "org": "SK Consulting",    

         "emailaddrs": [    

            {"type": "work", "value": "kelly@seankelly.biz"},    

            {"type": "home", "pref": 1, "value": "kelly@seankelly.tv"}    

         ],    

          "telephones": [    

            {"type": "work", "pref": 1, "value": "+1 214 555 1212"},    

            {"type": "fax", "value": "+1 214 555 1213"},    

            {"type": "mobile", "value": "+1 214 555 1214"}    

         ],    

         "addresses": [    

            {"type": "work", "format": "us",    

             "value": "1234 Main StnSpringfield, TX 78080-1216"},    

            {"type": "home", "format": "us",    

             "value": "5678 Main StnSpringfield, TX 78080-1316"}    

         ],    

          "urls": [    

            {"type": "work", "value": "http://seankelly.biz/"},    

            {"type": "home", "value": "http://seankelly.tv/"}    

         ]    

      }

      第二步   讀入Json數據

      使用jsonFile讀入數據并注冊成表jsonPerson,然后定義一個查詢jsonQuery

      scala>jsonFile("/home/hadoop/upload/class6/nestjson.json").registerTempTable("jsonPerson")

      scala>val jsonQuery = sql("select * from jsonPerson")

      clip_image040

      第三步   查看jsonQueryschema

      scala>jsonQuery.printSchema

      clip_image042

      第四步   查看jsonQuery的整個運行計劃

      scala>jsonQuery.queryExecution

      clip_image044

      1.4.5.2 讀取Parquet格式數據

      Parquet數據放在配套資源/data/class6/wiki_parquet中,在以下測試中把文件放到 /home/hadoop/upload/class6 路徑下

      第一步   讀入Parquet數據

      parquet文件讀入并注冊成表parquetWiki,然后定義一個查詢parquetQuery

      scala>parquetFile("/home/hadoop/upload/class6/wiki_parquet").registerTempTable("parquetWiki")

      scala>val parquetQuery = sql("select * from parquetWiki")

      clip_image046

      有報錯但不影響使用

      clip_image048

      第二步   查詢parquetQueryschema

      scala>parquetQuery.printSchema

      clip_image050

      第三步   查詢parquetQuery的整個運行計劃

      scala>parquetQuery.queryExecution

      clip_image052

      第四步   查詢取樣

      scala>parquetQuery.takeSample(false,10,2)

      clip_image054

      clip_image056

      1.4.5.3 讀取hive內置測試數據

      TestHive類中已經定義了大量的hive0.13的測試數據的表格式,如srcsales等等,在hive-console中可以直接使用;第一次使用的時候,hive-console會裝載一次。下面我們使用sales表看看其schema和整個運行計劃。

      第一步   讀入測試數據并定義一個查詢hiveQuery

      scala>val hiveQuery = sql("select * from sales")

      clip_image058

      第二步   查看hiveQueryschema

      scala>hiveQuery.printSchema

      clip_image060 

      第三步   查看hiveQuery的整個運行計劃

      scala>hiveQuery.queryExecution

      clip_image062

      第四步   其他SQL語句的運行計劃

      scala>val hiveQuery = sql("select * from (select * from src limit 5) a limit 3")

      clip_image064

      scala>val hiveQuery = sql("select * FROM (select * FROM src) a")

      clip_image066

      scala>hiveQuery.where('key === 100).queryExecution.toRdd.collect

      clip_image068

      1.4.6 不同查詢的運行計劃

      1.4.6.1 聚合查詢

      scala>sql("select name, age,state as location from people").queryExecution

      clip_image070

      scala>sql("select name from (select name,state as location from people) a where location='CA'").queryExecution

      clip_image072

      scala>sql("select sum(age) from people").queryExecution

      scala>sql("select sum(age) from people").toDebugString

      clip_image074

      clip_image076

      scala>sql("select state,avg(age) from people group by state").queryExecution

      scala>sql("select state,avg(age) from people group by state").toDebugString

      clip_image078

      clip_image080

       

      1.4.6.2 Join操作

      scala>sql("select a.name,b.name from people a join people b where a.name=b.name").queryExecution

      scala>sql("select a.name,b.name from people a join people b where a.name=b.name").toDebugString

      clip_image082

      clip_image084

      1.4.6.3 Distinct操作

      scala>sql("select distinct a.name,b.name from people a join people b where a.name=b.name").queryExecution

      scala>sql("select distinct a.name,b.name from people a join people b where a.name=b.name").toDebugString

      clip_image086

      clip_image088

      1.4.7 優化

      1.4.7.1 CombineFilters

      CombineFilters就是合并Filter,在含有多個Filter時發生,如下查詢:

      sql("select name from (select * from people where age >=19) a where a.age <30").queryExecution

      clip_image090

      上面的查詢,在Optimized的過程中,將age>=19age<30這兩個Filter合并了,合并成((age>=19) && (age<30))。其實上面還做了一個其他的優化,就是project的下推,子查詢使用了表的所有列,而主查詢使用了列name,在查詢數據的時候子查詢優化成只查列name

      1.4.7.2 PushPredicateThroughProject

      PushPredicateThroughProject就是project下推,和上面例子中的project一樣

      sql("select name from (select name,state as location from people) a where location='CA'").queryExecution

      clip_image092

      1.4.7.3 ConstantFolding

      ConstantFolding是常量疊加,用于表達式。如下面的例子:

      sql("select name,1+2 from people").queryExecution

      clip_image094

      2SparkSQL調優

      Spark是一個快速的內存計算框架,同時是一個并行運算的框架,在計算性能調優的時候,除了要考慮廣為人知的木桶原理外,還要考慮平行運算的Amdahl定理。

      木桶原理又稱短板理論,其核心思想是:一只木桶盛水的多少,并不取決于桶壁上最高的那塊木塊,而是取決于桶壁上最短的那塊。將這個理論應用到系統性能優化上,系統的最終性能取決于系統中性能表現最差的組件。例如,即使系統擁有充足的內存資源和CPU資源,但是如果磁盤I/O性能低下,那么系統的總體性能是取決于當前最慢的磁盤I/O速度,而不是當前最優越的CPU或者內存。在這種情況下,如果需要進一步提升系統性能,優化內存或者CPU資源是毫無用處的。只有提高磁盤I/O性能才能對系統的整體性能進行優化。

      clip_image096

      Amdahl定理,一個計算機科學界的經驗法則,因吉恩·阿姆達爾而得名。它代表了處理器平行運算之后效率提升的能力。并行計算中的加速比是用并行前的執行速度和并行后的執行速度之比來表示的,它表示了在并行化之后的效率提升情況。阿姆達爾定律是固定負載(計算總量不變時)時的量化標準。可用公式:clip_image098來表示。式中clip_image100分別表示問題規模的串行分量(問題中不能并行化的那一部分)和并行分量,p表示處理器數量。當clip_image102時,上式的極限是clip_image104,其中clip_image106。這意味著無論我們如何增大處理器數目,加速比是無法高于這個數的。

            SparkSQL作為Spark的一個組件,在調優的時候,也要充分考慮到上面的兩個原理,既要考慮如何充分的利用硬件資源,又要考慮如何利用好分布式系統的并行計算。由于測試環境條件有限,本篇不能做出更詳盡的實驗數據來說明,只能在理論上加以說明。

      2.1 并行性

      SparkSQL在集群中運行,將一個查詢任務分解成大量的Task分配給集群中的各個節點來運行。通常情況下,Task的數量是大于集群的并行度。比如前面第六章和第七章查詢數據時,shuffle的時候使用了缺省的spark.sql.shuffle.partitions,即200partition,也就是200Task

      clip_image108

      而實驗的集群環境卻只能并行3Task,也就是說同時只能有3Task保持Running

      clip_image110

      這時大家就應該明白了,要跑完這200Task就要跑200/3=67批次。如何減少運行的批次呢?那就要盡量提高查詢任務的并行度。查詢任務的并行度由兩方面決定:集群的處理能力和集群的有效處理能力。

      l對于Spark Standalone集群來說,集群的處理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES參數、SPARK_WORKER_CORES參數決定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超過物理機器的實際CPU core

      l集群的有效處理能力是指集群中空閑的集群資源,一般是指使用spark-submitspark-shell時指定的--total-executor-cores,一般情況下,我們不需要指定,這時候,Spark Standalone集群會將所有空閑的core分配給查詢,并且在Task輪詢運行過程中,Standalone集群會將其他spark應用程序運行完后空閑出來的core也分配給正在運行中的查詢。

      綜上所述,SparkSQL的查詢并行度主要和集群的core數量相關,合理配置每個節點的core可以提高集群的并行度,提高查詢的效率。

      2.2 高效的數據格式

      高效的數據格式,一方面是加快了數據的讀入速度,另一方面可以減少內存的消耗。高效的數據格式包括多個方面:

      2.2.1 數據本地性

      分布式計算系統的精粹在于移動計算而非移動數據,但是在實際的計算過程中,總存在著移動數據的情況,除非是在集群的所有節點上都保存數據的副本。移動數據,將數據從一個節點移動到另一個節點進行計算,不但消耗了網絡IO,也消耗了磁盤IO,降低了整個計算的效率。為了提高數據的本地性,除了優化算法(也就是修改spark內存,難度有點高),就是合理設置數據的副本。設置數據的副本,這需要通過配置參數并長期觀察運行狀態才能獲取的一個經驗值。

      下面是Spark webUI監控Stage的一個圖:

      lPROCESS_LOCAL是指讀取緩存在本地節點的數據

      lNODE_LOCAL是指讀取本地節點硬盤數據

      lANY是指讀取非本地節點數據

      l通常讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,盡量使數據以PROCESS_LOCALNODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關。

      clip_image112

       

      2.2.2 合適的數據類型

      對于要查詢的數據,定義合適的數據類型也是非常有必要。對于一個tinyint可以使用的數據列,不需要為了方便定義成int類型,一個tinyint的數據占用了1byte,而int占用了4byte。也就是說,一旦將這數據進行緩存的話,內存的消耗將增加數倍。在SparkSQL里,定義合適的數據類型可以節省有限的內存資源。

      2.2.3 合適的數據列

      對于要查詢的數據,在寫SQL語句的時候,盡量寫出要查詢的列名,如Select a,b from tbl,而不是使用Select * from tbl;這樣不但可以減少磁盤IO,也減少緩存時消耗的內存。

      2.2.4 優的數據存儲格式

      在查詢的時候,最終還是要讀取存儲在文件系統中的文件。采用更優的數據存儲格式,將有利于數據的讀取速度。查看SparkSQLStage,可以發現,很多時候,數據讀取消耗占有很大的比重。對于sqlContext來說,支持 textFiileSequenceFileParquetFilejsonFile;對于hiveContext來說,支持AvroFileORCFileParquet File,以及各種壓縮。根據自己的業務需求,測試并選擇合適的數據存儲格式將有利于提高SparkSQL的查詢效率。

      2.3 內存的使用

      spark應用程序最糾結的地方就是內存的使用了,也是最能體現“細節是魔鬼”的地方。Spark的內存配置項有不少,其中比較重要的幾個是:

      lSPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY SPARK_WORKER_INSTANCES,可以充分的利用節點的內存資源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超過節點本身具備的內存容量;

      lexecutor-memory,在spark-shellspark-submit提交spark應用程序時申請使用的內存數量;不要超過節點的SPARK_WORKER_MEMORY

      lspark.storage.memoryFraction spark應用程序在所申請的內存資源中可用于cache的比例

      lspark.shuffle.memoryFraction spark應用程序在所申請的內存資源中可用于shuffle的比例

      在實際使用上,對于后兩個參數,可以根據常用查詢的內存消耗情況做適當的變更。另外,在SparkSQL使用上,有幾點建議:

      l對于頻繁使用的表或查詢才進行緩存,對于只使用一次的表不需要緩存;

      l對于join操作,優先緩存較小的表;

      l要多注意Stage的監控,多思考如何才能更多的Task使用PROCESS_LOCAL

      l要多注意Storage的監控,多思考如何才能Fraction cached的比例更多

      clip_image114

      2.4 合適的Task

      對于SparkSQL,還有一個比較重要的參數,就是shuffle時候的Task數量,通過spark.sql.shuffle.partitions來調節。調節的基礎是spark集群的處理能力和要處理的數據量,spark的默認值是200Task過多,會產生很多的任務啟動開銷,Task多少,每個Task的處理時間過長,容易straggle

      2.5 其他的一些建議

      優化的方面的內容很多,但大部分都是細節性的內容,下面就簡單地提提:

      l  想要獲取更好的表達式查詢速度,可以將spark.sql.codegen設置為Ture

      l  對于大數據集的計算結果,不要使用collect() ,collect()就結果返回給driver,很容易撐爆driver的內存;一般直接輸出到分布式文件系統中;

      l  對于Worker傾斜,設置spark.speculation=true 將持續不給力的節點去掉;

      l  對于數據傾斜,采用加入部分中間步驟,如聚合后cache,具體情況具體分析;

      l  適當的使用序化方案以及壓縮方案;

      l  善于利用集群監控系統,將集群的運行狀況維持在一個合理的、平穩的狀態;

      l  善于解決重點矛盾,多觀察Stage中的Task,查看最耗時的Task,查找原因并改善;

       

      posted @ 2015-08-27 06:59  shishanyuan  閱讀(18864)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 日韩精品一区二区三区日韩| 精品尤物TV福利院在线网站| 日本一区二区三区专线| 国产va免费精品观看精品| 人妻少妇偷人无码视频| 欧美三级在线播放| 无码专区 人妻系列 在线| 少妇午夜啪爽嗷嗷叫视频| 免费观看性行为视频的网站| 成人动漫在线观看| 国产情侣激情在线对白| 女性| 国产一区二区在线观看粉嫩| 久久综合伊人77777| 久久亚洲精品中文字幕| 嫩b人妻精品一区二区三区| 亚洲色成人一区二区三区| 国产老妇伦国产熟女老妇高清| 亚洲国产成人无码电影| 97人妻中文字幕总站| 一区二区三区不卡国产| 国产成人剧情AV麻豆果冻| 强伦姧人妻免费无码电影| 人人妻人人狠人人爽天天综合网| 99久久国产综合精品成人影院| 欧美乱强伦xxxx孕妇| 国产精品视频一区不卡| 九九热精品在线观看| 亚洲国产美女精品久久久| 九九热精品免费在线视频| 久久精品欧美日韩精品| 国产自拍一区二区三区在线| 中文字幕无线码免费人妻| 亚洲av成人一区在线| 国产另类ts人妖一区二区| 99久久国产综合精品女同| 国产一区二区不卡在线| 成人性能视频在线| 国产精品美女久久久久久麻豆| 精品国产女同疯狂摩擦2| 国产精品亚洲二区在线播放|