サイトをリニューアルいたしました。(6/1)

Dataflow を試してみる

データ処理を簡単に実装できる Dataflow

Cloud Dataflow(以下 Dataflow) は、GCP(Google Cloud Platfom) にて提供される、ストリームデータ処理、およびバッチデータ処理を簡素化するフルマネージドサービスです。つまり、利用者はサーバなどインフラの運用を気にすることなく、データを加工する処理を簡単に実装できるようになります。更に、膨大な量のデータを処理する場合でもオートスケールされるので、管理の手間がかからずリソース不足の心配もありません。
本記事ではDataflowを初めて知るという方向けにサービスの概要からどのようなものかなどを概説します。

【CLOUD DATAFLOW】
https://cloud.google.com/dataflow/?hl=ja

Dataflow では、処理を Java、または Python で実装する必要があります(ただし、Python ではストリーミングデータ処理が DirectRunner のみなどの条件付きでのサポートになります(2018年5月時点))。また、2017年5月から Cloud Dataflow SDK が 2.0.0 にバージョンアップされ、Apache Beam をベースとしたコード体系となっています。
Apache Beam では、バッチ処理、ストリーミング処理の両方を実装でき、また、Apache Beam で実装することによって、Dataflow だけではなく Apache Flink、Apache Spark など他のプラットフォームでも実行可能になるので可搬性が高まります。更に、Cassandra、Hadoop、MongoDB など GCP 以外のシステムへの変換にも対応され、I/O の機能面が充実しています。このようなメリットから、データ処理のプログラミングにおいて Apache Beam は注目されています。

【なぜ Apache Beam なのか】
https://cloudplatform-jp.googleblog.com/2016/05/apache-beam-dataflow.html

【Apache Beam Programming Guide】
https://beam.apache.org/documentation/programming-guide/

【Built-in I/O Transforms】
https://beam.apache.org/documentation/io/built-in/

本記事では、Dataflow SDK 2.3 を使用して Java プログラムを作成し、Pub/Sub で受信したメッセージを Datastore へ書き込むまでを試したいと思います。

構成としては以下のような、赤枠部分のデータの流れを Dataflow にて実装します。

このケースは例えば、スマホやパソコンなどのユーザの端末から何らかのメッセージを受け取って、クラウド側でデータ処理を行ない、DataStoreに書き込むということを想定しています。

Dataflow 実行環境を構築する

Dataflow を実行するには、GCP上では、

  • 使用する API の有効化
  • GCS バケットの作成
  • サービスアカウントの設定

が必要になります。また、ローカルPC では

  • Google Cloud SDK
  • Java
  • Maven

のインストールが必要になります。また、Dataflow の実行に直接必要となるものではありませんが、本記事では Pub/Sub メッセージの送信プログラムも実行するため、それに必要なサービスアカウント秘密鍵ファイルの手順についても以下に説明していきます。
なお、以下で説明するローカルPC 環境は Mac OSX High Sierra にて確認しています。

GCP での実行環境構築

それではまず、GCP での構築を進めていきましょう。

GCP プロジェクトの作成

