Glueでパーティションキーを設定してパーティション化する

目的・やりたいこと

前回、[技術検証]S3に保存された監査ログをGlueで可読性を上げてAtenaで抽出する検証では、ネスト化されたログをフラット化するところまでやりました。
しかし前回のS3のログ配置では、Hive形式になっておらず、パーティションが使えないという課題がありました。

 

そこで今回の検証では、Glueジョブでパーティションキーを設定してパーティション化し、Hive形式でS3に保存されるまでを扱います。

対象者

ログをS3に保存し、それをAtenaでパーティションキーを指定して情報を抽出したい中級レベルのデータ分析技術者

対象となる技術

  • Glue
  • Athena

条件(導入にあたっての前提事項)

  • 前回までの検証の環境が構築できていること

注意

今回のパーティションの設定方法は完全に我流のやり方なので、もしかしたら本来の方法より非効率かもしれません。もっといい方法あるよという方いましたら教えてください。

参考URL

概要図

作業の流れ

設定手順

まずはGlueジョブを使ってtimeフィールド「2024-01-16 01:30:11.0」から情報をそれぞれ年、月、日の単位で抽出し、それをパーティションキーに設定するフローを作ります。

1.Visual ETLをクリック

2.SourceにS3、TransformにまずTo Timestamp関数を選択し、timeをiso形式に変換

3.今度はFormat Timestamp関数を使って、yyyyというフォーマットでyearという列を作成

4.同様にFormat Timestamp関数を使って、MMというフォーマットでmonthという列を作成

5.同様にFormat Timestamp関数を使って、ddというフォーマットでdayという列を作成

6.最後にカタログテーブルでパーティション(年、月、日)を設定

これをスクリプト化したのが次の内容です。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import gs_format_timestamp
import gs_to_timestamp

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node Amazon S3
AmazonS3_node1705548592182 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="json",
    connection_options={
        "paths": ["s3://csi-se007-infra-system-logs/glue_log/run_part_r_00000/"],
        "recurse": True,
    },
    transformation_ctx="AmazonS3_node1705548592182",
)

# Script generated for node To Timestamp
ToTimestamp_node1704956271806 = AmazonS3_node1705548592182.gs_to_timestamp(
    colName="time", colType="iso"
)

# Script generated for node Format Timestamp
FormatTimestamp_node1704956383134 = ToTimestamp_node1704956271806.gs_format_timestamp(
    colName="time", dateFormat="yyyy", newColName="year"
)

# Script generated for node Format Timestamp
FormatTimestamp_node1704958713540 = (
    FormatTimestamp_node1704956383134.gs_format_timestamp(
        colName="time", dateFormat="MM", newColName="month"
    )
)

# Script generated for node Format Timestamp
FormatTimestamp_node1705024506944 = (
    FormatTimestamp_node1704958713540.gs_format_timestamp(
        colName="time", dateFormat="dd", newColName="day"
    )
)

additionalOptions = {"enableUpdateCatalog": True}
additionalOptions["partitionKeys"] = ["year", "month", "day"]

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1705548604536 = glueContext.write_dynamic_frame.from_catalog(
    frame=FormatTimestamp_node1705024506944,
    database="se007-app-db",
    table_name="se007-app-table",
    transformation_ctx="AWSGlueDataCatalog_node1705548604536",
    additional_options=additionalOptions,
)

job.commit()

ここでポイントとなるのが次の2行

additionalOptions = {"enableUpdateCatalog": True}
additionalOptions["partitionKeys"] = ["year", "month", "day"]

これはここの方式1の内容をそのまま記載
次に

sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>,
                                                    table_name=<target_table_name>, transformation_ctx="write_sink",
                                                    additional_options=additionalOptions)

の部分は、「additional_options=additionalOptions」だけを最後のデータカタログ作成コードの中に組み入れればOK

検証

1.まずこの状態でGlueジョブを実行

2.該当のカタログテーブルのスキーマパーティションキー、パーティションが設定されていることを確認

3.S3にHive形式で出力されていることを確認

4.最後にAtenaでパーティションキーを指定して検索できることを確認

所要時間

1時間