kt.log

Filter ADLS Gen2 logs separated by container name in Azure Stream Analytics query

Azure Stream Analytics のクエリ内で Azure Data Lake Storage Gen2 のログをコンテナー名ごとにフィルタリングする

Azure Data Lake Storage Gen2 で取得可能なログ

Azure Data Lake Storage Gen2 (以下 ADLS Gen2 という) を利用する中で、セキュリティ監査のために操作ログを収集・蓄積したいというニーズはあると思います。

ADLS Gen2 で取得可能なログは以下の通りです。

  1. プラットフォーム ログとメトリック
  2. Storage Analytics ログ

プラットフォーム ログとメトリックは、ストレージ アカウントの [監視] > [診断設定] から設定できます。リソースの種類 (ストレージ アカウント, blob, queue, table, file) ごとに取得するログを指定することができ、ADLS Gen2 のログに関しては blob についての設定を行います。
blob のログは StorageRead, StorageWrite, StorageDelete, Transaction の 4種類を取得することができ、そのうち Transaction はメトリックに関するもの、それ以外の 3つは操作ログです。
これらのログおよびメトリックは、複数の宛先に送信または保存することができます。具体的には (1) Log Analytics ワークスペース (2) ストレージ アカウント (3) イベント ハブ (4) パートナー ソリューション です。これら外部のしくみを使って、ログを蓄積・活用することができます。

一方、Storage Analytics ログは、ストレージ アカウントの [監視 (クラシック)] > [診断設定 (クラシック)] から設定できます。こちらもメトリック、および各種操作ログ (読み取り, 書き込み, 削除) を取得することができます。ログは時間単位または分単位で取得することができ、データの削除をするかどうか、何日後にデータを削除するか (最短1日、最長365日) を指定することができます。
Storage Analytics ログは、同ストレージ アカウントに最初から存在する特殊なコンテナー $logs に保存されます。
Storage Analytics ログを分析するためには、AzCopy 等を使用して ログをローカルマシンにダウンロードする必要があります

ここで頭の片隅に置いておきたいのは、 $logs コンテナーは、他の Azure サービスからは不可視であるという点です。Azure Event Hubs, Azure Stream Analytics, Azure Data Factory, Azure Logic Apps、こういった ADLS Gen2 と接続してデータを抽出してこれるようなサービスで、 $logs コンテナーを指定することができません。

したがって、ADLS Gen2 のログを活用したいのであれば、プラットフォーム ログとメトリック を利用するのがオススメです。

ADLS Gen2 のログ活用の課題

ADLS Gen2 はエンタープライズ データ レイク です。企業内の様々な組織のユーザーが、様々な構造化データ・非構造化データを蓄積していきます。
ちなみに、一つの ストレージ アカウント上に作成できるコンテナーの数は無制限です。一つのコンテナーを企業全体でシェアするのか、組織等の単位でコンテナーを分けるのかは、企業のポリシー、カルチャー、プロジェクトの性質等によって様々です。
ただ、ログ活用の観点から留意したいこととしては、 ストレージ アカウントで出力できるログはすべてストレージ アカウント単位で出力される ということです。すなわち、 ログをコンテナー単位やディレクトリ単位で出力することはできません

例えば、企業IT部門がデータ レイクを管理していて、ユーザー部門に対してコンテナーを割り当てていると仮定します。そして Azure RBACACL を使用して、コンテナーごとにアクセス制御を行っているとします。
各組織のコンテナー利用についてはこれで問題ありませんが、セキュリティ監査の一環として、コンテナーの操作ログを各組織が活用したいという場合に問題が発生します。
ログはストレージ アカウント単位でしか出力されないですし、その設定もストレージ アカウントのスコープで行うため、組織が自身のためにログ設定を行うと他の組織による設定と排他になりますし、ログを取得するにしても、他の組織のログもすべて取得できてしまいます。これはセキュリティ監査の観点からもよろしいことではありません。

もちろん、組織ごとにストレージ アカウントを分けるという手もありますが、企業のデータ利活用という観点では、制約となり得ます。すなわち、操作ログの活用のためだけに、データ レイクの利便性や可能性を制限するようなことは、本来したくないはずです。