GCP プロジェクトを作成するには、Google Cloud Console [https://console.cloud.google.com/] にアクセスし、Google アカウントでログインします(Google アカウントを持っていない場合は作成します)。

まだプロジェクトを作成していない場合は作成します(既に作成していて新しく作成する場合は、画面上部のプロジェクト名をクリックして開いたウィンドウの ”+”ボタンから作成します)。
プロジェクトが作成され、GCP が使用可能であれば下図のようにプロジェクト情報などが確認できます。

API の有効化

Dataflow を実行するために使用する API を有効にするため、Google Cloud Console の左のメニューから [API とサービス] の [ダッシュボード] を開いて、[API とサービスの有効化] をクリックします。

下のような画面が開くので、

以下のサービスを検索して有効になっていることを確認します。

Cloud Dataflow
Compute Engine
Stackdriver Logging
Google Cloud Storage
Google Cloud Storage JSON
Google Cloud Datastore
Google Cloud Resource Manager API

また、以下のサービスを検索して有効にします。

Google Cloud Pub/Sub

GCS バケットの作成

Dataflow を実行する際に必要なファイルを格納するため、GCS に保管場所が必要となります。新規に作成する場合は、Google Cloud Console の左のメニューから [ストレージ] – [Storage] を開いて作成できます。ここで、バケット名は世界でユニークである必要があるので、入力してみて使用可能なバケット名を確認しながら作成してください。

↑既に使用されている場合はメッセージが表示されます。

IAM の設定

Dataflow プログラムを実行する際は裏で GCE のインスタンスを利用するので、GCE のサービスアカウント(xxxxxxxxx-compute@developer.gserviceaccount.com) に対する権限付与が必要となります。ただし注意点として、今回は簡単のためにデフォルトのサービスアカウントへ設定しますが、最小限の権限を与えられた新しいサービスアカウントを作成して使用することが推奨されています。
Google Cloud Console の左のメニューから [IAM と管理] – [サービスアカウント] を開いて、今回作成するプログラムの実行に必要な権限として、IAM で GCE サービスアカウントに以下の権限を付与してください。

  • ストレージ – ストレージのオブジェクト管理者
  • Dataflow – Dataflow ワーカー
  • Pub/Sub – Pub/Sub 編集者
  • Datastore – Cloud Datastore ユーザー

また、Dataflow に関して必要となる権限は以下のサイトが参考になります。

【Cloud Dataflow アクセス制御ガイド】
https://cloud.google.com/dataflow/access-control?hl=ja

サービスアカウントの作成

Dataflow実行、および Pub/Subメッセージ送信を行なうサービスアカウントを作成します(役割としては分けるべきですが、簡単のために 1つのサービスアカウントとして作成します)。サービスアカウントを作成するには Google Cloud Console 左メニューの [IAM と管理] – [サービスアカウント] から、[サービスアカウントを作成] をクリックします。任意のサービスアカウント名を設定して役割に以下のものを設定します。

  • プロジェクト – 参照者
  • Dataflow – Dataflow 管理者
  • Datastore – Cloud Datastore ユーザー
  • Pub/Sub – Pub/Sub 編集者
  • ストレージ – ストレージ管理者

また、秘密鍵のファイルも必要となるので、[新しい秘密鍵の提供] をチェックして [作成] をクリックしてください。作成した秘密鍵ファイルはローカルPC で使用します。

ローカルPC での実行環境構築

続いて、ローカルPC の実行環境の構築を進めていきます。

Google Cloud SDK のインストール

Google Cloud SDK を使用するには Python 2.7 が必要となります。Mac OSX であればデフォルトでインストールされていますが、ターミナルから以下のコマンドを実行して確認し、インストールされていなかったりバージョンが 3 以上となっていた場合は Python 公式サイトなどを参照してインストールしましょう。

コード
python --version

それでは Google Cloud SDK のインストールになりますが、パッケージは以下のサイトからダウンロードします。
https://cloud.google.com/sdk/docs/?hl=ja#mac

ダウンロードできたら任意の場所で解凍し、シェルを実行して path を設定します。

コード
$ tar zxvf google-cloud-sdk-192.0.0-darwin-x86_64.tar.gz
$ ./google-cloud-sdk/install.sh

変更が反映されるようにターミナル ウィンドウを開き直し、gcloud コマンドで SDK を初期化します。

コード
$ gcloud init

Java のインストール

JDK は 1.8 が必要となりますので、以下のサイトからダウンロードし、インストールします。
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

Maven のインストール

Java 用のプロジェクト管理ツールである Maven のインストールには、公式サイトの手順のように圧縮ファイルをダウンロード、解凍して path を通す方法と、Homebrew を使用してインストールする方法があります。以下では Homebrew を使用した方法を示します。
Homebrew をインストールするにはターミナルから以下のコマンドを実行します。

コード
$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

Homebrew がインストールできたら、以下のコマンドで Maven をインストールします。

コード
$ brew install maven

秘密鍵ファイルの環境変数への設定

GCP で作成してダウンロードした秘密鍵ファイルを、以下のように環境変数 GOOGLE_APPLICATION_CREDENTIALS に設定します。

コード
$ vi ~/.bash_profile
などで設定ファイルの編集を開始し、
コード
export GOOGLE_APPLICATION_CREDENTIALS=”ダウンロードした秘密鍵のファイルパス”

を追記、保存した後、

コード
$ source ~/.bash_profile
で反映させます。

Dataflow プログラムを作成する

実行環境構築が終わったら、Dataflow プログラムを作成していきましょう!
実行環境の構成は以下のようになります。まず Dataflow実行クラスが Dataflow のジョブを開始してメッセージ受信可能状態としておきます。その状態で、メッセージ送信クラスが Pub/Sub へメッセージを送信することで Dataflow のジョブが実行され、Datastore へのデータ書き込みまで処理が流れます。

プログラムを作成すると最終的なディレクトリ構成は以下のようになります。これらのプログラムについて説明していきます。

<任意のフォルダ>
├─pom.xml
└─src
 └─main
  └─java
   └─ca
    └─test
     └─dataflow
      ├─PubsubToDatastore.java
      ├─MemoryToPubsub.java
      ├─PortableConfiguration.java
      └─RetryHttpInitializerWrapper.java

Maven 設定ファイルの作成

Maven 設定ファイル pom.xml では以下のように設定します。Dataflow SDK のバージョンは 2.3 を使用しています。

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>ca.test.dataflow</groupId>
   <artifactId>ca-test-dataflow</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <properties>
       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   </properties>
   <build>
       <plugins>
           <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-compiler-plugin</artifactId>
               <version>3.7.0</version>
               <configuration>
                   <source>1.8</source>
                   <target>1.8</target>
               </configuration>
           </plugin>
           <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-eclipse-plugin</artifactId>
               <configuration>
                   <downloadSources>true</downloadSources>
                   <downloadJavadocs>true</downloadJavadocs>
               </configuration>
           </plugin>
       </plugins>


       <pluginManagement>
           <plugins>
               <plugin>
                   <groupId>org.codehaus.mojo</groupId>
                   <artifactId>exec-maven-plugin</artifactId>
                   <version>1.4.0</version>
                   <configuration>
                       <cleanupDaemonThreads>false</cleanupDaemonThreads>
                   </configuration>
               </plugin>
           </plugins>
       </pluginManagement>
   </build>


   <dependencies>
       <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
           <version>2.3.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-sdks-java-core</artifactId>
           <version>2.3.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-direct-java</artifactId>
           <version>2.3.0</version>
           <scope>runtime</scope>
       </dependency>
       <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
           <version>2.3.0</version>
       </dependency>
       <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-jdk14</artifactId>
           <version>1.7.25</version>
       </dependency>
   </dependencies>
</project>

Java プログラムの実装

Dataflow 実行クラス

Apache Beam をベースとして実装することになりますが、具体的には、Pipeline クラスを作成し、apply にてデータの I/O や変換処理をつなげていきます。
今回作成する以下のコードでは PubsubToDatastore クラスを定義し、execute メソッド内にて Pipeline を実装しています。Pipeline では、

  1. input として Pub/Sub メッセージ(String) を取得
  2. 取得したメッセージの編集と Datastore のデータ形式(Entity) への変換
  3. Datastore へ書き込み(output)

という処理の流れになります。

PubsubToDatastore.java
package ca.test.dataflow;


import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;


import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;


import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;


import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;


/**
* PubsubToDatastore
*/
@SuppressWarnings("serial")
public class PubsubToDatastore {
  
   private static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
   private static String TIME_ZONE = "Asia/Tokyo";
   private static DateTimeFormatter dtFormatter = DateTimeFormatter.ofPattern(TIME_FORMAT);


   static class AddDatetimeFn extends DoFn {
      
       private String toNamespace;
       private String toKind;
       private String toNamePrefix;
      
       public AddDatetimeFn(String toNamespace, String toKind, String toNamePrefix) {
          
           this.toNamespace = toNamespace;
           this.toKind = toKind;
           this.toNamePrefix = toNamePrefix;
       }


       @ProcessElement
       public void processElement(ProcessContext c) {
          
           // Pub/Sub のメッセージ String をカンマで分割して各カラムデータを取得します
           String[] columns = c.element().split(",");
          
           Entity.Builder entityBuilder = Entity.newBuilder();
          
           Integer jobId = Integer.parseInt(columns[0].trim());
           Integer rowId = Integer.parseInt(columns[1].trim());
           Key.Builder keyBuilder = makeKey(toKind, String.format("%s-%04d-%04d", toNamePrefix, jobId, rowId));
           if (toNamespace != null) {
               keyBuilder.getPartitionIdBuilder().setNamespaceId(toNamespace);
           }
           Integer wordNum = columns[2].trim().split(" ").length;
           String importDtStr = columns[3].trim();
           ZonedDateTime importDt = LocalDateTime.parse(importDtStr, dtFormatter).atZone(ZoneId.of(TIME_ZONE));
          
           // 現在の日時を取得して、追加するカラムデータを作成します
           ZonedDateTime updateDt = ZonedDateTime.now(ZoneId.of(TIME_ZONE));


           // 各カラムデータを Entity クラスの property に put します
           entityBuilder.setKey(keyBuilder.build());
           entityBuilder.putProperties("rowId", makeValue(rowId).build());
           entityBuilder.putProperties("sentence", makeValue(columns[2].trim()).build());
           entityBuilder.putProperties("wordNum", makeValue(wordNum).build());
           entityBuilder.putProperties("importTimestamp", makeValue(dtFormatter.format(importDt)).build());
           entityBuilder.putProperties("updateTimestamp", makeValue(dtFormatter.format(updateDt)).build());
          
           // 生成した Entity クラスを output に格納して返します
           c.output(entityBuilder.build());
       }
   }


   public class TransformData extends PTransform, PCollection> {
      
       private PubsubToDatastoreOptions options;
      
       public TransformData(PubsubToDatastoreOptions options) {
           this.options = options;
       }


       @Override
       public PCollection expand(PCollection line) {
           // Functionクラスを使用して各Pub/Subメッセージを Entity へ変換します
           PCollection rows = line.apply(ParDo.of(
                   new AddDatetimeFn(options.getToNamespace(), options.getToKind(), options.getToNamePrefix())));


           return rows;
       }
   }
  
   public interface PubsubToDatastoreOptions extends DataflowPipelineOptions {


       // DataflowPipelineOptions を継承してクラス特有の引数を追加します
       String getFromTopic();
       void setFromTopic(String fromTopic);
      
       String getToNamespace();
       void setToNamespace(String toNamespace);
      
       String getToKind();
       void setToKind(String toKind);
      
       String getToNamePrefix();
       void setToNamePrefix(String toNamePrefix);
   }


   public void execute(String[] args) {
      
       // 引数を渡して pipeline 実行時の設定クラスを作成します
       PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
               .withValidation().as(PubsubToDatastoreOptions.class);
       options.setStreaming(true);


       Pipeline pipeline = Pipeline.create(options);


       pipeline
           // Pub/Sub topic からメッセージを取得します
           .apply("read from Pubsub", PubsubIO.readStrings().fromTopic(options.getFromTopic()))
           // Pub/Sub メッセージの Stringデータを Datastore の Entity に変換します
           .apply("transform Pubsub data", new TransformData(options))
           // 変換した Entity を Datastore へ書き込みます
           .apply("write to Datastore", DatastoreIO.v1().write().withProjectId(options.getProject()));


       // 構築した pipeline を実行します
       pipeline.run().waitUntilFinish();
   }
  
   public static void main(String[] args) {
      
       PubsubToDatastore pubsubToDatastore = new PubsubToDatastore();
       pubsubToDatastore.execute(args);
   }
}

※Datastore への書き込みに関する注意点として、DatastoreIO.v1().write() を使用して 1回の書き込みで同時に同一キーを持つ複数のエンティティを書き込もうとするとエラーとなるので、その対策が必要となります。今回作成するコードは Dataflow の実行が目的のため、重複キー対策について考慮していませんのでご了承ください。

Pub/Sub メッセージ送信クラス

上述の PubsubToDatastore クラスでは Pub/Sub メッセージを取得しますが、その機能を確認するために Pub/Sub メッセージを送信する必要があります。送信方法は任意で良いですが、今回は java で送信クラスを以下のように作成して送信します。Pub/Sub メッセージ送信は以下のクラス(MemoryToPubsub) にて実行します。

MemoryToPubsub.java
package ca.test.dataflow;


import ca.test.dataflow.PortableConfiguration;


import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PublishResponse;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.common.collect.ImmutableList;


import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;


import java.io.IOException;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;


/**
* MemoryToPubsub
*/
public class MemoryToPubsub {


   private static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
   private static String TIME_ZONE = "Asia/Tokyo";
   private static DateTimeFormatter dtFormatter = DateTimeFormatter.ofPattern(TIME_FORMAT);
  
   public class PubsubJob {
      
       private String toTopic;
       private Integer jobId;
       private Integer rowNum;
      
       public PubsubJob(String toTopic, int jobId, int rowNum) {
          
           this.toTopic = toTopic;
           this.jobId = jobId;
           this.rowNum = rowNum;
       }
  
       public void publishData() throws IOException {
          
           Pubsub pubsub = PortableConfiguration.createPubsubClient();
           for (int i=1; i<=rowNum; i++) {
               String rowId = String.valueOf(i);
               // Pub/Sub へ送信するメッセージを作成
               // 例: "1,1,JobId: 1  rowId: 1 no test desu,2018-05-01 14:02:38.420"
               String message = jobId + "," + rowId + ",JobId: " + jobId + "  rowId: " + rowId
                       + " no test desu," + dtFormatter.format(ZonedDateTime.now(ZoneId.of(TIME_ZONE)));
              
               PubsubMessage pubsubMessage = new PubsubMessage();
               pubsubMessage.encodeData(message.getBytes("UTF-8"));
               List messages = ImmutableList.of(pubsubMessage);
               PublishRequest publishRequest = new PublishRequest().setMessages(messages);
              
               PublishResponse publishResponse = pubsub.projects().topics()
                       .publish(toTopic, publishRequest)
                       .execute();
           }
       }
      
       public void doJob() {
          
           try {
               publishData();
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   }
  
   public interface MemoryToPubsubOptions extends PipelineOptions {
      
       // 送信先のトピック("projects/<プロジェクト名>/topics/<トピック名>")
       String getToTopic();
       void setToTopic(String toTopic);
      
       // 並列処理job数
       @Default.Integer(1)
       Integer getJobNum();
       void setJobNum(Integer jobNum);
      
       // job あたりのデータ件数
       @Default.Integer(1)
       Integer getRowNum();
       void setRowNum(Integer RowNum);
   }
  
   public void execute(String[] args) {
      
       MemoryToPubsubOptions options = PipelineOptionsFactory.fromArgs(args).as(MemoryToPubsubOptions.class);
      
       String toTopic = options.getToTopic();
       int jobNum = options.getJobNum();
       int rowNum = options.getRowNum();


       List jobs = new ArrayList();
       for (int i=1; i<=jobNum; i++) {
           jobs.add(new PubsubJob(toTopic, i, rowNum));
       }
       jobs.parallelStream().forEach(x -> {
           x.doJob();
       });
   }


   public static void main(String[] args) {
      
       MemoryToPubsub memoryToPubsub = new MemoryToPubsub();
       memoryToPubsub.execute(args);
   }
}

また、上記のプログラムでは Pub/Sub を使用するために以下のクラスを利用していますので、

  • PortableConfiguration クラス
  • RetryHttpInitializerWrapper クラス

以下のサイトから Java コードをコピーして Java ファイルを作成してください。

【Cloud Pub/Sub アプリケーションの設定】
https://cloud.google.com/pubsub/configure?hl=ja

Dataflow プログラムを実行する

Dataflow の実行

それでは作成した Dataflow プログラムを実行してみましょう!
プログラムを実行する際のコマンドは以下のようになります。ここで、<>で記載されている箇所は各自の実行環境にて読み替えてください。

コード
mvn compile exec:java \
   -Dexec.mainClass=ca.test.dataflow.PubsubToDatastore \
   -Dexec.args=" \
   --project=<プロジェクト名> \
   --stagingLocation=gs://<バケット名>/staging \
   --autoscalingAlgorithm=NONE \
   --numWorkers=1 \
   --fromTopic=projects/<プロジェクト名>/topics/<トピック名> \
   --toNamespace= \
   --toKind= \
   --toNamePrefix= \
   --runner=DataflowRunner"

コマンド実行時のオプションの説明は以下のサイトになります。

【パイプラインの実行パラメータを指定する】
https://cloud.google.com/dataflow/pipelines/specifying-exec-params?hl=ja

また、今回作成したクラスでは以下のオプションを追加しました。

オプション 設定内容
fromTopic 受信するトピック名
toNamespace 出力先Datastore の Namespace
toKind 出力先Datastore の Kind
toNamePrefix 出力先Datastore の Name のプレフィックス

今回は処理が重くはないため、“–autoscalingAlgorithm=NONE”、”–numWorkers=1” を指定してオートスケールさせずに worker を 1つに限定しています。重い処理であれば、numWorkers と autoscalingAlgorithm の引数は無しにしてオートスケールを有効(デフォルト) にしたり、numWorkers の固定値を増やしたりします。ただし、ストリーミング自動スケーリングはベータ版になります(2018年5月時点)。

Pub/Sub メッセージの送信

上述の Dataflow プログラムを実行すると Dataflow のジョブが開始され、 Pub/Sub のメッセージを受け取れる状態になります。それでは、このジョブへ Pub/Sub メッセージを送信してみましょう。送信するには先ほど作成した Pub/Sub メッセージ送信プログラムを実行します。
実行コマンドは以下になります。

コード
mvn -e compile exec:java \
   -Dexec.mainClass=ca.test.dataflow.MemoryToPubsub \
   -Dexec.args=" \
   --toTopic=projects/<プロジェクト名>/topics/<トピック名> \
   --jobNum=1 --rowNum=1"

Dataflow ジョブの実行状況を確認する

Dataflow のジョブの実行状況は Google Cloud Console で確認することができます。Google Cloud Console の左のメニューから [Bigdata] – [Dataflow] を開いてみましょう。先程実行したプログラムに問題がなければ以下のような画面が確認できると思います。

名前のリンクをクリックするとジョブ内の各ステップのフロー図が確認できます。

更に、各ステップのセルをクリックすると詳細が右のペインで確認できます。

上の画像では「追加された要素数」が 2 となっているので、2件のデータが Datastore へ書き込まれたことがわかります。
実際に Datastore の管理画面を開くと、以下のように追加されたことが確認できると思います。

実行したジョブを終了するには、Google Cloud Console の Dataflow ジョブ画面から [ジョブを中止] をクリックします。ここで、ジョブを停止する際のダイアログにて、直ちに終了する場合は [キャンセル]、バッファ内のデータまで処理する場合は [ドレイン] を選択することができます。

まとめ

いかがでしたでしょうか? 本記事では Java で Dataflow のプログラムを作成し、Pub/Sub のメッセージデータを加工して Datastore へ書き込むまでを実行してみました。今回は Pub/Sub → Datastore のフローでしたが、Dataflow では他にも GCS、BigQuery、BigTable、Spanner など他の GCP サービスを利用したフローも作成することができます。次回はそれらの GCP サービスを利用したプログラムを紹介したいと思います。