2016年12月1日

Logical Decodingを使ったCDC(Change Data Capture)の実現方法を考えてみる

今年も風物詩である PostgreSQL Advent Calendar の時期がやって参りました。Day1担当のデータマエショリスト @snaga です。
去年もDay1を担当した気がしますが、それはさておき。

余談ですが、今年のAdvent Calendarは
にも参加しております。また、
というのにも(個人的に)チャレンジしていますので、この辺に興味のある方はよろしければどうぞ。

■Logical Decoding?


閑話休題。

皆さんご存知の通り、「Logical Decoding」と呼ばれる機能がPostgreSQL 9.4で導入されました。

PostgreSQLでは「新しい機能入ったらしいが一体何にどう使えばいいんだ?」というような機能が稀によくあります。そのため、2年前にリリースされた機能にも関わらず誰かが使っているという話を聞いたことがない、といったことが起こります。

Logical Decodingにもその空気を感じます。

個人的には Logical Decoding みたいなものはインフラストラクチャーであって機能ではないと思うのですが、リリースノートにガッツリ「新機能!」とか書かれたりする関係上、いろいろなところでプレゼンなどを通して話に聞くようになるものの、具体的に何に使えるかサッパリ分からんというような事態に遭遇します。分かります。私もです。

不憫な機能はそのままそっとしておいてもいいのですが、今回は歳末助け合いの精神を発揮して使いどころを考えてみたいと思います。Advent Calendarですし。

なお、以降のLogical Decodingの話に特に興味のないという方には、とりあえず以下の動画をお楽しみいただければと思います。



ありがとうございました。

■CDC(Change Data Capture)とは何か


CDC(Change Data Capture)とは、その名の通り「変更を検知・検出する処理」です。データウェアハウスなどの情報系(分析系)システムで出てくる用語で、具体的にはETLの一貫として行われることの多い処理になります。

なぜ情報系のシステムで出てくるのかというと、その瞬間の取引の記録を残すのが要件のオンライン系のシステムと異なり、情報系では「一定の期間に渡るデータの変化」を時系列で分析したいという局面が多々あるからです。

例えば、以下のように会員情報のマスタがあった場合、オンライン系のシステムでは「その取引が発生した瞬間に会員が(住所とか各種ステータスとかが)どういう状態だったのか」が分かっていれば用は足ります。


一方で、情報系のシステムで分析をする場合には「どこに住んでいる会員がどれくらいいるのか、それはどれくらい変化しているのか」といった「変化」を把握する必要が出てきます(もちろん分析内容次第ですが)。

そのため、会員情報などのマスタについてもその変化を追える必要がある、具体的に言うと以下のような形式でデータが欲しくなるわけです。


というわけで、データ分析に適した形に変換するためにオンライン系のテーブルからその変更を検出する処理をCDC(Change Data Capture)と言います。

ETL処理についてガッツリ知りたい方は以下の書籍などを参照してください。

■Logical Decodingとは何か


次にLogical Decodingの復習です。

Logical DecodingはPostgreSQL 9.4で実装された機能で、トランザクションログの内容を論理的なレコードとして取り出せる、という機能です。

PostgreSQLはRDBMSですので、もともとのトランザクションログは物理的なログ、つまり「どのブロックのどのオフセットにどのようなバイト列を書き込む」というような形式でした。

が、物理的なログでは活用(再利用)できる範囲が限られるため、論理的なログ(いわゆるレコード)として取得できるようになった、というのがLogical Decodingの意味するところです。

Logical Decodingの基礎は以下の記事を読んでいただければと思いますが、基本的には「テーブルへの変更が論理的なレコードの形式で取得できる」、というものです。

■CDCに必要な要件


というわけで、本エントリでは「Logical DecodingをCDCに使えるのか」という点について、もう少し詳細に見ていこうと思います。

それトリガでやればいいじゃん、とか、夜間バッチで、といった方式もありますが(実際私もやっておりますが)、「ですよねー」と言った瞬間に話が終わるので、今回はLogical Decodingで検討してみることにします。(某社さんのGoldenGateとかってのもこの方式らしいですしおすし)

