資料

アーキテクチャ,用語

アーキテクチャ

上図は モバイルゲーム解析プラットフォームの作成 - リファレンス アーキテクチャ | ソリューション | Google Cloud より引用.

  • Message: サービスを通って移動するデータ
    • Message が subscriber に届いたことが確認されると message は削除される
  • Topic: message のフィードを示す名前付きエンティティ
    • Topic は publisher から送信された message を公開する
  • Subscription: 特定の topic に関する message の受信に関心があることを示す名前付きエンティティ
    • Subscription は topic に公開された message を subscriber に配信する
    • Subscription は message 配信後は ack_deadline_seconds が経過するまで message が再配信されないようにする
    • Subscription は message 配信後に subscriber からの ACK を待ち受ける
    • Subscription は ack_deadline_seconds 以内に subscriber から ACK を受信すると message を削除する
    • Subscription は ack_deadline_seconds 以内に subscriber から ACK を受信できなければ message を再配信する
  • Publisher: 特定の topic に関する message を作成して送信する
  • Subscriber: 特定の subscription に関する message を受信する
    • Subscriber は message の受信に成功したら subscription に ACK を返す

Message は subscription から subscriber へ at-least-once で配信される.つまり,同じ message が重複して受信される可能性がある.重複を防ぐためには Cloud Dataflow の利用が推奨されており Apache Beam (Java, Python) を使用する必要がある.

Ruby から Cloud Pub/Sub を利用する

実際に手を動かして理解を深める.

例題

定期的に Cloud Storage (GCS) のバケットにオブジェクト (ファイル,ディレクトリ) が作成される.新しく作成されるオブジェクトのうち,ファイルに対して 1 回だけ何かしら処理を行いたい.ディレクトリに対しては何も行わない.

設計

アーキテクチャ

Publisher

GCS バケットが publisher となる. GCS はバケット内で発生したイベントを Pub/Sub の topic に送信できる.今回は GCS にオブジェクトが作成されたことをトリガーとして処理を行いたいため, OBJECT_FINALIZE イベントを topic に送信する.

OBJECT_FINALIZE

バケットで新しいオブジェクト(または既存オブジェクトの新しい世代)が正常に作成された場合に送信されます。既存のオブジェクトをコピーまたは再作成した場合にも送信されます。アップロードが失敗した場合、このイベントはトリガーされません。

オブジェクトにはファイルとディレクトリの双方が含まれることに注意する.今回はファイルだけを処理したいため,イベントのトリガーがファイルなのかディレクトリなのかを subscriber 側で判断し,ファイルだけを処理の対象とするように条件分岐する.

Subscription / Subscriber

リアルタイム性が求められる処理ではないため Pull 配信とする. Subscriber は定期的に subscription を pull して message を受信する.同じファイル (message) に対して重複して処理が行われないようにするため, subscriber は subscription から正常にメッセージを受信できた場合は ack_deadline_seconds 以内に ACK を返す (message を削除する) 必要がある.

実装

共通部分

require "json"
require "google/cloud/storage"
require "google/cloud/pubsub"

keyfile    = "gcp.json"
project_id = "your-project"
bucket_id  = "your-bucket"
topic_name = "your-topic"
subscription_name = "subscription-pull-test"

storage = Google::Cloud::Storage.new project_id: project_id, credentials: keyfile
bucket  = storage.bucket bucket_id

pubsub = Google::Cloud::Pubsub.new project_id: project_id, credentials: keyfile

Topic 作成

Pub/Sub の topic を作成する.この topic 宛に publisher からイベントが通知 (message が送信) されるようにする.

pubsub.create_topic topic_name
=> #<Google::Cloud::Pubsub::Topic:0x00007fb1e7c35208
 @async_opts={},
 @exists=nil,
 @grpc=
  <Google::Pubsub::V1::Topic: name: "projects/your-project/topics/your-topic", labels: {}, message_storage_policy: nil>,
 @lazy=nil,
 @service=Google::Cloud::Pubsub::Service(your-project)>

Publisher (GCS bucket notification) 作成

GCS バケットのイベントを topic に通知するために notification を作成する. event_types: :finalize を指定すると OBJECT_FINALIZE イベントのみが通知される.

bucket.create_notification topic_name, event_types: :finalize, payload: :json
=> #<Google::Cloud::Storage::Notification:0x00007fb1e90dce40
 @bucket="your-bucket",
 @gapi=
  #<Google::Apis::StorageV1::Notification:0x00007fb1e90de5b0
   @etag="1",
   @event_types=["OBJECT_FINALIZE"],
   @id="1",
   @kind="storage#notification",
   @payload_format="JSON_API_V1",
   @self_link="https://www.googleapis.com/storage/v1/b/your-bucket/notificationConfigs/1",
   @topic="//pubsub.googleapis.com/projects/your-project/topics/your-topic">,
 @service=Google::Cloud::Storage::Service(your-project),
 @user_project=nil>

