サービス概要
Glueとは、ETL(抽出、変換、ロード)プロセスの検出、準備、統合、近代化を容易にするサーバーレスデータ統合サービスです。ちなみにglueは「接着剤」という意味の英単語
Atenaとは、S3に格納されているデータを、SQLを使用して直接分析でき、簡単に操作できるサービスです。
目的・やりたいこと
前回、[技術検証]OktaのシステムログをS3に掃き出す検証では、Oktaのシステム監査ログを、EventBrideとKinesis FIrehoseと連携してS3に掃き出すところまでやりました。
しかし前回のS3のログをそのままAtenaで抽出すると、このようにdetailフィールドがstruct構造でネスト化されたまま出力され、可読性に欠けるという課題がありました。
今回の検証では、S3に保存されているネスト化されたログをフラット化してデータカタログに保存し、Atenaからクエリして出力するまでを扱います。
対象者
Oktaのシステムログを監査目的でS3に保存し、それをAtenaで条件を変えて情報を抽出したい中級レベルのデータ分析技術者
今回新たに対象となる技術
- Glue
- Athena
条件(導入にあたっての前提事項)
-
前回までの検証の環境が構築できていること
-
必要なロール
・csi-se007-infra-glue-role
・csi-se007-infra-glue-s3read-policy
{
"Statement": [
{
"Action": "s3:*",
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::aws-glue-*/scripts/*",
"arn:aws:s3:::aws-glue-*/temporary/*",
"arn:aws:s3:::aws-glue-studio-transforms-*/*",
"arn:aws:s3:::csi-se007-infra-system-logs/*"
],
"Sid": "ReadS3Policy"
}
],
"Version": "2012-10-17"
}
参考URL
概要図
作業の流れ
手順
まずはGlueを使ってネストされた Struct のフィールドをフラット化し、最上位のフィールドにします。それにはGlue jobを作成する必要があります。
1.Visual ETLをクリック
2.SourceにS3、TransformにFlatten、TargetにData Catalogを選択
3.さらに、極力重複したレコードを排除するため、Flattenの後にTransformsのDrop Duplicatesという関数も挟みます。
さらに、配列(Array)化したレコードをフラットにするため、Transformsの最後にArray To Columnsという関数も挟みます。
例えば、detail.targetというフィールドは標準ではこのように配列化されています。
これを次のようにdetail.target1,detail.target2,detail.target3と3つのレコードに分けます。
さらに重複削除を最初に実行するように配置転換し、最終的には以下のフローにしました。
4.SoruceのS3 URL、Data format、IAM Roleをこのように選択
5.Transformは特にいじらずそのままでOK
6.TargetのDatabase、Tableをこのように選択
7.(Terraform化する場合)このようにScriptが出来上がっているため、この内容をコピーしてどこかローカルのテキストファイルに貼り付ける
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import gs_flatten
from awsglue.dynamicframe import DynamicFrame
import gs_array_to_cols
from pyspark.sql import functions as SqlFuncs
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node Amazon S3
AmazonS3_node1701850192247 = glueContext.create_dynamic_frame.from_options(
format_options={"multiline": False},
connection_type="s3",
format="json",
connection_options={
"paths": ["s3://csi-se007-infra-system-logs/amazonloginlog/"],
"recurse": True,
},
transformation_ctx="AmazonS3_node1701850192247",
)
# Script generated for node Drop Duplicates
DropDuplicates_node1701851055624 = DynamicFrame.fromDF(
AmazonS3_node1701850192247.toDF().dropDuplicates(),
glueContext,
"DropDuplicates_node1701851055624",
)
# Script generated for node Flatten
Flatten_node1701851065629 = DropDuplicates_node1701851055624.gs_flatten(maxLevels=0)
# Script generated for node Array To Columns
ArrayToColumns_node1701851069244 = Flatten_node1701851065629.gs_array_to_cols(
colName="`detail.target`", colList="detail.target1,detail.target2,detail.target3"
)
# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1701851073854 = glueContext.write_dynamic_frame.from_catalog(
frame=ArrayToColumns_node1701851069244,
database="nozaki-systemlogcrawler-db",
table_name="run_part_r_00000",
transformation_ctx="AWSGlueDataCatalog_node1701851073854",
)
job.commit()
8.[Job details]で、Job bookmarkを「Enable」に変更し、それ以外はデフォルトのままSaveしてJobを保存
ここでジョブブックマークとは、古いデータを再処理しないために最後の処理を覚えさせておく機能です。
ジョブのブックマークを使用した処理済みデータの追跡
ここでブックマークが本当に効いているか有効性を検証
- ブックマークを有効にしている場合
1.Atenaで検索すると、38件抽出
2.Okta経由でAmazonにログインし、システムログを1件増やす。
3.Glue jobを実行
4.Atenaで再度検索すると、1件だけ増えて39件抽出
- ブックマークを無効にした場合
5.今度はブックマークを無効にして、Glue jobを再実行
6.Atenaで再度抽出
58件なので、20件増えている。
内訳を見ると、
完全に同じものを見に行ってるのが20件ある。
以上により、ブックマークの有効性が検証されました。
テーブルのロケーションがシステムログと同じフォルダ内にあるためか、Glueがどうしても重複してS3からシステムログを取りに行くため、テーブルのロケーションを別フォルダに移動することにした。
ALTER TABLE SET LOCATIONを参考にし、以下のALTER TABLE文を実行
ALTER TABLE run_awsgluedatacatalog_node1701299803092_11_part_r_00000 SET LOCATION 's3://csi-se007-infra-system-logs/'
これで確かにLocationがs3://csi-se007-infra-system-logs/glue_log/に変わったが、以降Atenaのクエリでエラーが
どうやらS3バケットアクセスが403Deniedになってるらしい。
それと同時にGlue jobも失敗するようになった。
これらは単純にLocationを変更したらその場所に該当フォルダ「run-AWSGlueDataCatalog_node1701299803092-11-part-r-00000/」がないだけだったので、手動で作ったら直った。そらそうだ
ちなみにS3のブロックパブリックアクセスをオンにしててもアクセスできたので、公開にしなくてもいいらしい。
今度こそ正しいLocationに変更
9.7.で保存したファイル名をn.pyとし、このS3の指定の場所にアップロード
10.Atenaでクエリ
select *
from run_part_r_00000
detail.〜がこのようにフラットに格納されている。
detail.targetはこのように複数のフィールドに分割
所要時間
3時間