CDCを実現するに当たって本質的に必要な要件が3つほどあります。(snaga調べ)

まずは「レコードを特定することができ、新規作成か更新かを判別することができる」ということです。つまり、「このレコードは新規レコードなのか、それとも既存のレコードの変更なのか」を判別する必要があるのです。

これは一般的には主キーまたはユニークインデックスの作成されたカラムを使って実現されます。

主キーが「A」というレコードがテーブルに存在している場合、新たに入ってきたログが「A」という主キーを持っていれば既存レコードの更新(UPDATE)、「B」という主キーを持っていたら新規レコード(INSERT)と判別します。

2つ目の要件は「変更を検出したい対象のカラムを絞ることができる」ということです。

つまり、「変更されたら検出したいカラム」と「変更されても検出したくないカラム」を設定できることが重要です。

例えば、会員のマスターとなるテーブルに「住所」と「最終ログイン時刻」のような情報を保持している場合もあるかと思いますが、住所の変更は検出したくても、最終ログイン時刻は検出したくない、といったケースが考えられます。


このような時に「住所」や「ステータス」の変更だけを検出できるというのもCDCに必要な要件になります。

そして、最後は「データが(論理的に)変更されていない時にはログを吐かない」ということです。

これが物理ログと論理ログの違いであり、「論理ログの量」を必要最小限に抑えるために重要な要件となります。

■Logical Decodingのセットアップ


さて、というわけで、そろそろLogical Decodingの出力を詳細に見てみます。

Logical Decodingでは出力プラグインを自由に設定できますが、今回は test_decoding を使います。
test_decodingの詳細はマニュアルを参照してください、と言いたいところなのですが、マニュアルにはまったく詳細が書かれていないので、それは言うだけ野暮というものです。

まず、以下の設定をpostgresql.confにします。
wal_level = logical
max_replication_slot = 3

次に、レプリケーションスロットを作成します。

レプリケーションスロットは、Logical Decodingでデータを取得する対象のデータベース上で作成する必要があるので注意してください。(今回はtestdb)
testdb=# SELECT * FROM pg_replication_slots;
 slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
(0 行)

testdb=# SELECT * FROM pg_create_logical_replication_slot('testslot0', 'test_decoding');
 slot_name | xlog_position
-----------+---------------
 testslot0 | 2/CFE57030
(1 row)

testdb=# SELECT * FROM pg_replication_slots;
 slot_name |    plugin     | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+---------------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
 testslot0 | test_decoding | logical   |  83604 | testdb   | f      |            |      |         4283 | 1/FF467510  | 1/FF467548
(1 行)

testdb=#

