- 限定コンテンツへのアクセス
- 他ユーザーとのつながり
- あなたの体験を共有
- サポート情報の発見
この記事ではCortex XDRに取り込んだrawデータをパースする方法について説明します。
またこちらの記事で説明したデータをパースすることを前提としていますので、
ご一読いただければと思います。
データをパースするにはParsing Rulesを使います。
rawデータをパースすることもできますが、既存の構造化データ(CSV,TSVなど)において
ある特定のフィールド内の文字列を別の意味のフィールドとして切り出したい場合も、
このParsing Rulesを使うことができます。
Parsing Rulesは3つのセクションがあり、用途によって使い分けます。
以下の例はINGESTセクションを用いたもので、INGESTセクション配下にrawデータのパースルールを記述します。
COLLECTセクションも基本同じですが、broker VMでパースする場合に使用します。
COLLECTセクションを使用することで、broker VM側で不要なデータを排除したり、
新たなフィールドを追加することができます。
CONSTは、プログラミング言語で出てくるようなconstと同じように定数を設定するセクションです。
CONSTで設定した変数は複数のINGESTセクションで再利用することができます。
サンプルのデータはこちらです。
Apache HTTPサーバーのアクセスログのサンプルレコード(_raw_logに書かれるデータ)
以下のようにルールを追加します。あくまでサンプルなので、より良いスマートな方法があれば
色々試してみてください。
青字は、brokerVMでデータ取り込みの際指定したvendor, product、その結果作成されたdatasetの名前を設定します。
青い太字のところが最終的にrawデータから抽出するフィールドです。
[INGEST:vendor = Apache , product = httpserver , target_dataset = apache_httpserver_raw, no_hit = keep]
alter remote_ip = trim(arrayindex(split(_raw_log),0))
,remote_log_name = trim(arrayindex(split(_raw_log),1))
,userid = trim(arrayindex(split(_raw_log),2))
,datetime = trim(arrayindex(regextract(_raw_log, "\[.+\]") ,0),"[]")
,others = regextract(_raw_log, "\".*\"" )
| alter others = split(trim(arrayindex(others,0)), "\" " )
,method = trim(arrayindex(others,0),"\"")
,useragent =trim(arrayindex(others,2)," ")
,status = arrayindex(split(arrayindex(others,1)," "),0)
,length = arrayindex(split(arrayindex(others,1)," "),1)
,referrer = trim(arrayindex(split(arrayindex(others,1)," "),2),"\"")
| alter request_method = arrayindex(split(method," "),0)
,path = arrayindex(split(method," "),1)
,http_version = arrayindex(split(method," "),2)
| fields remote_ip,datetime,remote_log_name,userid,request_method,path,http_version,
status,length,referrer,useragent,_raw_log,_collector_type,_log_source_file_name_broker_ip_address,_broker_device_name,_bundle_id,_broker_hostname,_collection_timestamp,_collector_name,_collector_type,_broker_ip_address,_broker_device_id,_reporting_device_ip,_log_source_file_name,_log_source_file_path,_final_reporting_device_ip;
上記のサンプルログを例に簡単にXQLの解説(青字)します。
alter remote_ip = trim(arrayindex(split(_raw_log),0))
ログを半角スペースで区切り、配列の1番目のを取得
90.208.149.198 - - [28/Jan/2023:12:07:53 +0000] "PUT /explore HTTP/1.0" 200 5037 "hxxp://thompson.com/register.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/5361 (KHTML, like Gecko) Chrome/14.0.873.0 Safari/5361"
,remote_log_name = trim(arrayindex(split(_raw_log),1))
ログを半角スペースで区切り、配列の2番目のを取得
90.208.149.198 - - [28/Jan/2023:12:07:53 +0000] "PUT /explore HTTP/1.0" 200 5037 "hxxp://thompson.com/register.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/5361 (KHTML, like Gecko) Chrome/14.0.873.0 Safari/5361"
,userid = trim(arrayindex(split(_raw_log),2))
ログを半角スペースで区切り、配列の3番目のを取得
90.208.149.198 - - [28/Jan/2023:12:07:53 +0000] "PUT /explore HTTP/1.0" 200 5037 "hxxp://thompson.com/register.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/5361 (KHTML, like Gecko) Chrome/14.0.873.0 Safari/5361"
,datetime = trim(arrayindex(regextract(_raw_log, "\[.+\]") ,0),"[]")
ログから[]で括られている文字列を配列として取得し、配列の1番目のを取得、その後[]を除去
90.208.149.198 - - [28/Jan/2023:12:07:53 +0000] "PUT /explore HTTP/1.0" 200 5037 "hxxp://thompson.com/register.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/5361 (KHTML, like Gecko) Chrome/14.0.873.0 Safari/5361"
,others = regextract(_raw_log, "\".*\"" )
ログからカンマ(")で括られた文字列を配列として取得
90.208.149.198 - - [28/Jan/2023:12:07:53 +0000] "PUT /explore HTTP/1.0" 200 5037 "hxxp://thompson.com/register.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/5361 (KHTML, like Gecko) Chrome/14.0.873.0 Safari/5361"
| alter others = split(trim(arrayindex(others,0)), "\" " )
othersを配列に変換(3つの要素になる)
"PUT /explore HTTP/1.0" 200 5037 "hxxp://thompson.com/register.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/5361 (KHTML, like Gecko) Chrome/14.0.873.0 Safari/5361"
,method = trim(arrayindex(others,0),"\"")
"PUT /explore HTTP/1.0" 200 5037 "hxxp://thompson.com/register.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/5361 (KHTML, like Gecko) Chrome/14.0.873.0 Safari/5361"
,useragent =trim(arrayindex(others,2)," ")
"PUT /explore HTTP/1.0" 200 5037 "hxxp://thompson.com/register.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/5361 (KHTML, like Gecko) Chrome/14.0.873.0 Safari/5361"
,status = arrayindex(split(arrayindex(others,1)," "),0)
"PUT /explore HTTP/1.0" 200 5037
,length = arrayindex(split(arrayindex(others,1)," "),1)
"PUT /explore HTTP/1.0" 200 5037
,referrer = trim(arrayindex(split(arrayindex(others,1)," "),2),"\"")
"PUT /explore HTTP/1.0" 200 5037 "hxxp://thompson.com/register.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/5361 (KHTML, like Gecko) Chrome/14.0.873.0 Safari/5361"
| alter request_method = arrayindex(split(method," "),0)
methodを配列に変換(3つの要素になる)し、1番目の要素を取得
"PUT /explore HTTP/1.0" 200 5037
,path = arrayindex(split(method," "),1)
methodを配列に変換(3つの要素になる)し、2番目の要素を取得
"PUT /explore HTTP/1.0" 200 5037
,http_version = arrayindex(split(method," "),2)
methodを配列に変換(3つの要素になる)し、3番目の要素を取得
"PUT /explore HTTP/1.0" 200 5037
Simulateタブを選択したの後、テストするログデータにチェックを入れ、Simulateボタンをクリックします。
Parsinng Ruleでパースされた結果がLogs Outputに表示されます。
「Show more」を選択すると、
パースされたフィールドが生成されていることを確認できます。
意図したフィールドをパースできなかった場合は、ルールの修正とSimulateを繰り返して
調整してみてください。
datasetを指定するとParsing Rulesで抽出されたフィールドが生成されていることを確認できます。
このようにして、rawデータをパースしXQLによる検索、ダッシュボードの作成、相関分析などに
ご利用いただくことができます。
今回ルール作成に使った関数(trim, arrayindex, split, regextract)の詳細は
以下にリンクを記載しておきますので参考にしてみてください。
Parsing Rulesについてはこちら
XQLの関数についてはこちら