位置:首页 > 九章学会Hive - Hive的调优及方法 >

九章学会Hive - Hive的调优及方法

作者:小牛君|发布时间:2017-06-16

小牛学堂的课程大纲最近进行了再一次升级,并且同时推出Java大数据平台开发班、Python爬虫与数据挖掘班、Spark项目班、Spark大神班、机器学习算法实战班、BI数据分析实战班, 目前这类人群凤毛麟角,导致这个行业的平均薪资极高,为此小牛学堂集合了行业的诸多大牛开设对应班级,为想学习的同学提供机会!
如果想了解详细情况,请联系 今日值班讲师 或者直接加入千人QQ群进行咨询:210992946

以下是本文正文:


1.  hive调优

explain

explain select sum(salary) from t_salary;

首先,查询计划会打印出抽象的语法书。

它表明 Hive 是如何将查询解析成 token(符号) literal(字面值)的。

STAGE PLANS:

    Stage: Stage-1

      Map Reduce

        Map Operator Tree:

            TableScan

            alias: t_salary

            Statistics: Num rows: 32 Data   size: 129 Basic stats: COMPLETE Column stats: NONE

            Select Operator

              expressions: salary (type:   float)

              outputColumnNames: salary

              Statistics: Num rows: 32 Data   size: 129 Basic stats: COMPLETE Column stats: NONE

              Group By Operator

                aggregations: sum(salary)

                mode: hash

                outputColumnNames: _col0

                Statistics: Num rows: 1 Data   size: 8 Basic stats: COMPLETE Column stats: NONE

                Reduce Output Operator

                  sort order:

                  Statistics: Num rows: 1   Data size: 8 Basic stats: COMPLETE Column stats: NONE

                  value expressions: _col0   (type: double)

        Reduce Operator Tree:

          Group By Operator

            aggregations: sum(VALUE._col0)

            mode: mergepartial

            outputColumnNames: _col0

            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column   stats: NONE

            File Output Operator

            compressed: false

            Statistics: Num rows: 1 Data   size: 8 Basic stats: COMPLETE Column stats: NONE

            table:

                input format:   org.apache.hadoop.mapred.TextInputFormat

                output format:   org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

                serde:   org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

 

    Stage: Stage-0

      Fetch Operator

        limit: -1

        Processor Tree:

          ListSink

 

explain extended select sum(salary) from t_salary;

Explain

ABSTRACT SYNTAX TREE:

 

TOK_QUERY

     TOK_FROM

        TOK_TABREF

           TOK_TABNAME

            t_salary

     TOK_INSERT

        TOK_DESTINATION

           TOK_DIR

            TOK_TMP_FILE

        TOK_SELECT

           TOK_SELEXPR

            TOK_FUNCTION

               sum

               TOK_TABLE_OR_COL

                  salary

 

 

STAGE DEPENDENCIES:

    Stage-1 is a root stage

    Stage-0 depends on stages: Stage-1

 

STAGE PLANS:

    Stage: Stage-1

      Map Reduce

        Map Operator Tree:

            TableScan

            alias: t_salary

            Statistics: Num rows: 32 Data   size: 129 Basic stats: COMPLETE Column stats: NONE

            GatherStats: false

            Select Operator

              expressions: salary (type:   float)

              outputColumnNames: salary

              Statistics: Num rows: 32 Data   size: 129 Basic stats: COMPLETE Column stats: NONE

              Group By Operator

                aggregations: sum(salary)

                mode: hash

                outputColumnNames: _col0

                Statistics: Num rows: 1 Data   size: 8 Basic stats: COMPLETE Column stats: NONE

                Reduce Output Operator

                  sort order:

                  Statistics: Num rows: 1   Data size: 8 Basic stats: COMPLETE Column stats: NONE

                  tag: -1

                  value expressions: _col0   (type: double)

                  auto parallelism: false

        Path -> Alias:

          hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary [t_salary]

        Path -> Partition:

          hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary

            Partition

            base file name: t_salary

            input format:   org.apache.hadoop.mapred.TextInputFormat

            output format:   org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

            properties:

              COLUMN_STATS_ACCURATE true

              bucket_count -1

              columns id,empid,salary

              columns.comments   '??ID','??ID','????'

              columns.types int:int:float

                comment ?????

              field.delim

              file.inputformat   org.apache.hadoop.mapred.TextInputFormat

              file.outputformat   org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

              location   hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary

              name test.t_salary

              numFiles 1

              numRows 0

              rawDataSize 0

              serialization.ddl struct   t_salary { i32 id, i32 empid, float salary}

              serialization.format

              serialization.lib   org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

              totalSize 129

              transient_lastDdlTime   1490706218

            serde:   org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

           

              input format:   org.apache.hadoop.mapred.TextInputFormat

              output format:   org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

              properties:

                COLUMN_STATS_ACCURATE true

                bucket_count -1

                columns id,empid,salary

                columns.comments   '??ID','??ID','????'

                columns.types int:int:float

                comment ?????

                field.delim

                file.inputformat   org.apache.hadoop.mapred.TextInputFormat

                file.outputformat   org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

                location   hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary

                name test.t_salary

                numFiles 1

                numRows 0

                rawDataSize 0

                serialization.ddl struct   t_salary { i32 id, i32 empid, float salary}

                serialization.format

                serialization.lib   org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

                totalSize 129

                transient_lastDdlTime   1490706218

              serde:   org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

              name: test.t_salary

            name: test.t_salary

        Truncated Path -> Alias:

          /test.db/t_salary [t_salary]

        Needs Tagging: false

        Reduce Operator Tree:

          Group By Operator

            aggregations: sum(VALUE._col0)

            mode: mergepartial

            outputColumnNames: _col0

            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column   stats: NONE

            File Output Operator

            compressed: false

            GlobalTableId: 0

            directory: hdfs://mini1:9000/tmp/hive/root/a3799134-d5d0-49fc-a840-9a0c4dbd90c8

