SQL感覚でHiveQLを書くと痛い目にあう例
Hadoop Advent Calendar 2013 4日目の記事です
tl;dr
- explainとjob historyを読め
- 1 reducerは悪
- data skewは悪
前書き
みんな大好きSQLでHadoop上での処理を実行できるHiveにはみなさん普段からお世話になっていることでしょう。ちょっと調べ物でググる度に目に入る愛らいしいマスコットが、荒んだ心に清涼な風をはこんでくれます。
ですがHiveのクエリ言語はSQLではなくHiveQLですし、実行エンジンもRDBのそれとは全く異なるMapReduceです。SQLのつもりでHiveQLを書いていると地雷を踏んでしまうことがまれによくあります。本エントリでは陥りがちなHiveQLの落とし穴を2つ紹介します。
例1
SELECT count(DISTINCT user_id) FROM access_log
SQLに慣れた方であれば、集約関数の中に DISTINCT や ORDER BY を入れて用いることは多いと思います。Hiveでは全ての集約関数で利用できるわけではないのですが、この例のように count 内での DISTINCT は利用することができます。
例のHiveQLではアクセスログからユニークユーザー数を計算しています。一つのクエリで完結していて美しいですね。一体どこが問題なのでしょうか?
データによるところが大きいですが、以下のようにクエリを書くと速くなる場合があります。
SELECT
count(*)
FROM (
SELECT
DISTINCT
user_id
FROM
access_log
) t
せっかく count(DISTINCT )
で綺麗に一つにまとめられていたところをわざわざサブクエリに分割しています。なぜこちらの方が速くなるのでしょうか?
一つ目のクエリでEXPLAINを実行すると以下の様なプランになります。
ここで重要な事は、全体として一つのMapReduceになっている、ということです。一つのMapReduceで重複を除きつつカウントを行うなら、Reducerは一つで処理を実行する必要があります。そのためReducerで分散処理ができず、遅くなってしまうことがある、というわけです。
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
access_log
TableScan
alias: access_log
Select Operator
expressions:
expr: user_id
type: string
outputColumnNames: user_id
Group By Operator
aggregations:
expr: count(DISTINCT user_id)
bucketGroup: false
keys:
expr: user_id
type: string
mode: hash
outputColumnNames: _col0, _col1
Reduce Output Operator
key expressions:
expr: _col0
type: string
sort order: +
tag: -1
value expressions:
expr: _col1
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(DISTINCT KEY._col0:0._col0)
bucketGroup: false
mode: mergepartial
outputColumnNames: _col0
Select Operator
expressions:
expr: _col0
type: bigint
outputColumnNames: _col0
File Output Operator
compressed: true
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Stage: Stage-0
Fetch Operator
limit: -1
一方、二つ目のクエリは、サブクエリを用いているためMapReduceの数は増えていますが、user_idをpartition keyとしてデータが分割されるため、Reducerでも効率よく分散処理を行うことができます。
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
t:access_log
TableScan
alias: access_log
Select Operator
expressions:
expr: user_id
type: string
outputColumnNames: user_id
Group By Operator
bucketGroup: false
keys:
expr: user_id
type: string
mode: hash
outputColumnNames: _col0
Reduce Output Operator
key expressions:
expr: _col0
type: string
sort order: +
Map-reduce partition columns:
expr: _col0
type: string
tag: -1
Reduce Operator Tree:
Group By Operator
bucketGroup: false
keys:
expr: KEY._col0
type: string
mode: mergepartial
outputColumnNames: _col0
Select Operator
Select Operator
Group By Operator
aggregations:
expr: count()
bucketGroup: false
mode: hash
outputColumnNames: _col0
File Output Operator
compressed: true
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://cdh4cluster/tmp/hive-okuno/hive_2013-12-04_13-33-10_514_1739731017764214960/-mr-10002
Reduce Output Operator
sort order:
tag: -1
value expressions:
expr: _col0
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
bucketGroup: false
mode: mergepartial
outputColumnNames: _col0
Select Operator
expressions:
expr: _col0
type: bigint
outputColumnNames: _col0
File Output Operator
compressed: true
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Stage: Stage-0
Fetch Operator
limit: -1
この二つの例のように、効率よくReducerを利用できているかどうか、というのは正直なところEXPLAINを見ているだけでは分かりません(熟練すれば分かるかもしれませんが)。そういう場合でも、実際にクエリを実行してみればReducerで詰まっている様子が一目で分かると思います。
例2
例2のクエリはこちら。
SELECT
sales.product_id,
sum(product.price * sales.num)
FROM
sales
INNER JOIN
product ON sales.product_id = product.product_id
GROUP BY
sales.product_id
販売履歴に商品マスタをJOINして、商品毎の売上をだしている、と想定して下さい。
このクエリは以下のようにすると速くなる可能性があります。(もちろんデータによります)
SELECT
sales.product_id,
product.price * total_num
FROM (
SELECT
product_id,
sum(num) AS total_num
FROM
sales
GROUP BY
product_id
) sales
INNER JOIN
product ON sales.product_id = product.product_id
このクエリもSQLに慣れた人なら避けて最初の例のように書くのではないでしょうか。
後者の例が速くなるポイントはデータの偏り(data skew)です。
一つ目のクエリでは、salesおよびproductのデータがproduct_idでpartitionされてReducerに配られます。その時、sales内に飛び抜けて売れた商品があると、あるReducerにだけデータが大量に集まってきてしまいます。そうした大量のデータに対するJOINは非常に遅い処理になってしまいます。その結果、そのReducerだけ処理時間が長くなってしまい、結局Job全体としても遅くなります。
一方、二つ目のクエリではMapReduceの数は増えてしまいますが、一段目のMapReduceではMap側集約を利用でき効率よく集約を行うことができます。二段目のMapReduceでは一段目でsalesがproduct_idで集約されて各product_idについて一行しか存在しないため、productとのJOINも非常に軽い処理で済むようになっています。
但し、product側が十分に小さくmap-site joinが利用できる場合は話が全く別です。その場合は、まず間違いなく一つ目のクエリの方が速くなるでしょう。
まとめ
Hiveは大変便利なのですが、上記の例のようにデータの量や偏りによって効率のいいクエリが全く異なるケースがあって厄介です(RDBでも同じですが)。クエリを選択する際にはSQLの常識は通じないことが多いので、Hiveを利用する際にはその事を意識しておくべきでしょう。めんどうでもEXPLAINでプランを見つつ、実際に実行してみて効率の悪いMapReduceになっていないか常にチェックしていくしかないと思います。