目的・やりたいこと
前回、[技術検証]S3に保存された監査ログをGlueで可読性を上げてAtenaで抽出する検証では、ネスト化されたログをフラット化するところまでやりました。
しかし前回のS3のログ配置では、Hive形式になっておらず、パーティションが使えないという課題がありました。
そこで今回の検証では、Glueジョブでパーティションキーを設定してパーティション化し、Hive形式でS3に保存されるまでを扱います。
対象者
ログをS3に保存し、それをAtenaでパーティションキーを指定して情報を抽出したい中級レベルのデータ分析技術者
対象となる技術
- Glue
- Athena
条件(導入にあたっての前提事項)
- 前回までの検証の環境が構築できていること
注意
今回のパーティションの設定方法は完全に我流のやり方なので、もしかしたら本来の方法より非効率かもしれません。もっといい方法あるよという方いましたら教えてください。
参考URL
- AWS Glue ETL ジョブからの Data Catalog でのテーブルの作成、スキーマの更新、および新規パーティションの追加
- [アップデート]AWS Glueがジョブ実行後のパーティション更新に対応しました!
概要図
作業の流れ
設定手順
まずはGlueジョブを使ってtimeフィールド「2024-01-16 01:30:11.0」から情報をそれぞれ年、月、日の単位で抽出し、それをパーティションキーに設定するフローを作ります。
1.Visual ETLをクリック
2.SourceにS3、TransformにまずTo Timestamp関数を選択し、timeをiso形式に変換
3.今度はFormat Timestamp関数を使って、yyyyというフォーマットでyearという列を作成
4.同様にFormat Timestamp関数を使って、MMというフォーマットでmonthという列を作成
5.同様にFormat Timestamp関数を使って、ddというフォーマットでdayという列を作成
6.最後にカタログテーブルでパーティション(年、月、日)を設定
これをスクリプト化したのが次の内容です。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import gs_format_timestamp
import gs_to_timestamp
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node Amazon S3
AmazonS3_node1705548592182 = glueContext.create_dynamic_frame.from_options(
format_options={"multiline": False},
connection_type="s3",
format="json",
connection_options={
"paths": ["s3://csi-se007-infra-system-logs/glue_log/run_part_r_00000/"],
"recurse": True,
},
transformation_ctx="AmazonS3_node1705548592182",
)
# Script generated for node To Timestamp
ToTimestamp_node1704956271806 = AmazonS3_node1705548592182.gs_to_timestamp(
colName="time", colType="iso"
)
# Script generated for node Format Timestamp
FormatTimestamp_node1704956383134 = ToTimestamp_node1704956271806.gs_format_timestamp(
colName="time", dateFormat="yyyy", newColName="year"
)
# Script generated for node Format Timestamp
FormatTimestamp_node1704958713540 = (
FormatTimestamp_node1704956383134.gs_format_timestamp(
colName="time", dateFormat="MM", newColName="month"
)
)
# Script generated for node Format Timestamp
FormatTimestamp_node1705024506944 = (
FormatTimestamp_node1704958713540.gs_format_timestamp(
colName="time", dateFormat="dd", newColName="day"
)
)
additionalOptions = {"enableUpdateCatalog": True}
additionalOptions["partitionKeys"] = ["year", "month", "day"]
# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1705548604536 = glueContext.write_dynamic_frame.from_catalog(
frame=FormatTimestamp_node1705024506944,
database="se007-app-db",
table_name="se007-app-table",
transformation_ctx="AWSGlueDataCatalog_node1705548604536",
additional_options=additionalOptions,
)
job.commit()
ここでポイントとなるのが次の2行
additionalOptions = {"enableUpdateCatalog": True}
additionalOptions["partitionKeys"] = ["year", "month", "day"]
これはここの方式1の内容をそのまま記載
次に
sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>,
table_name=<target_table_name>, transformation_ctx="write_sink",
additional_options=additionalOptions)
の部分は、「additional_options=additionalOptions」だけを最後のデータカタログ作成コードの中に組み入れればOK
検証
1.まずこの状態でGlueジョブを実行
2.該当のカタログテーブルのスキーマにパーティションキー、パーティションが設定されていることを確認
3.S3にHive形式で出力されていることを確認
4.最後にAtenaでパーティションキーを指定して検索できることを確認
所要時間
1時間