/hive_2017-04-03_17-29-49_558_6616053246494828033-1/-mr-10000

/.hive-staging_hive_2017-04-03_17-29-49_558_6616053246494828033-1/-ext-10001

            NumFilesPerFileSink: 1

            Statistics: Num rows: 1 Data   size: 8 Basic stats: COMPLETE Column stats: NONE

            Stats Publishing Key Prefix:   hdfs://mini1:9000/tmp/hive/root/a3799134-d5d0-49fc-a840-9a0c4dbd90c8

/hive_2017-04-03_17-29-49_558_6616053246494828033-1/-mr-10000

/.hive-staging_hive_2017-04-03_17-29-49_558_6616053246494828033-1/-ext-10001/

            table:

                input format:   org.apache.hadoop.mapred.TextInputFormat

                output format:   org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

                properties:

                  columns _col0

                  columns.types double

                  escape.delim \

                    hive.serialization.extend.additional.nesting.levels true

                  serialization.format 1

                  serialization.lib   org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

                serde:   org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

            TotalFiles: 1

            GatherStats: false

            MultiFileSpray: false

 

  Stage:   Stage-0

      Fetch Operator

        limit: -1

        Processor Tree:

          ListSink

 

 

 

Map 端聚合

Map 端部分聚合,相当于 Combiner

set hive.map.aggr=true;

 

hive.groupby.mapaggr.checkinterval:在Map端进行聚合操作的条目数目

 

hive.groupby.skewindata=true;

数据倾斜时负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job

第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce做部分聚合操作,并输出结果

这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;

第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce

(这个过程可以保证相同的 Group By Key被分布到同一个 Reduce 中),最后完成最终的聚合操作。

 

开启压缩

任务中间压缩

set hive.exec.compress.intermediate=true;

set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;(常用)

set hive.intermediate.compression.type=BLOCK;

<property>

    <name>hive.exec.compress.intermediate</name>

    <value>true</value>

</property>

<property>

    <name>hive.intermediate.compression.codec</name>

    <value>org.apache.hadoop.io.compress.GzipCodec<value/>

</property>

<property>

    <name>hive.intermediate.compression.type</name>

    <value>BLOCK<value/>

</property>

最终结果压缩(compress.type--针对SequenceFile而言)

set hive.exec.compress.output=true;

<property>

    <name>hive.exec.compress.output</name>

    <value>false</value>

</property>

api

set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

set mapred.output.compression.type=BLOCK;

hadoopapi

set mapreduce.output.fileoutputformat.compress=true;

set mapreduce.output.fileoutputformat.compress.type=BLOCK;

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;

必须设置如下参数,决定压缩是否生效

set hive.exec.compress.output=true;

 

操作例子:

set hive.exec.compress.output=true;

set   mapreduce.output.fileoutputformat.compress.type=BLOCK;

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;

 

create table seqt

stored as sequencefile

as select * from access;

 

insert overwrite table seqt select * from   access;

 

dfs -ls /user/hive/warehouse/seqt ;

dfs -cat /user/hive/warehouse/seqt/000000_0;