Subscription 作成

Topic に公開された message を受信するために subscription を作成する. Subscription はデフォルトで pull 配信として作成される. deadline で ack_deadline_seconds (デフォルト 10, 最長 600) を指定できる. ack_deadline_seconds は subscriber 側の処理に要する時間 (ACK を返せるようになるまでの時間) と相談して決める.

topic  = pubsub.topic topic_name
topic.subscribe subscription_name, deadline: 600
=> #<Google::Cloud::Pubsub::Subscription:0x00007fb1ea878770
 @exists=nil,
 @grpc=
  <Google::Pubsub::V1::Subscription: name: "projects/your-project/subscriptions/subscription-pull-test", topic: "projects/your-project/topics/your-topic", push_config: <Google::Pubsub::V1::PushConfig: push_endpoint: "", attributes: {}>, ack_deadline_seconds: 600, retain_acked_messages: false, message_retention_duration: <Google::Protobuf::Duration: seconds: 604800, nanos: 0>, labels: {}>,
 @lazy=nil,
 @service=Google::Cloud::Pubsub::Service(your-project)>

Message 受信

まず, GCS バケットにオブジェクトを作成して topic に message を公開する.バケットのオブジェクトとして適当にディレクトリとファイルを作成する.バケット直下に pubsub-test-dir というディレクトリを作成し,このディレクトリの中に pubsub-test-file というファイルを作成する.

これで topic に message が公開された.次に, subscription から message を pull する.

