2016年7月5日

パラレル処理可能な集約関数をPL/Pythonで作成する

先日、次期メジャーバージョンの9.6のbeta2がリリースされました。
9.6では、集約関数やJOINなどもパラレルクエリに対応しており、パラレル処理されるようになっていますので、みなさんはもちろんパラレルクエリをゴリゴリと検証されている最中かと思います。

また、言うまでもないことですが、パラレルクエリはデータ分析のためにあり、データ分析といえばPythonなわけです。

本エントリでは、そんなパラレルクエリとデータ分析大好きな方たちに向けて、パラレル処理が可能な集約関数をPL/Pythonで作成する方法を紹介します。

前提としているバージョンは、PostgreSQL 9.6 beta2 です。

■PL/Pythonでの集約関数の作成


パラレルクエリに関係のない通常の集約関数をPL/Pythonで作成する方法については以前書きましたので、以下のエントリを参照してください。
本エントリは、この内容を理解していることが前提となります。

■パラレル対応の集約関数とは?


では、パラレルクエリでパラレル実行可能な集約関数の作り方を見てみましょう。

まずは、パラレルクエリに対応した集約関数がどのように作成されているのかを見てみます。

ここではサンプルとしてavg関数を見ていきます(avg関数はPL/PythonではなくCで書かれた組み込みの集約関数です)。

関連するマニュアルとしては、以下の項目を参照してください。

■集約avgの定義


以下の例はint4を引数として取る場合のavg集約関数の定義です。

testdb=# \x
Expanded display is on.
testdb=# select * from pg_aggregate where aggtransfn::text = 'int4_avg_accum';
-[ RECORD 1 ]----+-------------------
aggfnoid         | pg_catalog.avg
aggkind          | n
aggnumdirectargs | 0
aggtransfn       | int4_avg_accum
aggfinalfn       | int8_avg
aggcombinefn     | int4_avg_combine
aggserialfn      | -
aggdeserialfn    | -
aggmtransfn      | int4_avg_accum
aggminvtransfn   | int4_avg_accum_inv
aggmfinalfn      | int8_avg
aggfinalextra    | f
aggmfinalextra   | f
aggsortop        | 0
aggtranstype     | 1016
aggserialtype    | 0
aggtransspace    | 0
aggmtranstype    | 1016
aggmtransspace   | 0
agginitval       | {0,0}
aggminitval      | {0,0}

testdb=#

これを見て分かるのは、どうやら int4_avg_accum 、 int8_avg 、 int4_avg_combine 、 int4_avg_accum_inv という関数が作成されているようだ、ということです。

それでは、これらの関数が何をしているのかを見てみることにします。

■関数


上記で出てきた関数名でバックエンドのソースコードを検索すると、 src/backend/utils/adt/numeric.c に関数定義を見つけることができます。

それぞれの関数について処理内容を確認すると、以下のようになります。
Datum int4_avg_accum(PG_FUNCTION_ARGS)
整数を加算していく関数。各ワーカーによって実行される。

ArrayType *transarray と int32 newval を引数として受け取る。transarray から Int8TransTypeData *transdata を取り出して、 count と sum を計算(加算)する。
Datum int4_avg_combine(PG_FUNCTION_ARGS)
複数のワーカーの計算結果をまとめる関数。リーダーによって実行される。

ArrayType *transarray1 と ArrayType *transarray2 を引数として受け取る。transarray2 の中身を transarray1 に加算して、transarray1 を返却する。
Datum int4_avg_accum_inv(PG_FUNCTION_ARGS)
平均の計算のための積算処理を逆向きに処理する関数。

ウィンドウ関数で使用される場合に、パフォーマンス改善のために使用される。パラレルクエリとは関係はない。
Datum int8_avg(PG_FUNCTION_ARGS)
最終的な結果を計算して返却する関数。リーダーによって実行される。

ArrayType *transarray を引数として受け取って、その中から Int8TransTypeData *transdata を取り出す。transdata から総計 sum とレコード数 count を取り出して、numeric_div 関数を使って平均値を計算、返却する。

■データ型


typedef struct Int8TransTypeData
{
    int64       count;
    int64       sum;
} Int8TransTypeData;
平均の計算のために必要な中間結果(総和とレコード数)を保持する構造体。

avg関数では、平均値を計算するため、中間結果(state)としてレコード数と値の総和を保持している。