dfs -text   /user/hive/warehouse/codectest/000000_0.gz;

 

 

create table t1

   stored as orc

   as

   select * from t3;

 

 

 

 

开启jvm重用

set mapreduce.job.jvm.numtasks=10;

<property>

  <name>mapreduce.job.jvm.numtasks</name>

  <value>1</value>

  <description>How many tasks to run per jvm. If set to -1, there is no limit. </description>

</property>

 

开启并行执行hive job

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=8

 

关闭推测执行

set mapreduce.map.speculati=false;

set mapreduce.reduce.speculati=false;

 

mapreduce.map.speculative:如果为trueMap Task可以推测执行,即一个Map Task可以启动Speculative Task运行并行执行,该Speculative Task与原始Task同时处理同一份数据,谁先处理完,则将谁的结果作为最终结果。默认为true

mapreduce.reduce.speculative:同上,默认值为true

mapreduce.job.speculative.speculative-cap-running-tasks:能够推测重跑正在运行任务(单个JOB)的百分之几,默认是:0.1

mapreduce.job.speculative.speculative-cap-total-tasks:能够推测重跑全部任务(单个JOB)的百分之几,默认是:0.01

mapreduce.job.speculative.minimum-allowed-tasks:可以推测重新执行允许的最小任务数。默认是:10

首先,mapreduce.job.speculative.minimum-allowed-tasksmapreduce.job.speculative.speculative-cap-total-tasks * 总任务数,取最大值。

然后,拿到上一步的值和mapreduce.job.speculative.speculative-cap-running-tasks * 正在运行的任务数,取最大值,该值就是猜测执行的运行的任务数

mapreduce.job.speculative.retry-after-no-speculate:等待时间(毫秒)做下一轮的猜测,如果没有任务,推测在这一轮。默认:1000ms

mapreduce.job.speculative.retry-after-speculate:等待时间(毫秒)做下一轮的猜测,如果有任务推测在这一轮。默认:15000ms

mapreduce.job.speculative.slowtaskthreshold:标准差,任务的平均进展率必须低于所有正在运行任务的平均值才会被认为是太慢的任务,默认值:1.0

 

 

 

当多个GROUP BY语句有相同的分组列,则会优化为一个MR任务

set hive.multigroupby.singlemar=true;

 

 

虚拟列

INPUT__FILE__NAME map任务读入File的全路径

BLOCK__OFFSET__INSIDE__FILE

  RCFileSequenceFile显示Block file Offset(当前块在文件中的偏移量)

  TextFile,显示当前行在文件中的偏移量

ROW__OFFSET__INSIDE__BLOCK

  RCFileSequenceFile显示row number

  textfile显示为0

注:若要显示 ROW__OFFSET__INSIDE__BLOCK ,必须进行如下设置

set hive.exec.rowoffset=true;

 

例:

select INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE,ROW__OFFSET__INSIDE__BLOCK,salary from t_salary;

显示如下结果:

input__file__name       block__offset__inside__file     row__offset__inside__block      salary

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       0       0       5500.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       11      0       4500.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       22      0       1900.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       33      0       4800.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       44      0       6500.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       55      0       14500.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       67      0       44500.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       79      0       5000.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       91      0       2900.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       103     0       6500.0

hdfs://mini1:9000/user/hive/warehouse/test.db/t_salary/salary.txt       116     0       6000.0

 

 

本地模式

set hive.exec.mode.local.auto=true;

 

1.job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)

2.jobmap数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)

3.jobreduce数必须为0或者1

hive.exec.mode.local.auto.inputbytes.max=134217728

hive.exec.mode.local.auto.tasks.max=4

hive.exec.mode.local.auto=true;

hive.mapred.local.mem:本地模式启动的JVM内存大小

 

动态分区

开启动态分区支持

set hive.exec.dynamic.partition=true;

动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。

set hive.exec.dynamic.partition.mode=nonstrict;

允许所有的分区列都是动态分区列

set hive.exec.dynamic.partition.mode=nostrict;

在每个执行MR的节点上,最大可以创建多少个动态分区,如果超过了这个数量就会报错

hive.exec.max.dynamic.partitions.pernode (缺省值100):

在所有执行MR的节点上,最大一共可以创建多少个动态分区

hive.exec.max.dynamic.partitions (缺省值1000):

整个MR Job中,最大可以创建多少个HDFS文件

hive.exec.max.created.files (缺省值100000):

 

DATANODEdfs.datanode.max.xceivers=8192:允许DATANODE打开多少个文件

 

关系型数据库(如Oracle)中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置。

