
1. reduce 个数设置
方法1.调整hive.exec.reducers.bytes.per.reducer和hive.exec.reducers.max
hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1G)
hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
计算reducer数的公式:
reduce_number=min(hive.exec.reducers.maxinput_size/hive.exec.reducers.bytes.per.reducer)
推荐使用本方法控制reduce数量,因为数据量往往会变化,如果采用固定的reduce数量可能导致资源浪费或者oom等问题。
在设置reduce个数的时候需要考虑这两个原则:使大数据量利用合适的reduce数;使单个reduce任务处理合适的数据量;
hive> set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
hive> select pt,count(1) from tmp_table where pt = '2012-07-04' group by pt;
分区大小占 9个G, 本次产生20个reduce
hive> set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
hive> set hive.exec.reducers.max=15;
hive> select pt,count(1) from tmp_table where pt = '2012-07-04' group by pt;
本次产生15个reduce
方法2:调整mapreduce.job.reduces
该参数会直接生效,hive不再根据数据量估算reduce数量。
set mapred.reduce.tasks = 30;
reduce个数并不是越多越好;启动和初始化reduce也会消耗时间和资源;
另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
只有1个reduce的情况:
只有一个reduce任务的情况,除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外,还有以下原因,这些操作都是全局的,所以hadoop不得不用一个reduce去完成;
1.没有group by的汇总
select dt,count(1) from tmp_table where dt = '2012-07-04' group by dt;
写成
select count(1) from tmp_table where dt = '2012-07-04';
2.用了Order by
使用order by的SQL会进行全局排序,最后只有一个reduce
3.有笛卡尔积
笛卡尔积也会造成只产生一个reduce的情况
2.map个数设置:
默认map个数
default_num=total_size/block_size;
期望大小
goal_num=mapred.map.tasks;
设置处理的文件大小
split_size=max(mapred.min.split.size,block_size);
split_num=total_size/split_size;
计算的map个数
compute_map_num=min(split_num,max(default_num,goal_num))
1)如果想增加map个数,则设置mapred.map.tasks为一个较大的值。
2)如果想减小map个数,则设置mapred.min.split.size为一个较大的值。有如下两种情况:
情况1:输入文件size巨大,但不是小文件增大mapred.min.split.size的值。
情况2:输入文件数量巨大,且都是小文件,就是单个文件的size小于blockSize。
这种情况通过增大mapred.min.spllt.size不可行,
需要使用CombineFileInputFormat将多个input path合并成一个(合并小文件输入)
InputSplit送给mapper处理,从而减少mapper的数量。
set mapred.max.split.size=256000000; -- 决定每个map处理的最大的文件大小,单位为B
set mapred.min.split.size.per.node=128000000; -- 节点中可以处理的最小的文件大小
set mapred.min.split.size.per.rack=128000000; -- 机架中可以处理的最小的文件大小
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
大于文件块大小256m的,按照256m来分隔,小于256m,大于128m的,按照128m来分隔,把那些小于128m的(包括小文件和分隔大文件剩下的)
3. JOIN数据倾斜
问题:JOIN的key中某一个value出现的次数过多,处理该value的Reducer需要处理太多数据,执行时间明显比其他Reducer长,从而使得整个Job执行时间延长。
解决方法:开启hive.optimize.skewjoin:
SET hive.optimize.skewjoin=true;
如果检测到有倾斜的key(出现次数超过hive.skewjoin.key,默认为100,000),就会对该倾斜key启动一个MapReduce Job,做Map Join;对其他非倾斜的key启动另一个Job做Join。
假设这个key只在表A中倾斜,在表B中不倾斜,则将表B放入内存中做Map Join,启动的Map数量由hive.skewjoin.mapjoin.map.tasks和hive.skewjoin.mapjoin.min.split共同决定。
开启hive.optimize.skewjoin后,会在运行时检测每个表中是否有倾斜(skewed)的key。如果没有倾斜的key,请勿设置这一参数,避免额外的扫描开销。
4.GROUP BY数据倾斜
问题:GROUP BY的key中某一个value出现的次数过多,处理该value的Reducer需要处理太多数据,执行时间明显比其他Reducer长,从而使得整个Job执行时间延长。
解决方法:开启hive.groupby.skewindata:
SET hive.groupby.skewindata=true;
设为true会多启动一个随机分配的Job,可减少倾斜情况下的Shuffle量。
需要注意的是,此参数设置为true后,会立即采用倾斜GROUP BY的执行策略,而不会像skew join那样先行判断是否满足倾斜条件,用户设置此参数的时候需要知道查询涉及表是否有数据倾斜,但不需要知道具体的倾斜key为多少。
如果没有数据倾斜,请保持该参数为false。
5.map join
Hive会自动判断是否满足Map Join条件,条件是Join表中有一个表小于hive.mapjoin.smalltable.filesize的值(默认为32,000,000)。
也可手动指定将哪个表放入内存,做Map Join:
SELECT /*+ MAPJOIN(things) */ sales.*, things.* FROM sales JOIN things ON (sales.id = things.id);
超过一定大小的表不适合放入内存,否则可能出现执行缓慢或OOM等情况。
Map Join对于outer join有限制:FULL OUTER JOIN无法使用Map Join;LEFT OUTER JOIN只有右边的表可以放入内存;RIGHT OUTER JOIN只有左边的表可以放入内存。
6.Join大表和小表的书写位置
应该将条目少的表/子查询放在Join操作符的左边。因为在Join操作的Reduce阶段,位于Join操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出错误的几率。
7. Bucket Join
使用Bucket表是一种提高Join效率的方法。详见BucketJoin
创建Bucket表:
CREATE TABLE bucketed_users(id INT , name STRING) CLUSTERED BY (id) INTO 4 BUCKETS;
分为4个bucket意味着,会生成4个数据文件,相同的id都在同一个文件中。
Bucket表不能使用LOAD DATA。插入数据到Bucket表:
INSERT OVERWRITE TABLE bucketed_users SELECT * FROM users;
划分bucket的目的是为了更好的进行join,能很大程度上提高join效率,且不受限于大小表约束,但考虑到生成bucket会有一个MapReduce Job的代价,建议是如果某份数据需要被多次查询,可以考虑使用。
8.distinct和group by
hql中如果只有distinct而没有group by 的情况下,例如
SELECT COUNT(DISTINCT ip) FROM action__notice WHERE stat_date=20140516 AND stat_hour=2014051609;
job启动后,会使用1个reducer。在输入数据量非常大的情况下job运行非常慢,可以使用group by 改写如下:
SELECT COUNT(ip) AS TOTAL FROM (SELECT ip FROM action__notice WHERE stat_date=20140516 AND stat_hour=2014051609 GROUP BY ip) tmp;
9.map 与 reduce 内存设置
mapreduce.map.memory.mb=6144
mapreduce.reduce.memory.mb=10192
mapreduce.map.java.opts=-Xmx3072m -Xms2048m
mapreduce.reduce.java.opts=-Xmx6144m -Xms5120m
在遇到报错java.lang.OutOfMemoryError: GC overhead limit exceeded 或 Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
可调节上述参数使用
10. shuffle参数
mapreduce.reduce.shuffle.parallelcopies=20
mapreduce.reduce.shuffle.input.buffer.percent=0.7
mapreduce.reduce.shuffle.memory.limit.percent=0.25
拷贝线程数目由参数"mapreduce.reduce.shuffle.parallelcopies"(默认为5)指定,对于每个待拷贝的文件,如果文件大小小于一定阈值A,则将其放在内存中,否则以文件的形式存放在磁盘上,如果内存中文件满足一定条件D,则会将这些数据写入磁盘,而当磁盘上文件数目达到mapreduce.task.io.sort.factor(默认是10)时,进行一次合并.
阈值A为:
heapsize * {mapreduce.reduce.shuffle.input.buffer.percent} *{mapreduce.reduce.shuffle.memory.limit.percent}
其中,heapsize是通过参数"mapreduce.reduce.java.opts"指定的,默认是200MB,mapreduce.reduce.shuffle.input.buffer.percent默认值为0.7,mapreduce.reduce.shuffle.memory.limit.percent默认0.25。
条件D为以下两个条件中任意一个:
1.内存使用率(总的可用内存为heapsize * {mapreduce.reduce.shuffle.input.buffer.percent})达到mapreduce.reduce.shuffle.merge.percent(默认我0.66)
2.内存中该文件数目超过 mapreduce.reduce.merge.inmem.threshold(默认是1000)。
并行拷贝的时候可能出现reduce内存OOM的问题。
当mapreduce.reduce.shuffle.memory.limit.percent *mapreduce.reduce.shuffle.parallelcopies > 1.0时,就有可能出现shuffle阶段OOM的现象。
解决方案
尝试减小拷贝参数
hive> set mapreduce.reduce.shuffle.input.buffer.percent=0.2;
网友评论