俺の報告

RoomClipを運営するエンジニアの日報(多分)です。

日報 #80 - AWS Data Pipeline再び

グッと冷え込みましたね。
寒いと暑いとどっちが好きか投票を大規模にとってみたいものです。
僕は断然暑いの好き派ですが、案外寒いも人気のようです。
マックスどうでもいい話でしたが、何にでも枕ってのは必要なんですよ。
さて、本日の本題。

ちょっと前にData Pipelineについて触れましたが、その手順をもう少し明らかにします。
前回は回すのに精一杯でしたので、、、

RDSからS3へバックアップし、S3からRedshiftへ同期

最終的に作り終えた図はこんな感じです。

f:id:tom_rc:20141115003757p:plain

S3が2つありますが、これはCopy動作に時間差を作るためです。
DataPipelineはそこそこ面白い仕組みだと思いますが、
如何せんよく分からない仕様が多いです。
スケジュールもその1つで、DataNodeに対して付与したスケジュールが何を意味するのかがわかりづらいです。
最終的に@領域でStartTimeなどの環境変数を持ってくるためにセットしただけのように見えます。
まぁとりあえず手順を記載しましょう。

何はともあれAdd DataNode

とにかく4つ作ります。
RDS Nodeは下記の通り。

  • Type : MySqlDataNode
  • Schedule : sch_1
  • Username : hoge
  • Password : hogehoge
  • Table : hoge_table
  • ConnectionString : jdbc:mysql://{RDSのEndpoint}:3306/{DBName}
  • Select Query : クエリ文(後述)

という設定。
クエリ文は「毎日の差分をとる」という意味で、EventStartの時間が必要になります。
ということで下記のように設定。

SELECT id, date
FROM hoge_table
WHERE date <= '#{@scheduledStartTime.format('YYYY-MM-dd HH:mm:ss')}'
AND date > date_add('#{@scheduledStartTime.format('YYYY-MM-dd HH:mm:ss')}', interval -1 day);

#{@scheduledStartTime.format('YYYY-MM-dd HH:mm:ss')}

この記述の仕方でとりあえずスケジュールのスタートタイムが取れます。
で、「前日分」の表現は、MySQLのdate関数を使います。

さて続いて、S3 Nodeについて。

  • Type : S3DataNode
  • Schedule : sch_1
  • File Path: s3://{S3のパス}/hoge/#{@scheduledStartTime.format('YYYY-MM-dd')}_.csv

これは簡単のように見えますが、最終的にRedshiftに打ち込むので、S3の領域はRedshiftと同じリージョンにしておく必要があります。
また、CSV名にはここでもsch_1オブジェクトのStartTimeを使いましょう。
続いてS3_2 Nodeについて。
S3 Nodeとの変更点は、Scheduleをsch_2にするだけです。
ここでスケジュールオブジェクトを変更しているのは、
S3へのバックアップ処理が終わった後、Redshiftへの転送をするため、
Activityのスケジュールが変わるからです。
続いてRedshiftについて。

  • Type : RedshiftDataNode
  • Schedule : sch_2
  • Table Name : hoge_table
  • Database : rdsft_db(後述)

さて、続いてactivityを追加します。
activityは2個。
RDS to S3 と S3 to Redshift です。

[RDS to S3]
* Type : CopyActivity
* Input : RDS
* Schedule : sch_1
* Runs On : EC2_1(後述) * Output : S3

[S3 to Redshift]
* Type : RedshiftCopyActivity
* Input : S3_2
* Schedule : sch_2
* Insert Mode : OVERWRITE_EXISTING * Runs On : EC2_2(後述) * Output : Redshift

さて、味噌はスケジュールを変更しているところです。
sch_1のアクティビティの後、sch_2を実行するため、ここを変更しなければなりません。
ただ、そうしてしまうとアクティビティがまたぐDataNode間のスケジュールが一貫しないため、
Validation Errorが吐出されます。
そのため、RDS to S3用にS3 Nodeとスケジュールを1つ(sch_1)、
S3 to Redshift用にS3 Nodeとスケジュールを1つ(sch_2)、作成しないといけません。
また、RedshiftへのInsertModeをOVERWRITEにしているのは、
RDSからの抽出SQLを工夫することで、上手く「UPDATE文」を救える事ができるので、
そうだとするならば、上書き保存のほうがより「同期」に近いと考えられるからです。

さて、残りはリソース達の設定です。
まずrdsft_db、つまりRedshiftDatabaseの設定です。

  • Name : rdsft_db
  • Type : RedshiftDatabase
  • Region : {Redshiftと同じリージョン}
  • Database Name : {RedshiftのDB名}
  • Username : hoge
  • Password : hoge
  • Cluster Id : hoge

続いて、EC2_1について。

  • Type : Ec2Resource
  • Instance Type : t1.micro (こうしないとsmallで立ち上がります)
  • Terminate After : 10 min
  • Schedule : sch_1
  • Security Groups : hoge
  • Role, Resource Roleは初期値のまま

EC2_2については、要注意です。 EC2インスタンスを立ち上げるRegionをRedshiftと一緒にしないといけません。
また、そこでSecurity Groupsを作成しておき、Redshiftにパスを通しておく必要があります。

大体以上で大丈夫です。
あとは素直にSave pipelineした後に出るErrors/Warningsを解決していきましょう。
長くなりましたが、既存のバッチサーバなどに依存しないでバックアップを取る方法の紹介でした。

あ、ちなみに蛇足ですが、
「今までの分を一気になんとかしたい…」というときは、純粋にCOPYコマンドが便利です。
とにかくRDSからCSVを取得してきて、S3にアップ。
そのあとSQLWorkBenchでもなんでも繋いでから、下記のようにクエリを打つと簡単にS3からRedshiftへ展開できます。本当、これができるんならData Pipelineなんて必要なくない?って思うけど、あくまでEC2群とは切り離されてるってのが重要なのね。

Copy構文

COPY
{TABLE_NAME}
FROM 's3://{FILE_PATH}'
CREDENTIALS 'aws_access_key_id=[your access key];aws_secret_access_key=[your secret access key]' delimiter ',' REMOVEQUOTES;

早くて便利ですよ。