今回はGAE/JからBigQueryへのStream Insertを試してみます。
目次
BigQueryへのデータ取り込み手段
BigQueryへのデータ取り込み手段は、大きく分けて
- プログラムまたはコマンドラインからジョブを使ってデータを一括で取り込む方法
- stream APIで個別にデータを流す方法
- Developer Console上から直接ファイルをアップロードする方法
があります。
(※参考:https://cloud.google.com/bigquery/loading-data-into-bigquery?hl=ja )
データ取り込み方法の比較
手段 | Data Format | quota制限 | 送信元 |
---|---|---|---|
ジョブ | CSV JSON |
1テーブル毎ジョブ実行可能数:1000回/日 1プロジェクト毎ジョブ実行可能数:10000回/日送信可能ファイルサイズ:圧縮済み→1GB、未圧縮→1TB |
ローカルファイル GAE GCS |
stream | CSV JSON |
1insertごとの最大データサイズ: 1MB 1秒あたり最大投入可能行数:10000行 1リクエストあたり最大投入可能行数:500行 |
GAE |
Developer Consoleからアップロード | CSV JSON DataStoreのバックアップデータ |
明示資料無し | ローカルファイル GCS |
また、Developer Console上からファイルをアップロードする場合に関してはドキュメントに明記がなかったので推測ではありますが、quota制限はジョブと同じような感じではないかと思います。
ジョブを使うメリットとしては1度のデータ読み込みで大きなデータファイルを送れることやGCS上のファイルを送れること、streamを使うメリットは一度に送れるデータ量はそれほどではないものの、ジョブ実行時の遅延もないので割とリアルタイムにデータ送信、データ分析を行えること等が挙げられます。
これらの方法で読み込んだデータをBigQueryに新しく作ったテーブルや既存のテーブルに追加したり、テーブルのデータの書き換えたりすることが出来ます。
Stream Insert
今回は、上述したデータ取り込み方法のうち、Streamを使ったBigQueryへのデータ取り込み方法の詳細と実装例について解説します。
以下、基本は https://cloud.google.com/bigquery/streaming-data-into-bigquery の超訳になっています。
BigQueryの概要
BiqQueryでは、BigQueryにデータを読み込むジョブを使う代わりに、tabledata().insertall()メソッドを用いて、データを一度にBigQueryの1レコードへ流し込む方法を選択することが出来ます。この方法ではジョブ読み込み実行の遅延なしに、データ参照が可能になっています。一方で、この方法を用いる前に考慮すべきいくつかの重要なトレードオフがあります。
クォータポリシー
- (1行あたり)最大行サイズ: 20KB
- insertごとの行全体の最大データサイズ: 1MB
- 秒間最大行数: テーブルごとに10000行。これを超えると、quota_exceededエラーになる。これ以上使いたい場合はここからお問い合わせをすると上限を上げてくれるかも?
- リクエストごとの最大行数: 500行
- 秒間最大バイト数: テーブルごとに10MB。これを超えるとquota_exceededエラーになる。
データ可用性
Stream Insertを行うと最初に2分間のウォームアップがあるのでその間はデータにアクセスできません。ウォームアップ完了後は、データをinsertしている間もデータへの参照が可能です。
また、数時間非アクティブな状態が続くと、次回insert時にウォームアップが発生します。
さらに、データのコピーやエクスポートが出来るようになるまで90分かかります。
データの整合性
データの整合性を保証するために、insertされた各行にinsertIdを付与します。BigQueryはこのIDを少なくとも1分間覚えています。もしこの時間内に同じStreamIdがセットされた行をinsertしようとしたら、ベストエフォート方式で重複排除を行う際に、このIdを使用します。
この方法を用いる主な理由としては、あるエラーにおける、Stream Insertの状態を定める方法がないからです。例えば、BigQueryとの間におけるネットワークエラーやBigQueryの内部エラーなどです。
稀に地域のデータセンターが利用不可な場合、データの重複が起こる可能性はありますが・・。新しい行のinsert時には別地域のデータセンターにルーティングされますが、利用不可なデータセンターのデータとの重複排除はできません。
もし、より強いデータの整合性を求めるらば、トランザクションをサポートしているGoogle Cloud Datastoreが代替サービスと言えるでしょう。
ユースケース
1.大量のイベントログを取りたいケース
リアルタイムに大量のデータを収集している場合、Stream Insertはいい選択だと思います。
これらのケースでは主に以下のような基準があります。
- トランザクショナルでない→大量で継続的に行を追加する。アプリケーションはデータの重複が発生する可能性や一時的に利用できなくなることを許容できる。
- 集約分析→単一・限定的なレコード選択において、クエリは主に流行分析のために実行される。
2.リアルタイムなダッシュボードやクエリ
ある状況下において、BigQueryへのStreamがトランザクショナルなデータをリアルタイムに分析することを可能にしています。ストリーミングデータの重複がありうるので、BigQueryの外側にTransactionalなDatastoreがあるか確認してください。
また、最新のビューを持つように、トランザクショナルなデータの分析が出来ることを保証するために、いくつかの予防措置をとることができる。
- 同一のスキーマをもつ2つのテーブルを作成します。1つ目のテーブルは調整済みデータ用、2つ目のテーブルはリアルタイムの未調整データ用です。
- クライアント側では、レコードのトランザクションデータストアを維持します。
- 2.のレコードについてはinsertAll()リクエストを打ちっ放します。insertAll()リクエストはあて先のテーブルとして1.の未調整データ用テーブルをリアルタイムに指定する必要があります。
- ある程度の間隔で、トランザクションデータストアから調整済みデータを追加し、未調整データ用テーブルを全データ削除します。
- このユースケースでは、両方のテーブルからデータを選択することができます。未調整のデータテーブルは重複やレコードの取りこぼしが含まれる可能性があります。
Stream APIの実装
では実際にBigQueryにテーブルを作成し、stream insertを実行してみましょう。
予め、Projectを作成し、BigQueryにテーブルを挿入するためのdatasetを作成しておいてください。
まず、テーブルを作成します。
// テーブルスキーマの設定
Map<String, String> schemata = new HashMap<>();
schemata.put("schema1", "STRING");
schemata.put("schema2", "STRING");
schemata.put("schema3", "STRING");
schemata.put("schema4", "STRING");
// テーブルの作成
TableSchema schema = new TableSchema();
List tableFieldSchema = new ArrayList<>();
for (Map.Entry<String, String> entry : schemata.entrySet()) {
TableFieldSchema schemaEntry = new TableFieldSchema();
schemaEntry.setName(entry.getKey());
schemaEntry.setType(entry.getValue());
tableFieldSchema.add(schemaEntry);
}
schema.setFields(tableFieldSchema);
Table table = new Table();
table.setSchema(schema);
TableReference tableRef = new TableReference();
tableRef.setProjectId(projectId); //プロジェクト名
tableRef.setDatasetId(datasetId); //dataset名
tableRef.setTableId(tableId); //テーブル名
table.setTableReference(tableRef);
bigquery.tables().insert(projectId, datasetId, table).execute();
この結果、指定したprojectのdatasetにスキーマとしてschema1?4を持った、テーブルが作成されていると思います。
次に今作成したテーブルにデータをinsertしてみます
TableRow row = new TableRow();
row.set("Schema1", "テスト");
row.set("Schema2", "テスト");
row.set("Schema3", "テスト");
row.set("Schema4", "テスト");
TableRow row2 = new TableRow();
row2.set("Schema1", "テスト2");
row2.set("Schema2", "テスト2");
row2.set("Schema3", "テスト2");
row2.set("Schema4", "テスト2");
TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows();
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
rows.setInsertId(String.valueOf(timestamp));
rows.setJson(row);
Thread.sleep(1000);
TableDataInsertAllRequest.Rows rows2 = new TableDataInsertAllRequest.Rows();
Timestamp timestamp2 = new Timestamp(System.currentTimeMillis());
rows2.setInsertId(String.valueOf(timestamp2));
rows2.setJson(row2);
rowList.add(rows);
rowList.add(rows2);
注意点としては、BigQueryで対応している型はSTRINGやINTEGER、FLOAT等のみ(※参照)と数が少ないので、対応していないLongやDate型をinsertしようとしても、String等に変換しないとinsertに失敗してしまいます。また、データの整合性の章で説明したように各行にはInsertIdをセットする必要があるのですが、このIdが同じだとBigQueryの方で同じ行であると判断し、元のデータが更新されてしまいますので、この各行のInsertIdがそれぞれUniqueになっていることも合わせてご確認下さい。
結果、以下のようなデータがテーブルにinsertされます。
まとめ
今回はStreamを使ってBigQueryにデータをロードする方法について解説しました。BigQueryはそのパフォーマンスや料金の安さから、例えばアクセスログやアプリケーションログをFluentd経由で送っておいて、分析に活用している企業も徐々に増えているようです。個人でも決して手が届かない金額ではないですし(GCP関連イベントでは$500分使えるクーポンが貰えることも!!)、個人で運用しているアプリケーション等の分析にBigQueryを使ってみてはいかかでしょうか?
決済がカードだと業務では…という場合のために、決済代行サービスもあります。