subscription = pubsub.subscription subscription_name
messages = subscription.pull
=> [#<Google::Cloud::Pubsub::ReceivedMessage:0x00007fb1e7d9a260
  @grpc=
   <Google::Pubsub::V1::ReceivedMessage: ack_id: "XkASTDYMRElTK0MLKlgRTgQhIT4wPkVTRFAGFixdRkhRNxkIaFEOT14jPzUgKEUSC1MTUVx1BFgQaV8zdQdRDRlze2QjO10RCQVNUnRfURsfWVx-SgVRBBB7dWd9bVobAQBCWlaL9ozp5apDZhs9XBJLLD5-PTlFQQ", message: <Google::Pubsub::V1::PubsubMessage: data: "{\n  \"kind\": \"storage#object\",\n  \"id\": \"your-bucket/pubsub-test-dir/pubsub-test-file/1537505134193603\",\n  \"selfLink\": \"https://www.googleapis.com/storage/v1/b/your-bucket/o/pubsub-test-dir%2Fpubsub-test-file\",\n  \"name\": \"pubsub-test-dir/pubsub-test-file\",\n  \"bucket\": \"your-bucket\",\n  \"generation\": \"1537505134193603\",\n  \"metageneration\": \"1\",\n  \"contentType\": \"application/octet-stream\",\n  \"timeCreated\": \"2018-09-21T04:45:34.193Z\",\n  \"updated\": \"2018-09-21T04:45:34.193Z\",\n  \"storageClass\": \"STANDARD\",\n  \"timeStorageClassUpdated\": \"2018-09-21T04:45:34.193Z\",\n  \"size\": \"0\",\n  \"md5Hash\": \"1B2M2Y8AsgTpgAmY7PhCfg==\",\n  \"mediaLink\": \"https://www.googleapis.com/download/storage/v1/b/your-bucket/o/pubsub-test-dir%2Fpubsub-test-file?generation=1537505134193603&alt=media\",\n  \"crc32c\": \"AAAAAA==\",\n  \"etag\": \"CMPX9Pqky90CEAE=\"\n}\n", attributes: {"payloadFormat"=>"JSON_API_V1", "eventTime"=>"2018-09-21T04:45:34.193219Z", "objectId"=>"pubsub-test-dir/pubsub-test-file", "objectGeneration"=>"1537505134193603", "eventType"=>"OBJECT_FINALIZE", "notificationConfig"=>"projects/_/buckets/your-bucket/notificationConfigs/1", "bucketId"=>"your-bucket"}, message_id: "209998687080279", publish_time: <Google::Protobuf::Timestamp: seconds: 1537505134, nanos: 492000000>>>,
  @subscription=
   #<Google::Cloud::Pubsub::Subscription:0x00007fb1ea878770
    @exists=nil,
    @grpc=
     <Google::Pubsub::V1::Subscription: name: "projects/your-project/subscriptions/subscription-pull-test", topic: "projects/your-project/topics/your-topic", push_config: <Google::Pubsub::V1::PushConfig: push_endpoint: "", attributes: {}>, ack_deadline_seconds: 600, retain_acked_messages: false, message_retention_duration: <Google::Protobuf::Duration: seconds: 604800, nanos: 0>, labels: {}>,
    @lazy=nil,
    @service=Google::Cloud::Pubsub::Service(your-project)>>,
 #<Google::Cloud::Pubsub::ReceivedMessage:0x00007fb1e7d9a1e8
  @grpc=
   <Google::Pubsub::V1::ReceivedMessage: ack_id: "XkASTDYMRElTK0MLKlgRTgQhIT4wPkVTRFAGFixdRkhRNxkIaFEOT14jPzUgKEUSC1MTUVx1BFwQaVwzdQdRDRlze2QjO10SVFdHVXRfURsfWVx-SgVQDRlyeGN9aVgUAwpHV1aL9ozp5apDZhs9XBJLLD5-PTlFQQ", message: <Google::Pubsub::V1::PubsubMessage: data: "{\n  \"kind\": \"storage#object\",\n  \"id\": \"your-bucket/pubsub-test-dir//1537505095050713\",\n  \"selfLink\": \"https://www.googleapis.com/storage/v1/b/your-bucket/o/pubsub-test-dir%2F\",\n  \"name\": \"pubsub-test-dir/\",\n  \"bucket\": \"your-bucket\",\n  \"generation\": \"1537505095050713\",\n  \"metageneration\": \"1\",\n  \"contentType\": \"application/x-www-form-urlencoded;charset=utf-8\",\n  \"timeCreated\": \"2018-09-21T04:44:55.050Z\",\n  \"updated\": \"2018-09-21T04:44:55.050Z\",\n  \"storageClass\": \"STANDARD\",\n  \"timeStorageClassUpdated\": \"2018-09-21T04:44:55.050Z\",\n  \"size\": \"0\",\n  \"md5Hash\": \"1B2M2Y8AsgTpgAmY7PhCfg==\",\n  \"mediaLink\": \"https://www.googleapis.com/download/storage/v1/b/your-bucket/o/pubsub-test-dir%2F?generation=1537505095050713&alt=media\",\n  \"crc32c\": \"AAAAAA==\",\n  \"etag\": \"CNnLn+iky90CEAE=\"\n}\n", attributes: {"payloadFormat"=>"JSON_API_V1", "eventTime"=>"2018-09-21T04:44:55.050303Z", "objectId"=>"pubsub-test-dir/", "objectGeneration"=>"1537505095050713", "eventType"=>"OBJECT_FINALIZE", "notificationConfig"=>"projects/_/buckets/your-bucket/notificationConfigs/1", "bucketId"=>"your-bucket"}, message_id: "210005283272824", publish_time: <Google::Protobuf::Timestamp: seconds: 1537505095, nanos: 213000000>>>,
  @subscription=
   #<Google::Cloud::Pubsub::Subscription:0x00007fb1ea878770
    @exists=nil,
    @grpc=
     <Google::Pubsub::V1::Subscription: name: "projects/your-project/subscriptions/subscription-pull-test", topic: "projects/your-project/topics/your-topic", push_config: <Google::Pubsub::V1::PushConfig: push_endpoint: "", attributes: {}>, ack_deadline_seconds: 600, retain_acked_messages: false, message_retention_duration: <Google::Protobuf::Duration: seconds: 604800, nanos: 0>, labels: {}>,
    @lazy=nil,
    @service=Google::Cloud::Pubsub::Service(your-project)>>]

Message 解析,ファイル処理

Message の内容は JSON 文字列で格納されている.今回はファイルの情報 (GCS 上のパス) が必要となるため, key “name” の value を取得する.

messages.each {|message| puts JSON.parse(message.data)["name"] }
#=> pubsub-test-dir/pubsub-test-file
#   pubsub-test-dir/

オブジェクトがディレクトリの場合は name の末尾に / が付与されていることがわかる.よって,次のように条件分岐させることでファイルのみを処理することができる.

messages.each do |message|
  path = JSON.parse(message.data)["name"]
  # オブジェクトがファイルの場合
  if path[-1] != "/"
    # ファイルを処理する
    # 処理時間が ack_deadline_seconds を超過するおそれがある場合は,
    # いったんファイル情報だけを取得・保存し,実際の処理は ACK を返した後で行う
  end
  # 処理が正常に終了したら ACK を返す
  # messages を pull してから ack_deadline_seconds が経過していた場合は ACK は無効となる
  message.acknowledge!
end

あとは定期的に message 受信, message 解析,ファイル処理, ACK を繰り返せばよい.

以上で例題を解決できた.