先看一个应用场景,源表t_lxw1234的数据如下:

1.     

2.  SELECT day,url FROM t_lxw1234;

3.  2015-05-10      url1

4.  2015-05-10      url2

5.  2015-06-14      url1

6.  2015-06-14      url2

7.  2015-06-15      url1

8.  2015-06-15      url2

9.  ……

 目标表为:

1.     

2.  CREATE TABLE t_lxw1234_partitioned (

3.  url STRING

4.  ) PARTITIONED BY (month STRING,day STRING)

5.  stored AS textfile;

6.   

需求t_lxw1234中的数据按照时间(day),插入到目标表t_lxw1234_partitioned的相应分区中。

如果按照之前介绍的往指定一个分区中Insert数据,那么这个需求很不容易实现。

这时候就需要使用动态分区来实现,使用动态分区需要注意设定以下参数:

·    hive.exec.dynamic.partition

默认值:false

是否开启动态分区功能,默认false关闭。

使用动态分区时候,该参数必须设置成true;

·    hive.exec.dynamic.partition.mode

默认值:strict

动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。

一般需要设置为nonstrict

·    hive.exec.max.dynamic.partitions.pernode

默认值:100

在每个执行MR的节点上,最大可以创建多少个动态分区。

该参数需要根据实际的数据来设定。

比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。

·    hive.exec.max.dynamic.partitions

默认值:1000

在所有执行MR的节点上,最大一共可以创建多少个动态分区。

同上参数解释。

·    hive.exec.max.created.files

默认值:100000

整个MR Job中,最大可以创建多少个HDFS文件。

一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于100000,可根据实际情况加以调整。

·    hive.error.on.empty.partition

默认值:false

当有空分区生成时,是否抛出异常。

一般不需要设置。

那么,上面的需求可以使用如下的语句来完成:

1.     

2.  SET hive.exec.dynamic.partition=true; 

3.  SET hive.exec.dynamic.partition.mode=nonstrict;

4.  SET hive.exec.max.dynamic.partitions.pernode = 1000;

5.  SET hive.exec.max.dynamic.partitions=1000;

6.   

7.  INSERT overwrite TABLE t_lxw1234_partitioned PARTITION (month,day)

8.  SELECT url,substr(day,1,7) AS month,day

9.  FROM t_lxw1234;

10.          

注意:在PARTITION (month,day)中指定分区字段名即可;

在SELECT子句的最后两个字段,必须对应前面PARTITION (month,day)中指定的分区字段,包括顺序。

执行结果如下:

Loading data to table liuxiaowen.t_lxw1234_partitioned partition (month=null, day=null)

Loading partition {month=2015-05, day=2015-05-10}

Loading partition {month=2015-06, day=2015-06-14}

Loading partition {month=2015-06, day=2015-06-15}

Partition liuxiaowen.t_lxw1234_partitioned{month=2015-05, day=2015-05-10} stats: [numFiles=1, numRows=2, totalSize=10, rawDataSize=8]

Partition liuxiaowen.t_lxw1234_partitioned{month=2015-06, day=2015-06-14} stats: [numFiles=1, numRows=2, totalSize=10, rawDataSize=8]

Partition liuxiaowen.t_lxw1234_partitioned{month=2015-06, day=2015-06-15} stats: [numFiles=1, numRows=2, totalSize=10, rawDataSize=8]

查看目标表有哪些分区:

hive> show partitions t_lxw1234_partitioned;

OK

month=2015-05/day=2015-05-10

month=2015-06/day=2015-06-14

month=2015-06/day=2015-06-15

 

合并小文件

hive.merg.mapfiles=true:合并map输出

hive.merge.mapredfiles=false:合并reduce输出

hive.merge.size.per.task=256*1000*1000:合并文件的大小

hive.mergejob.maponly=true:如果支持CombineHiveInputFormat则生成只有Map的任务执行merge

hive.merge.smallfiles.avgsize=16000000:文件的平均大小小于该值时,会启动一个MR任务执行merge

 

 

控制 Reduce Task  的数量

参数  mapred.reduce.tasks(默认为-1

如果是负数,那么推测 reducer 任务数量。

参数1  hive.exec.reducers.bytes.per.reducer

决定每个 reducer task可以处理的最大数据量,默认是 256MB

参数2  hive.exec.reducers.max

决定了最大reducers任务数量,默认是1009.

 

reducer 数量计算公式

N=min(reducers.max,总输入数据量


了解更多详情请联系 今日值班讲师 或者直接加入千人QQ群进行咨询:210992946

分享到: