ナンモワカランアザラシ

Ringed seal rings no bells

Dataflow Flex Templateの作り方

https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates?hl=ja

Dataflowはデータのストリーミングやバッチのためのソフトウェアを動かせるマネージドサービスである。 Apache Beam SDKを使って独自のテンプレートを作ることができる。

Dataflowで動かすための「テンプレート」はjsonファイルである。使用するコンテナイメージやパラメータを構造的に記述する設定ファイルだ。

FROM gcr.io/dataflow-templates-base/python312-template-launcher-base

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/your-package/main.py"
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="/template/setup.py" 

COPY requirements.txt /template/
COPY beametrics/ /template/your-package/
COPY setup.py /template/

RUN apt-get update \
    && apt-get install -y libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    && pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r /template/requirements.txt \
    && cd /template && pip install .

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

こんな感じでDockerfileを作ってイメージビルドすればいい感じになる。たぶんsetup, requirementsなどが冗長だと思うけどとりあえずこの状態で動かすために十分ではある。 https://github.com/kesompochy/beametrics/blob/master/Dockerfile

Airbyte Custom Connectorの作り方

https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connector-templates/generator/generate.sh

  1. https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connector-templates/generator/generate.sh を実行する
  2. ファイル群ができあがるのでsource.pyなどを好きにいじる
  3. https://github.com/kesompochy/airbyte-source-datadog-usage/blob/master/Dockerfile のような感じでDockerfileを作る
  4. イメージビルド、プッシュ
  5. airbyteのweb UIからsourceを作る
  6. 完成!

Stream

airbyteのSourceはStreamという概念を基本としている。
Sourceは複数のStreamを持つ。 Sourceに与えられたconfigから個々のStreamにパラメータを渡す。
airbyteで使う側は、どのStreamを使用するかを選ぶ。
基本的なAPIリクエストで抽出するやつならHTTPStreamが便利。
base_url, path, request_params, parse_responseをいじって柔軟に作れる。
最終的にはparse_responseでiterableに作られたデータがdestinationに渡される。

Cosmos@1.7.0にcommitした

https://github.com/astronomer/astronomer-cosmoshttps://github.com/astronomer/astronomer-cosmos/pull/1234

CosmosはDBTをAirflow上で動かすためにフレームワークである。
virtualenvを使ったときに一時ディレクトリが消えないバグを踏んだので調べていた。

https://github.com/astronomer/astronomer-cosmos/issues/958

1.6.0にしたらそのバグは解消されたのだが、冗長っぽい処理を見つけたのでパッチを当てた。レビュー対応・リリース処理をしてくれたcore contributersには大感謝である。
かなり大きな変更をしてしまったので本当に意図通りに機能するか、劣化しないかひやひやしている......。
人柱として早めにアップグレードを試そう。

BigQueryテーブル内のJSONデータの一部の値を`JSON_SET`を使って変更する

UPDATE句とJSON_SET関数を合わせて使えばできる。 具体的には次のような感じ。

テストデータの用意

INSERT INTO `my_dataset.test_table` (id, json)
VALUES 
  (1, JSON '{"hoge": "fuga", "piyo": 30}'),
  (2, JSON '{"hoge": "fuga2", "piyo": 32}'),
  (3, JSON '{"hoge": "fuga3", "piyo": 33}')

一部をUPDATE

UPDATE `avid-influence-381703.my_dataset.test_table` 
SET json = JSON_SET(json, '$.hoge', 'modified!!')
WHERE id = 1

selectして確認

select * from my_dataset.test_table

jsonオブジェクトの一部の値が変更された。

Row  
json
id
1   
{"hoge":"modified!!","piyo":30}
1
2   
{"hoge":"fuga2","piyo":32}
2
3   
{"hoge":"fuga3","piyo":33}
3

JSON_SETについては次を参照されたい。

https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_set

JSON_SETとUPDATEを組み合わせた事例が見つからなかったので自分のプロジェクトで確かめた。 ちなみに、streaming insertが常に走っているようなテーブルだとそのテーブルはUPDATEできないので注意が必要である(一敗)。その場合、次のようなエラーメッセージが出る。

UPDATE or DELETE statement over table my-dataset.table.hoge would affect rows in the streaming buffer, which is not supported

streaming bufferが存在する状態だとDML(データ操作言語、UPDATEやDELETE)での操作ができないようだ。

ref: Life of a BigQuery streaming insert | Google Cloud Blog

streaming bufferはパーティションに紐づくので、パーティションが例えばDAYで分かれていれば、streamingが走っている日以外のデータはUPDATEできる。

Apache AirflowのDAGファイルで他のDAGファイルから変数などをimportするとスケジューラはDAGオブジェクトを二重に検知する

題の通り。 例えば次の二つのファイルがあると、スケジューラはimportの際にエラーを返す。 dag2.pyでdag_1というIDのDAGがあるが、それはdag1.pyですでに宣言されているよ、と。

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def print_hello():
    return 'Hello from DAG 1'

dag1 = DAG(
    'dag_1',
    default_args=default_args,
    description='A simple DAG 1',
    schedule_interval=timedelta(days=1),
)

task1 = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag1,
)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from dag_file_1 import print_hello, default_args

dag2 = DAG(
    'dag_2',
    default_args=default_args,
    description='A simple DAG 2',
    schedule_interval=timedelta(days=1),
)

task2 = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag2,
)

https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code

公式ページには、トップレベルのコードで避けるべきことが載っている。 スケジューラはDAGファイルのコードをメモリにロードし、その中からDAGオブジェクトを認識するのだろう(推測)。importするとそのファイルのトップレベルのオブジェクトもメモリに乗ってしまうのだろう(推測)。

TerraformでBigQueryテーブルのforce replacementがplan時に検知されなかったんだけど再現できない!!

掲題の通り。BigQueryのテーブルが再作成されてしまったのだが、plan時にはreplaceが表示されなかった。再現しようとしてもできない。なぜ?

追記: 2024-09-16

日次でBigQueryのテーブルをreplaceするスクリプトが定期実行されていた。replaceする際に一部のフィールドだけが大文字にされていた。
planしたタイミングとスキーマが変わるタイミングがうまく噛み合ってしまった結果、「planで検知されなかった」が発生したようだ。

バージョン

  • terraform@1.9.5
  • terraform google provider@4.0.0

やったことと起きたこと

前提

  • BigQueryのデータセットがある
  • BigQueryのデータセットはstate管理されている
  • BigQueryのデータセット内にテーブルがある
  • そのテーブルはstate管理されていなかった

やったこと

  1. schemaのfieldの大文字小文字を間違えた状態で対象テーブルのresourceブロックを記述した
  2. importブロックを使ってテーブルをstateにimportした

起きたこと

  1. importできた(!?)
  2. そのあとのplanでも差分は出なかった(??)
  3. 一日経ったらforce replaceの差分が検知され、再作成された(それはそう)

schemaの文字が変わったので再作成になるのは理解できる。しかしschemaの大文字小文字が異なる状態でimportができたことに納得がいかない。

再現したかった

https://github.com/kesompochy/terraform-force-replace-test

適当なGoogle Cloudプロジェクトを使ってリソースを配置してコードを書いてみたが、再現できない。正しくforce replacementが表示される。 「起きたこと」の2と3の間にリソースに変更があったのか?と怪しんだが、監査ログを見てもそのような変更は見られない。 何か見落としていることがありそうだけど分からない。

【入門awk】Terraformのdeprecated `require_partition_filter is deprecated and will be removed in a future major release.` を一括対応する

#!/bin/bash

# 処理対象のファイルパスを指定
file="./path/to/your/file.tf"

# AWKスクリプトを使用してファイルを処理
awk '
BEGIN {
    # 初期化:各種フラグと変数の設定
    inside_dynamic = 0                # dynamic ブロック内にいるかのフラグ
    inside_time_partitioning = 0      # time_partitioning ブロック内にいるかのフラグ
    bracket_count = 0                 # 中括弧の数を追跡するカウンター
    require_partition_filter = ""     # require_partition_filter の行を保存する変数
    block_content = ""                # 現在処理中のブロックの内容を保存する変数
    resource_indent = ""              # リソースのインデントを保存する変数
}

# google_bigquery_table リソースの行を検出し、そのインデントを保存
/resource[[:space:]]+"google_bigquery_table"/ {
    resource_indent = $0
    sub(/[^[:space:]].*$/, "", resource_indent)
    print
    next
}

# dynamic "time_partitioning" ブロックの開始を検出
/dynamic "time_partitioning"/ {
    inside_dynamic = 1
    bracket_count = 1
    block_content = $0
    next
}

# time_partitioning ブロックの開始を検出(dynamic ブロック外の場合)
/time_partitioning[[:space:]]*{/ && !inside_dynamic {
    inside_time_partitioning = 1
    bracket_count = 1
    block_content = $0
    next
}

# require_partition_filter の行を検出し、適切なブロック内であれば保存
/require_partition_filter[[:space:]]*=/ {
    if (inside_dynamic || inside_time_partitioning) {
        require_partition_filter = $0
        sub(/^[[:space:]]*/, "", require_partition_filter)
        next
    }
    print
    next
}

# その他の行の処理
{
    if (inside_dynamic || inside_time_partitioning) {
        # 現在のブロック内容に行を追加
        if (block_content != "") block_content = block_content "\n"
        block_content = block_content $0
        
        # 中括弧の数をカウント
        if ($0 ~ /{/) bracket_count++
        if ($0 ~ /}/) {
            bracket_count--
            if (bracket_count == 0) {
                # ブロックの終了を検出
                if (inside_dynamic) inside_dynamic = 0
                if (inside_time_partitioning) inside_time_partitioning = 0
                
                # ブロック内容を出力
                printf "%s\n", block_content
                block_content = ""
                
                # require_partition_filter を適切なインデントで出力
                print resource_indent "  " require_partition_filter
                require_partition_filter = ""
            }
        }
    } else {
        # ブロック外の行はそのまま出力
        print
    }
}

END {
    # 未出力のブロック内容やrequire_partition_filterがあれば出力
    if (block_content != "") printf "%s\n", block_content
    if (require_partition_filter != "") print resource_indent "  " require_partition_filter
}
' "$file" > "$file.tmp" && mv "$file.tmp" "$file"

registry.terraform.io

require_partition_filter is deprecated and will be removed in a future major release. Use the top level field with the same name instead.

google_bigquery_tableリソースのrequire_partition_filter属性を記述する位置が変わるようだ。トップレベルに書くのが推奨になった。 tfファイルの中に大量にtime_partitionブロックの中のrequire_partition_filterがあったので、一括で移動するためのawkスクリプトを書いた。dynamic "time_partitioning"ブロックの中のcontentブロックの中にもあったので、どちらでも対応できるようにした。

私は今回 awkを初めてまともにスクリプト言語として使った。awkは入力に対して、行を舐めていってprintしていくスクリプト言語であると理解した。実行全体を通して状態が共有されるフラグが使えるのは便利だ。