■パラレル処理可能なmin関数をPL/Pythonで実装する


ここまでで、インターフェースと処理内容はなんとなく見えてきましたので、実際にパラレル処理可能なmin関数をPL/Pythonで作ってみましょう。

まず、pyminという集約関数を以下のように定義します。
CREATE AGGREGATE pymin (float8)
(
    sfunc = float8_pymin,
    stype = float8,
    combinefunc = float8_pymin_combine,
    parallel = safe
);
非パラレルの集約関数の定義と異なっているのは、combinefunc の定義と parallel = safe の定義です。

次に、state更新関数を以下のように作成します。

state更新関数は、stateを第一引数に、新しい値を第二引数として受け取り、更新されたstateを返却する関数として実装します。今回の場合は、新しい値の方が小さかったらそちらを新しくstateとして返却します。
CREATE FUNCTION float8_pymin(s float8, n float8)
  RETURNS float8
AS $$
    global s

    if n is not None:
        if s is None or n < s:
            s = n
    return s
$$
LANGUAGE plpython2u;
次に、各ワーカーが処理した結果をまとめるためのcombine関数を実装します。

combine関数は、二つのstateを受け取り、それを一つのstateにまとめて返却します。今回の場合は2つのfloat8を受け取り、より小さい方を返却します。
CREATE FUNCTION float8_pymin_combine(s1 float8, s2 float8)
  RETURNS float8
AS $$
    global s1, s2

    if s1 is None:
        s1 = s2
    elif s1 > s2:
        s1 = s2
    return s1
$$
LANGUAGE plpython2u;
これらの関数を作成して、実際に動作確認をしてみます。

以下は、generate_series()関数を使って100万行のテーブルを作成した後、そのテーブルに対して今回のpymin関数を実行している様子です。

EXPLAIN ANALYZEの結果を見ると、ワーカープロセスが起動してパラレル処理が行われていることが分かりますし、組み込みのmin関数と比較しても結果が正しく返っていることが分かります。
testdb=# CREATE TABLE t1 AS
testdb-# SELECT generate_series(1,1000000) c1;
SELECT 1000000
testdb=#
testdb=# SHOW force_parallel_mode;
 force_parallel_mode
---------------------
 off
(1 row)

testdb=# SHOW max_parallel_workers_per_gather;
 max_parallel_workers_per_gather
---------------------------------
 2
(1 row)

testdb=# SELECT relname,reloptions FROM pg_class WHERE relname = 't1';
 relname | reloptions
---------+------------
 t1      |
(1 row)

testdb=# EXPLAIN (VERBOSE,ANALYZE) SELECT pymin(c1) FROM t1;
                                                                  QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=128841.73..128841.74 rows=1 width=8) (actual time=3869.709..3869.710 rows=1 loops=1)
   Output: pymin((c1)::double precision)
   ->  Gather  (cost=128841.02..128841.23 rows=2 width=8) (actual time=3869.458..3869.479 rows=3 loops=1)
         Output: (PARTIAL pymin((c1)::double precision))
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=127841.02..127841.03 rows=1 width=8) (actual time=3850.664..3850.665 rows=1 loops=3)
               Output: PARTIAL pymin((c1)::double precision)
               Worker 0: actual time=3847.931..3847.933 rows=1 loops=1
               Worker 1: actual time=3835.468..3835.469 rows=1 loops=1
               ->  Parallel Seq Scan on public.t1  (cost=0.00..9126.56 rows=470156 width=4) (actual time=0.351..1123.692 rows=333333 loops=3)
                     Output: c1
                     Worker 0: actual time=0.195..1193.261 rows=359114 loops=1
                     Worker 1: actual time=0.754..1075.905 rows=314994 loops=1
 Planning time: 0.340 ms
 Execution time: 3873.790 ms
(16 rows)

testdb=# SELECT pymin(c1) FROM t1;
 pymin
-------
     1
(1 row)

testdb=# SELECT min(c1) FROM t1;
 min
-----
   1
(1 row)

testdb=#

■パラレル処理可能なavg関数をPL/Pythonで実装する


次にavg関数相当を実装してみます。

avg関数は、min関数と違い、終了するときにfinal関数で平均値を計算するための計算処理が必要となります。そのため、stateを保持する方法もminと少し異なります。