したがって、ログの設定はデータ レイクの管理者である企業IT部門が行うとして、そのログを組織ごとに割り振る方法を考えます。なお、各組織はコンテナーを割り当てられていて、その単位で操作ログの監査を行うユースケースを想定します。

プラットフォーム ログのスキーマ

プラットフォーム ログとメトリック のファイル形式は JSON です。 (ちなみに、Storage Analytics ログはセミコロン ; 区切りの Delimited Text です。)
StorageRead のログのうち、ADLS Gen2 上の blob を一覧表示した際に出力されるものを一例として示します。 (実際のログをもとにしていますが、一部の文字列はダミーで置き換えるか、伏せるか、変数ライクに加工する等しています。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
[
{
"time": "1970-01-01T00:00:00.0000000Z",
"resourceId": "/subscriptions/{SUBSCRIPTION_ID}/resourceGroups/{RESOURCE_GROUP_NAME}/providers/Microsoft.Storage/storageAccounts/{STORAGE_ACCOUNT_NAME}/blobServices/default",
"category": "StorageRead",
"operationName": "ListBlobs",
"operationVersion": "2018-03-28",
"schemaVersion": "1.0",
"statusCode": 200,
"statusText": "Success",
"durationMs": 14,
"callerIpAddress": "{IPv4_ADDRESS}:{PORT_NUMBER}",
"correlationId": "********-****-****-****-************",
"identity": {
"type": "AccountKey",
"tokenHash": "key1(****************************************************************)"
},
"location": "{REGION_NAME}",
"properties": {
"accountName": "{STORAGE_ACCOUNT_NAME}",
"userAgentHeader": "Azure-Storage/9.3.2 (.NET CLR 4.0.30319.42000; Win32NT 10.0.14393.0)",
"clientRequestId": "********-****-****-****-************",
"serviceType": "blob",
"objectKey": "/{STORAGE_ACCOUNT_NAME}/{CONTAINER_NAME}",
"metricResponseType": "Success",
"serverLatencyMs": 13,
"requestHeaderSize": 503,
"responseHeaderSize": 152,
"responseBodySize": 4652,
"tlsVersion": "TLS 1.2",
"accessTier": "None"
},
"uri": "https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net:443/{CONTAINER_NAME}?restype=container&comp=list&prefix=&delimiter=%2F&maxresults=5000&include=metadata",
"protocol": "HTTPS",
"resourceType": "Microsoft.Storage/storageAccounts/blobServices"
},
...
]

ログをコンテナーごとに振り分けるとして、利用価値があるであろうフィールドは以下の通りです。

  • ストレージアカウント名 (STORAGE_ACCOUNT_NAME) が含まれるフィールド
    • resourceId
    • properties.accountName
    • properties.objectKey
    • url
  • コンテナー名 (CONTAINER_NAME) が含まれるフィールド
    • properties.objectKey
    • url

課題へのアプローチとしては、コンテナー名が含まれるフィールドから少なくとも 1つ、可能であればストレージアカウント名が含まれるフィールドも組み合わせて、ログの各行がどのコンテナーのものであるかを判定し、その結果から異なる宛先に転送することとします。

割り振りを行うサービスの選定

再掲となりますが、プラットフォーム ログとメトリックの宛先として利用できるサービスは以下の 4つです。

  1. Log Analytics ワークスペース
  2. ストレージ アカウント
  3. イベント ハブ
  4. パートナー ソリューション

本記事では、 Azure のサービスを利用することを想定します。
この中で上記の割り振りを実現するための最もシンプルな選択肢は、イベント ハブ (Azure Event Hubs) です。
Azure Event Hubs でログをキューイングし、 Azure Stream Analytics でそれを取得、クエリを実行して割り振りを行い、異なる出力先に送る方式を考えます。

Azure Event Hubs の構成

Azure Event Hubs のリソースを 1つ、その中で名前空間を 1つ、それぞれ作成します。コンシューマー グループ はデフォルトで $Default が存在しますが、今は何もしなくて構いません。
もし ADLS Gen2 が複数ある場合は、それに合わせて Azure Event Hubs を複数作るか、1つの Azure Event Hubs リソースの中に名前空間を複数作ります。

また、今回 ADLS Gen2 の StorageRead を組織ごとに割り振ることを考えますが、他の StorageWriteStorageDelete 、そしてメトリックである Transaction 、あるいは (ADLS Gen2 ではなく) ストレージ アカウントのメトリック Transaction も出力したい場合は、それぞれに対して名前空間を作るとよいでしょう。
理由は、それぞれログのスキーマが異なりますが、 Azure Stream Analytics ジョブ 1つあたりで実行できるクエリは 1つであるため、上記のように名前空間を分けて Azure Stream Analytics ジョブも分けることで、クエリの複雑化を防ぐことができるためです。

また、もし後続の Azure Stream Analytics ジョブを複数作って同じログを複数の宛先に送りたい場合は、コンシューマー グループを複数作ります。

以下に、Azure Event Hubs のデプロイ手順を示します。

ADLS Gen2 のプラットフォーム ログ設定

Azure Event Hubs のリソースをデプロイして構成が完了したら、Azure Stream Analytics の構成を行う前に ADLS Gen2 のストレージ アカウントの [監視] > [診断設定] からプラットフォーム ログの設定を行い、Azure Event Hubs にログを出力しておくことをオススメします。

理由は、Azure Stream Analytics ではクエリの編集中に、実際のデータを使ってクエリの実行結果を確認しながら作業を進められるためです。そのためのログデータを Azure Event Hubs に流しておきます。

今回、プラットフォーム ログは blob について設定します。初期状態では診断の状態が 無効 となっています。

blob をクリック

+ 診断設定を追加する をクリック

今回は StorageRead を組織ごとに割り振るということで、 [ログ] > [カテゴリ] > [StreageRead] のチェックボックスをクリックしてチェックを入れます。

次に [宛先の詳細] > [イベント ハブへのストリーム] のチェックボックスにチェックを入れます。
そして、サブスクリプション、イベント ハブの名前空間をそれぞれ選択します。他はデフォルト状態のままで構いません。

診断設定の名前 を付けておくことを忘れずに。

最後に [保存] をクリックします。

また、加えて、後で使用するログを生成するために、Azure Portal のストレージアカウント等からコンテナーにアクセスしておきましょう。

Azure Stream Analytics の構成

Azure Stream Analytics ジョブのリソースを 1つ作成します。
作成手順を以下に示します。

まずは入力と出力を定義します。
入力は先に作成してある Azure Event Hubs です。
出力については今ここで決めなくてもよいと思います。
各組織のリソースに直接出力する方式も考えられますし、振り分けたログをそれぞれいったん別の Azure Event Hubs にキューイングし、各組織は Azure Stream Analytics 等でそこからデータを取り出して任意の方法で蓄積・加工・分析を行う形も考えられます。

現時点では、出力先を定義せず、ログの振り分けにフォーカスしたいと思います。

入力の追加

[ジョブ トポロジ] > [入力] をクリックし、入力に関する画面を開きます。

[+ ストリーム入力の追加] をクリックし、 イベント ハブ をクリックします。

* の付いている必須項目の入力を行います。

必須項目 解説
入力のエイリアス 名前です。後でクエリ内でこの名前を使いますので、入力が単一の場合はシンプルに input 等でもよいでしょうし、入力を複数定義する場合は識別可能なエイリアス名を付けます。エイリアス名は当該 Azure Stream Analytics ジョブ内でユニークである必要があります。
イベント ハブ設定を手動で行う / サブスクリプションからイベント ハブを選択する 通常は後者 (デフォルト) で良いです。
イベント ハブの名前空間 先に作成してある Azure Event Hubs 名前空間を選択します。
イベント ハブ名 本記事においては既存のものを使用。当該 Azure Event Hubs 名前空間に作成した Event Hub (のインスタンス) を選択します。
イベント ハブ コンシューマー グループ 本記事においては新規作成 (デフォルト) を採用。コンシューマー グループ名にはデフォルトで {当該 Azure Stream Analytics のリソース名}_{イベント ハブ名}_consumer_groupという文字列が入力されています。(入力のエイリアスを先に入力した場合。) コンシューマー グループ名は最大50文字であり、入力フォームでのバリデーションはしてくれないため、保存 後にバリデーション エラーとなる場合があります。この場合は、適宜修正して対応してください。ちなみに、新規作成ではなく既存のものを使用する場合、デフォルトで存在する $Default も選択可能です。
イベントシリアル化形式 プラットフォーム ログ は JSON 形式ですので、 JSON のままにしておいてください。

最後に 保存 をクリックします。

エラーとなった場合

ここで ジョブへの ID の追加に失敗しました というエラーが発生する場合があります。

この場合、あと少し時間が経てば自動的に解決することが多いですが、手動で解決する場合は、リンク先にあるドキュメントを参照しながら、当該 Azure Stream Analytics のマネージド ID を作成し、当該 Azure Event Hubs 名前空間における Azure Event Hubs データ所有者 ロールを、そのマネージド ID に割り当ててください。

繰り返しになりますが、時間が経てば解決することが多いです。手動で設定中に解決されている場合もエラーとなります。

そのあと、Azure Stream Analytics に戻り、当該 Azure Event Hubs への接続をテストしてください。

クエリの作成

[ジョブ トポロジ] > [クエリ] をクリックし、クエリに関する画面を開きます。

一番右のペインの中断、行番号が振ってあるエリアがクエリを記述するエリアです。
初期状態で記述されているクエリは使用しません。使用するクエリで上書きします。

使用するクエリの例を示します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
WITH ReaderQuery AS (
SELECT record FROM [evh-diagnostics-stdatalake20230122aje-blob-storageread]
CROSS APPLY GetArrayElements(records) AS record
)

SELECT
record.arrayvalue.*
INTO
[cosmos-diag-stdatalake20230122aje-blob-storageread]
FROM
ReaderQuery
WHERE
record.arrayvalue.resourceId LIKE ('%/providers/Microsoft.Storage/storageAccounts/stdatalake20230122aje/blobServices/default')
AND record.arrayvalue.properties.accountName = 'stdatalake20230122aje'
AND record.arrayvalue.properties.objectKey = '/stdatalake20230122aje/datalake1'
AND record.arrayvalue.uri LIKE ('https://stdatalake20230122aje.blob.core.windows.net:443/datalake1?%')

以下、解説を行います。


WITH 句

クエリエンジンに入力されるデータは、プラットフォーム ログ そのままではなく、それを含む Azure Event Hubs のキューです。
具体的には以下のようなフォーマットになっています。

1
2
3
4
5
6
7
8
{
"records": [
// Azure Event Hubs が受信したプラットフォーム ログ
],
"EventProcessedUtcTime": "{datetime型の日付}", // イベントが処理された日時
"PartitionId": 0, // パーティションID
"EventEnqueuedUtcTime": "{datetime型の日付}" // イベントがキューイングされた日時
}

Azure Stream Analytics で処理をして出力先に送信したいのは records 内のデータであるはずなので、この WITH 句では records からレコードを取り出して、後続の SELECT ... FROM ...FROM に与えられるようにしています。

SELECT 文

record.arrayvalue.* としています。これは、プラットフォーム ログのすべてのフィールドを指定しています。
フィールドを限定したい場合は、

1
2
3
4
SELECT
record.arrayvalue.operationName AS operationName,
record.arrayvalue.properties.metricResponseType AS metricResponseType,
...

といった要領で個別に指定し、必要に応じて任意のフィールド名を付けたりします。

INTO

ここではデフォルトのままで結構です。
後でクエリ結果の出力先のエイリアス名を指定します。

FROM

クエリの実行対象を指定します。ここでは WITH 句を使って ReaderQuery からプラットフォーム ログを取り出せるようにしていますので、 ReaderQuery を指定します。
これにより、プラットフォーム ログの各行に対して record を用いることでアクセスできるようになります。

WHERE 句

プラットフォーム ログをコンテナー別で振り分けるのが、今回の課題でした。そのための条件を記述します。
ここでは、ストレージアカウント名とコンテナー名で細かく絞り込みを行っています。


クエリ画面を開いたら、画面右下のペイン 入力のプレビュー 配下に、入力データのプレビューが表示されます。これは、ここでは実際に Azure Event Hubs から取得したキューが使われています。

作成したクエリをテストするには、 クエリのテスト をクリックします。これにより、入力のプレビューに対してクエリを実行し、 テスト結果 から (出力先に出力される) 結果を確認することができます。

テスト結果 には 結果をダウンロード のリンクがあります。こちらから結果のデータをダウンロードし、 Visual Studio Code 等で JSON のフォーマットを行うと見やすくなります。コンテナー名で検索を行い、クエリによって正しくコンテナーの振り分けができているかどうか確認しましょう。

適宜クエリの修正とテスト結果の確認を行い、クエリが完成したら、 クエリの保存 をクリックしてクエリを保存します。

今回は ADLS Gen2 のコンテナーごとに StorageRead ログを振り分けたいので、各コンテナーに読み取りアクセスがあった際のデータをプレビュー入力として使います。
実際に各コンテナーに読み取りアクセスをしてログを生成することもできますが、繰り返し行うテストのデータの作成方法としては安定性・網羅性の観点からスマートではありません。
入力のプレビューサンプル データをダウンロードする から実際のログを取得し、コンテナー名について網羅性のあるテストデータとなるような編集を行い、その JSON ファイルを 入力のプレビューサンプル入力のアップロード からアップロードするのがオススメです。

出力の追加

入力の追加と同じ要領で、出力を追加します。(あるいは、クエリ画面から出力を追加することもできます。)

今回は (Azure Stream Analytics ジョブへの入力に用いるものとは異なる) Azure Event Hubs を出力として指定します。

エラーとなった場合

入力の追加と同様、ここで ジョブへの ID の追加に失敗しました というエラーが発生する場合があります。
また、出力テストの結果、接続テストが失敗することもあります。

対処方法も基本的に同様です。手動で解決する場合、当該 Azure Stream Analytics のマネージド ID はすでに存在するため、当該 Azure Event Hubs 名前空間においてロールの追加を行ってください。

クエリの修正

INTO に与えるパラメータを、出力のエイリアスに書き換えます。

画面中央のペインに、入力と出力のエイリアス名が表示されています。これは、既存のエイリアスと、クエリ内で指定されているエイリアス名です。
既に出力は追加されていますので、出力以下にそれらのエイリアス名以外が残らないように、気をつけてクエリを編集してください。

この時点で、元々 YourOutputAlias とされていた箇所が、出力エイリアス名で置き換えられています。
最終的なクエリは以下の通りです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
WITH ReaderQuery AS (
SELECT record FROM [evh-stloganalytics20230208a0-blob-storageread]
CROSS APPLY GetArrayElements(records) AS record
)

SELECT
record.arrayvalue.*
INTO
[evhns-LogAnalytics-20230208a-JapanEast-container0]
FROM
ReaderQuery
WHERE
record.arrayvalue.resourceId LIKE ('%/providers/Microsoft.Storage/storageAccounts/stloganalytics20230208a0/blobServices/default')
AND record.arrayvalue.properties.accountName = 'stloganalytics20230208a0'
AND record.arrayvalue.properties.objectKey = '/stloganalytics20230208a0/container0'
AND record.arrayvalue.uri LIKE ('https://stloganalytics20230208a0.blob.core.windows.net:443/container0?%')

Azure Stream Analytics の開始

[概要] 内、 開始 をクリックして、ジョブを開始します。
順次ログが処理されて、出力先に出力されます。

Appendix: 参考用の構成

ここまでの解説では、振り分け処理にフォーカスしていたため、ADLS Gen2 のコンテナー 1つで解説をしてきました。
ここでは、コンテナーを 3つに増やし、 StorageRead のログについて、振り分け後に各組織のリソースに格納されるところまでを紹介します。

まず、リソース グループからみる最終的な構成は以下の通りです。

そして、振り分け処理を行う Azure Stream Analytics ジョブのクエリは以下の通りです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
WITH ReaderQuery AS (
SELECT record FROM [evh-stloganalytics20230208a0-blob-storageread]
CROSS APPLY GetArrayElements(records) AS record
)

SELECT
LEFT(record.arrayvalue.time, 10) AS date,
record.arrayvalue.*
INTO
[evh-stloganalytics20230208a0-container0-blob-storageread]
FROM
ReaderQuery
WHERE
record.arrayvalue.resourceId LIKE ('%/providers/Microsoft.Storage/storageAccounts/stloganalytics20230208a0/blobServices/default')
AND record.arrayvalue.properties.accountName = 'stloganalytics20230208a0'
AND record.arrayvalue.properties.objectKey = '/stloganalytics20230208a0/container0'
AND record.arrayvalue.uri LIKE ('https://stloganalytics20230208a0.blob.core.windows.net:443/container0?%')

SELECT
LEFT(record.arrayvalue.time, 10) AS date,
record.arrayvalue.*
INTO
[evh-stloganalytics20230208a0-container1-blob-storageread]
FROM
ReaderQuery
WHERE
record.arrayvalue.resourceId LIKE ('%/providers/Microsoft.Storage/storageAccounts/stloganalytics20230208a0/blobServices/default')
AND record.arrayvalue.properties.accountName = 'stloganalytics20230208a0'
AND record.arrayvalue.properties.objectKey = '/stloganalytics20230208a0/container1'
AND record.arrayvalue.uri LIKE ('https://stloganalytics20230208a0.blob.core.windows.net:443/container1?%')

SELECT
LEFT(record.arrayvalue.time, 10) AS date,
record.arrayvalue.*
INTO
[evh-stloganalytics20230208a0-container2-blob-storageread]
FROM
ReaderQuery
WHERE
record.arrayvalue.resourceId LIKE ('%/providers/Microsoft.Storage/storageAccounts/stloganalytics20230208a0/blobServices/default')
AND record.arrayvalue.properties.accountName = 'stloganalytics20230208a0'
AND record.arrayvalue.properties.objectKey = '/stloganalytics20230208a0/container2'
AND record.arrayvalue.uri LIKE ('https://stloganalytics20230208a0.blob.core.windows.net:443/container2?%')

これまでの説明では振り分け処理のための SELECT 文は 1つでしたが、今回これを 3つに増やしており、それぞれコンテナー0,1,2 と対応しています。

振り分けの出力はそれぞれ異なる Azure Event Hubs 名前空間で、その先で Azure Stream Analytics ジョブが単純な SELECT 文でログを全件取得しています。

例えば、コンテナー0 に関する StorageRead ログは、後続の Azure Stream Analytics ジョブでは以下のクエリで処理をしています。

1
2
3
4
5
6
SELECT
*
INTO
[cosmos-container0-stloganalytics20230208a0-blob-StorageRead]
FROM
[evh-stloganalytics20230208a0-container0-blob-storageread]

コンテナー1,2 についても、やり方は同様です。

それぞれの後続の Azure Stream Analytics ジョブは、今回それぞれ Azure Cosmos DB for NoSQL にログを格納することとしています。

クエリを

1
2
3
4
5
SELECT
LEFT(record.arrayvalue.time, 10) AS date,
record.arrayvalue.*
INTO
...

としたのはそのためです。パーティション キーとして年月日を使用することを想定し、 time フィールドから date フィールドを生成しています。

StorageRead のプラットフォーム ログを振り分けるしくみの、最終的な構成図は以下の通りです。

実際に各 Azure Cosmos DB for NoSQL のコレクションにログが振り分けられていることも確認できます。

将来的に StorageWrite および StorageDelete のプラットフォーム ログも同様に処理したい場合は、一連の Azure Event Hubs インスタンス (名前空間ではなく) と Azure Stream Analytics ジョブのセットを増やしていく形になります。

まとめ

Azure Stream Analytics のクエリ内で ADLS Gen2 のログをコンテナー名ごとにフィルタリングする方法について解説しました。
特に出力先の構成については、企業や組織のポリシー等に依存するところですので、それに合わせてカスタマイズしていただければと思います。
また、今回は ADLS Gen2 のプラットフォーム ログのうち StorageRead のみを扱いましたが、同じ要領で他のログを振り分けることができます。 (メトリックにはコンテナー名が含まれていないので、今回の課題に則した振り分けはできません。)
ぜひこれらのログを活用し、セキュリティ監査に役立ててください。

See also