yikegaya’s blog

仕事関連(Webエンジニア)と資産運用について書いてます

DockerでローカルにGCP Dataflowの開発環境を作った

仕事でGCPのDataflowを検証することになりローカルからdockerでPythonスクリプトを実行してGCPにデプロイできる環境を作ってみました。

動かすコード

GCPの公式サイトに書いてあったDataflowからPubsubに書き込むデモを動かしてみます。 Dataflow から Pub/Sub に書き込む  |  Google Cloud

環境変数から必要な情報を読み取ってdocker-compose runのみで実行できるようにするため情報を足して以下のように書き換えます。

このpythonスクリプトはjobs/write_to_pubsub_demo/main.pyに保存しておきます。

import argparse
from typing import Any, Dict, List
from dotenv import load_dotenv
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions

from typing_extensions import Self

load_dotenv()
GCP_PROJECT_ID = os.environ['GCP_PROJECT_ID']
GCS_BUCKET_NAME = os.environ['GCS_BUCKET_NAME']
DEMO_TOPIC_ID = os.environ['DEMO_TOPIC_ID']
TOPIC = 'projects/{}/topics/{}'.format(GCP_PROJECT_ID, DEMO_TOPIC_ID)
JOB_NAME = 'write-to-pubsub-demo'

def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {
        'buyer': item['name'],
        'timestamp': str(item['ts'])
    }
    data = bytes(item['product'], 'utf-8')

    return PubsubMessage(data=data, attributes=attributes)

def write_to_pubsub():

    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options

    example_data = [
        {'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
        {'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
        {'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
        {'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
    ]
    options = PipelineOptions()

    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = GCP_PROJECT_ID 
    google_cloud_options.job_name = JOB_NAME
    google_cloud_options.staging_location = '{}/binaries'.format(GCS_BUCKET_NAME) 
    google_cloud_options.temp_location = '{}/temp'.format(GCS_BUCKET_NAME)
    google_cloud_options.region = 'asia-northeast1' 

    options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    p = beam.Pipeline(options=options)
    (p
      | "Create elements" >> beam.Create(example_data)
      | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
      | WriteToPubSub(
            topic=TOPIC,
            with_attributes=True))
    p.run()
    print('Pipeline ran successfully.')

if __name__ == "__main__":
    write_to_pubsub()

用意した環境

Docker

FROM python:3.10

WORKDIR /workspace

COPY . .

RUN pip install -r requirements.txt

compose.yml

version: '3.8'

services:
  dataflow_demo:
    build: .
    env_file: .env
    container_name: dataflow-demo
    volumes:
      - .:/workspace

requirements.txt

DataflowはApache BeamsというプログラミングモデルをGCPで実行するサービスなのでapache-beamをインストールできるようにしておきます。

また、実運用を想定して環境変数を使うためpython-dotenvも用意します。

python-dotenv
apache-beam
apache-beam[gcp]

envファイルとGCP設定

事前にGCPの適当なプロジェクトとバケット、Pub/Subのトピックを作って環境変数に必要な情報を足していきます。

またサービスアカウントキーを作成してルート直下にcredentials.jsonとして保存します。 cloud.google.com

GCP_PROJECT_ID=dataflow-demo
GCS_BUCKET_NAME=gs://demo-bucket-name
GOOGLE_APPLICATION_CREDENTIALS=/workspace/credentials.json
DEMO_TOPIC_ID=demo-topic

githubで管理することを想定してgitignoreしておきます。

.env
credentials.json

実行

docker-compose run dataflow_demo jobs/write_to_pubsub_demo/main.py

実行後しばらく待ってGCP管理画面でDataflowの実行結果を確認できます。

これでjobsフォルダの下に試したいjobのケース(例えばBigqueryに書き込むなど)をpythonで書いて検証していくことができるようになりました。

github.com