まず、以下のようにpyavg関数を定義します。pyminとの違いは、stype(stateのためのデータ型)が float8[] になっていること、および finalfunc が定義されていることです。
CREATE AGGREGATE pyavg (float8)
(
    sfunc = float8_pyavg,
    stype = float8[],
    combinefunc = float8_pyavg_combine,
    finalfunc = float8_pyavg_final,
    parallel = safe
);
次に、新しい値を受け取ってstateを更新する float8_pyavg を以下のように作成します。

ここで気を付けて欲しいのは、stateの要素0番目は総和であり、要素1番目はレコード数である、ということです。final関数では、これらを使って平均値を計算することになります。
CREATE FUNCTION float8_pyavg(s float8[], n float8)
  RETURNS float8[]
AS $$
    global s

    if n is not None:
        if s is None:
            s = []
            s.append(0)
            s.append(0)
        # s[0]:sum, s[1]:count
        s[0] = s[0] + n
        s[1] = s[1] + 1
    return s
$$
LANGUAGE plpython2u;
次はワーカーの結果をまとめるcombine関数です。avgの場合、ワーカーの結果をまとめるためには、単に加算するだけで問題ありません。
CREATE FUNCTION float8_pyavg_combine(s1 float8[], s2 float8[])
  RETURNS float8[]
AS $$
    global s1, s2

    if s1 is None:
        s1 = []
        s1.append(s2[0])
        s1.append(s2[1])
    else:
        s1[0] += s2[0]
        s1[1] += s2[1]
    return s1
$$
LANGUAGE plpython2u;
そして最後にfinal関数で、平均値を計算して返却します。
CREATE FUNCTION float8_pyavg_final(s float8[])
  RETURNS float8
AS $$
    global s

    # s[0]:sum, s[1]:count
    return s[0]/s[1]
$$
LANGUAGE plpython2u;
そして、この集約関数を実行すると以下のような結果が得られます。

今回作ったpyavg関数はパラレル実行されており、結果についても組み込みのavg関数と同じ結果が得られていることが分かります。
testdb=# EXPLAIN (verbose,analyze) SELECT pyavg(c1) FROM t1;
                                                                 QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=114800.96..114800.97 rows=1 width=8) (actual time=4469.722..4469.723 rows=1 loops=1)
   Output: pyavg((c1)::double precision)
   ->  Gather  (cost=114800.00..114800.21 rows=2 width=32) (actual time=4468.969..4469.632 rows=3 loops=1)
         Output: (PARTIAL pyavg((c1)::double precision))
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=113800.00..113800.01 rows=1 width=32) (actual time=4453.186..4453.195 rows=1 loops=3)
               Output: PARTIAL pyavg((c1)::double precision)
               Worker 0: actual time=4453.345..4453.369 rows=1 loops=1
               Worker 1: actual time=4437.589..4437.591 rows=1 loops=1
               ->  Parallel Seq Scan on public.t1  (cost=0.00..8591.67 rows=416667 width=4) (actual time=0.032..923.497 rows=333333 loops=3)
                     Output: c1
                     Worker 0: actual time=0.041..876.075 rows=350250 loops=1
                     Worker 1: actual time=0.041..986.962 rows=340808 loops=1
 Planning time: 0.218 ms
 Execution time: 4471.432 ms
(16 rows)

testdb=# SELECT pyavg(c1) FROM t1;
  pyavg
----------
 500000.5
(1 row)

testdb=# SELECT avg(c1) FROM t1;
         avg
---------------------
 500000.500000000000
(1 row)

testdb=#

■まとめ


今回はパラレルクエリで並列実行可能な集約関数をPL/Pythonで作成する方法をご紹介しました。

データ分析の処理をデータベースで実装しようとすると、独自の集約関数で実装したくなることが多々発生します。

また、9.6ではせっかくパラレルクエリが使えるようになったわけですから、自分の実装する処理も極力パラレル処理可能にすることで、CPUを効果的に使ってデータ分析に必要となる処理時間を短縮したくなるというのは、ある意味で当たり前と言えます。

データ分析と親和性の高いPythonで独自の集約関数をパラレル処理できることが確認できましたので、ぜひいろいろな分析処理を並列化して、新しいPostgreSQLを使い倒してみていただければと思います。

では、また。

0 件のコメント:

コメントを投稿