まず、テーブルを作成します。
testdb=# create table k1 (
testdb(#   uid integer primary key,
testdb(#   uname text not null,
testdb(#   gname text not null
testdb(# );
CREATE TABLE
testdb=#

この時、レプリケーションスロットからは以下のデータを取得できます。
testdb=# SELECT * FROM pg_logical_slot_get_changes('testslot0', NULL, NULL, 'include-xids', '0');
  location  | xid  |  data
------------+------+--------
 1/FF4675B0 | 4283 | BEGIN
 1/FF4861C0 | 4283 | COMMIT
(2 行)

testdb=#

どうやらDDLのデータは取得できないようです。

■Logical Decodingでは何が出力され、何を取得できるのか


それでは、まずレコードをINSERTしてみます。

testdb=# insert into k1 values (1, 'Park Gyu-ri', 'KARA');
INSERT 0 1
testdb=#

この時、以下のようなログを取得することができます。
testdb=# SELECT * FROM pg_logical_slot_get_changes('testslot0', NULL, NULL, 'include-xids', '0');
  location  | xid  |                                         data
------------+------+--------------------------------------------------------------------------------------
 1/FF4861F8 | 4284 | BEGIN
 1/FF4861F8 | 4284 | table public.k1: INSERT: uid[integer]:1 uname[text]:'Park Gyu-ri' gname[text]:'KARA'
 1/FF486308 | 4284 | COMMIT
(3 行)

testdb=#
主キーの値と、名前が出力されています。

複数レコードを一括してINSERTすると、
testdb=# insert into k1 values (2, 'Nicole Jung', 'KARA'),
testdb-#   (3, 'Goo Ha-ra', 'KARA'),
testdb-#   (4, 'Han Seung-yeon', 'KARA'),
testdb-#   (5, 'Kang Ji-young', 'KARA');
INSERT 0 4
testdb=#

以下のようなログになります。
testdb=# SELECT * FROM pg_logical_slot_get_changes('testslot0', NULL, NULL, 'include-xids', '0');
  location  | xid  |                                          data
------------+------+-----------------------------------------------------------------------------------------
 1/FF486340 | 4285 | BEGIN
 1/FF486340 | 4285 | table public.k1: INSERT: uid[integer]:2 uname[text]:'Nicole Jung' gname[text]:'KARA'
 1/FF4863D0 | 4285 | table public.k1: INSERT: uid[integer]:3 uname[text]:'Goo Ha-ra' gname[text]:'KARA'
 1/FF486460 | 4285 | table public.k1: INSERT: uid[integer]:4 uname[text]:'Han Seung-yeon' gname[text]:'KARA'
 1/FF4864F0 | 4285 | table public.k1: INSERT: uid[integer]:5 uname[text]:'Kang Ji-young' gname[text]:'KARA'
 1/FF4865B0 | 4285 | COMMIT
(6 行)

testdb=#

次に、主キーを指定してレコードを更新してみます。
testdb=# update k1 set uname = 'Nicole' where uid = 2;
UPDATE 1
testdb=#

この時のログは以下のようになります。主キーおよび更新された属性(今回は名前)が出力されています。
testdb=# SELECT * FROM pg_logical_slot_get_changes('testslot0', NULL, NULL, 'include-xids', '0');
  location  | xid  |                                      data
------------+------+---------------------------------------------------------------------------------
 1/FF4865E8 | 4286 | BEGIN
 1/FF4865E8 | 4286 | table public.k1: UPDATE: uid[integer]:2 uname[text]:'Nicole' gname[text]:'KARA'
 1/FF486670 | 4286 | COMMIT
(3 行)

testdb=#

では次に、主キーを指定しないで更新してみます。
testdb=# update k1 set uname = 'Nicole Jung' where uname = 'Nicole';
UPDATE 1
testdb=#

この時、以下のログを取得できます。主キーを指定しない更新でしたが、ログには主キーの情報も出力されています。
testdb=# SELECT * FROM pg_logical_slot_get_changes('testslot0', NULL, NULL, 'include-xids', '0');
  location  | xid  |                                         data
------------+------+--------------------------------------------------------------------------------------
 1/FF4866A8 | 4287 | BEGIN
 1/FF4866A8 | 4287 | table public.k1: UPDATE: uid[integer]:2 uname[text]:'Nicole Jung' gname[text]:'KARA'
 1/FF486730 | 4287 | COMMIT
(3 行)

testdb=#

なお、値が変わらない更新をしてみると、
testdb=# update k1 set uname = uname;
UPDATE 5
testdb=#

律儀に全レコードの更新ログが出力されます。
testdb=# SELECT * FROM pg_logical_slot_get_changes('testslot0', NULL, NULL, 'include-xids', '0');
  location  | xid  |                                          data
------------+------+-----------------------------------------------------------------------------------------
 1/FF4869C0 | 4288 | BEGIN
 1/FF4869C0 | 4288 | table public.k1: UPDATE: uid[integer]:1 uname[text]:'Park Gyu-ri' gname[text]:'KARA'
 1/FF486A18 | 4288 | table public.k1: UPDATE: uid[integer]:3 uname[text]:'Goo Ha-ra' gname[text]:'KARA'
 1/FF486A70 | 4288 | table public.k1: UPDATE: uid[integer]:4 uname[text]:'Han Seung-yeon' gname[text]:'KARA'
 1/FF486AD0 | 4288 | table public.k1: UPDATE: uid[integer]:5 uname[text]:'Kang Ji-young' gname[text]:'KARA'
 1/FF486B30 | 4288 | table public.k1: UPDATE: uid[integer]:2 uname[text]:'Nicole Jung' gname[text]:'KARA'
 1/FF486BB8 | 4288 | COMMIT
(7 行)

testdb=#

最後に主キーを指定せずに削除すると
testdb=# delete from k1;
DELETE 5
testdb=#

主キーのみを出力として含むログを取得できます。
testdb=# SELECT * FROM pg_logical_slot_get_changes('testslot0', NULL, NULL, 'include-xids', '0');
  location  | xid  |                  data
------------+------+-----------------------------------------
 1/FF486BF0 | 4289 | BEGIN
 1/FF486BF0 | 4289 | table public.k1: DELETE: uid[integer]:1
 1/FF486C30 | 4289 | table public.k1: DELETE: uid[integer]:3
 1/FF486C70 | 4289 | table public.k1: DELETE: uid[integer]:4
 1/FF486CB0 | 4289 | table public.k1: DELETE: uid[integer]:5
 1/FF486CF0 | 4289 | table public.k1: DELETE: uid[integer]:2
 1/FF486D60 | 4289 | COMMIT
(7 行)

testdb=#

■要するに、Logical DecodingはCDCに使えるのか?


ここまで見てきたように、Logical Decodingではテーブルに主キーが存在していれば、主キーを指定しない更新であってもログに主キーが出力されることが分かりました。そのため、「レコードを特定して新規か更新かを判別する」ということが可能になります。

一方で、「変更を検知したいカラムだけを対象にする」という要件については、現在のLogical Decoding(というか test_decoding プラグイン)の仕様としては、(変更されていないカラムも含めて)すべてのカラムの変更の際にログが出力されることになります。よって、変更を検知する対象としてカラムを絞りたいといった場合には別のしくみが必要になります。

通信の負荷などを考えると、Loigcal Decodingのログを受け取るアプリ側ではなく、ログを出力する Output プラグイン側でフィルターできるようにするべきでしょう。

また、「論理的に値が変わっていない時にはログを吐かない」という点についても、もう一工夫が必要なように感じます。

test_decoding のソースを見ると、テーブルの各カラムの情報である tupledesc と、更新前および更新後のタプルのデータ oldtuple と newtuple を扱えるようですので、この辺りを使えばCDCに必要な要件を実現できるように思います。(汎用的に実現するにはそれなりに手間がかかりそうですが・・・)
/*
 * callback for individual changed tuples
 */
static void
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                 Relation relation, ReorderBufferChange *change)
{
    ...
    TupleDesc    tupdesc;
    ...
    tupdesc = RelationGetDescr(relation);
    ...
        case REORDER_BUFFER_CHANGE_UPDATE:
            appendStringInfoString(ctx->out, " UPDATE:");
            if (change->data.tp.oldtuple != NULL)
            {
                appendStringInfoString(ctx->out, " old-key:");
                tuple_to_stringinfo(ctx->out, tupdesc,
                                    &change->data.tp.oldtuple->tuple,
                                    true);
                appendStringInfoString(ctx->out, " new-tuple:");
            }

            if (change->data.tp.newtuple == NULL)
                appendStringInfoString(ctx->out, " (no-tuple-data)");
            else
                tuple_to_stringinfo(ctx->out, tupdesc,
                                    &change->data.tp.newtuple->tuple,
                                    false);
            break;

■まとめ


まとめます。
  • 誰か汎用CDC用プラグイン作ってください
  • 「トリガーとかバッチでいいじゃん」って言わない
PostgreSQL Advent Calendar 2016、Day2の明日の担当は @seikoudoku2000 さんです。

では、また。

みなさま、良いお年を。

0 件のコメント:

コメントを投稿