最近、Cloud Functionsを利用してサーバレスでシステムを構築しています。けれども、まだまだサーバレスでシステムを作り慣れていないので、Cloud Functionsが正しく実行されたのか、ちょっと不安なので確認したいと考えていました。そこで、Cloud Functionsが実行完了した際に、それを検知してSlackにメッセージを送信する仕組みを作ってみたので、その説明をします。
TL;DR
- Cloud Functionsが実行完了した際に通知してくれる仕組みを作りたかった
- Cloud Functionsの実行ログが流れる stackdriver loggingを利用して、フィルタを設定する
- フィルタに一致するログを受け取ったら pub/subでメッセージを送信
- 上のメッセージをフックにしてSlack通知を行う Cloud Functionを実装
- それによって目的を達成するシステムを実装した
システムの全体像
Stackdriver Loggingのログエクスポート機能を用いて、指定したフィルタに一致するログを取得しCloud Pub/Subでメッセージとして流します。そのメッセージを受け取った Cloud FunctionsがSlackに通知します。
前提知識
Stackdriver Loggingについて
https://cloud.google.com/logging/?hl=ja
Stackdriver Loggingはログデータやイベントを格納、検索、分析、モニタリング、通知するためのサービスです。Stackdriver LoggingのAPIを使えば、あらゆるソースからデータを取り込むことができます。GCPのサービスは、基本的にStackdriver Loggingにログが送られるようになっており、すぐに利用することができます。
Stackdriver Loggingではすべてのクラウドログを一箇所に集めて管理します。そのためそれら膨大な種類のデータの中から、価値のあるデータを拾い集め、適切な処理を促す仕組みが充実しています。今回はその中のExport機能を利用してCloud Functionsの実行通知をすることにしました。
Stackdriver LoggingのExport機能について
https://cloud.google.com/logging/docs/export/?hl=ja
Stackdriver Loggingには、ログをエクスポートする機能があります。エクスポートする用途としては下記の項目が挙げられます。
- ログを長期間保存するため。(通常のログは30日間程度保持されます)
- ログを分析するため
- 他のアプリケーションで利用するため
上記の用途で利用できるようにするため、Stackdriver Loggingでは Cloud Storage, BigQuery, Cloud Pub/Subの3つにログをエクスポートすることができます。
StackdriverでLogをエクスポートする設定を理解するためにはsinkと呼ばれるオブジェクトについて理解する必要があります。Sinkオブジェクトは、自身の名前、エクスポートするログを選択するためのフィルタ、そしてフィルタに引っかかったログのエクスポート先の3つの要素から構築されます。
Logging自体には、ログをエクスポートする上で料金や制限は存在しません。けれども、エクスポート割きでのログデータの保存や送信には料金が掛かります。
Sinkはproject, organizations, billing_accounts単位で設定できます。そのため、監査ログを一元管理したいときなどはorganizationsに対してSinkの設定をするなど、用途に応じて適用する範囲を設定する必要があります。
設定と実装
要件
「sample_cloud_function
という名前の cloud functionsが実行完了した際に、その実行完了のログを検知してSlackに通知する」という要件で Stackdriver LoggingのログをSlackに通知してみます。
フィルタの設定
フィルタの設定については、こちらのページに設定方法が記載されています。設定に利用できるプロパティについては、こちら に記載されています。なお、resource type毎のlabelの値については、資料を見つけることができなかったので記載していません。
上記資料をもとにフィルタを自分で作成してみます。今回はsample_cloud_function
という名前のcloud functionが実行を完了したら、任意のpub/sub topicにメッセージを送信する必要があります。その要件を満たすフィルタは下記のようなります。
resource.type = "cloud_function" resource.labels.region = "asia-northeast1" resource.labels.function_name = "sample_cloud_function" textPayload: "finished"
なおフィルタ設定時のベストプラクティスについては、下記のように記述されています。僕は毎回判断することが手間だったので、必ず引用符を用いるようにしています。
ベスト プラクティス: フィールド値と比較する文字列は引用符で囲むようにしてください。これにより、比較の意味が変わったり、デバッグが困難になるような誤りを防ぐことができます。文字の後に連続した文字、数字、アンダースコア(_)が続く単語の場合は、引用符を省略できます。
エクスポート先の設定 [terraform]
https://www.terraform.io/docs/providers/google/r/logging_project_sink.html
今回はPub/Subの cloud-functions-activity
というTOPICに対してメッセージを送信することにしています。上記terraformのドキュメントを見ると、下記のように設定すると書かれています。
destination - (Required) The destination of the sink (or, in other words, where logs are written to). Can be a Cloud Storage bucket, a PubSub topic, or a BigQuery dataset. Examples: "storage.googleapis.com/[GCS_BUCKET]" "bigquery.googleapis.com/projects/[PROJECT_ID]/datasets/[DATASET]" "pubsub.googleapis.com/projects/[PROJECT_ID]/topics/[TOPIC_ID]" The writer associated with the sink must have access to write to the above resource.
unique_writer_identity
については、projectをまたいで利用する場合はtrueにする必要があります。将来的に複数のprojectのcloud functionsのlogをここで通知したいと考えているので、ここでは true
としました。
resource "google_logging_project_sink" "sample_cloudfunction" { name = "sample_cloudfunction_sink" destination = "pubsub.googleapis.com/projects/xxx/topics/cloud-functions-activity" filter = "resource.type = 'cloud_function' AND resource.labels.region = 'asia-northeast1' AND resource.labels.function_name = 'sample_cloudfunction' AND textPayload: 'finished'" unique_writer_identity = true }
デプロイの設定 [cloudbuild]
cloudbuildを利用してデプロイしているので、その設定を載せておきます。とはいえ、重要なポイントは --trigger-topic
のオプションの引数が、エクスポート先の設定で指定した cloud-functions-activity
というトピックを指定しているという点のみです。
- name: 'gcr.io/cloud-builders/gcloud' args: - beta - functions - deploy - slack_reporter - --region=asia-northeast1 - --stage-bucket=cf-bucket-for-xxx - --trigger-topic=cloud-functions-activity - --runtime=nodejs8
Pub/Sub Messageの内容
https://cloud.google.com/logging/docs/export/using_exported_logs?hl=ja#pubsub-organization
上記ドキュメントによると、Cloud Pub/Subによってストリーミングされるログは、下記のようなフォーマットになります。
{ "receivedMessages": [ { "ackId": "dR1JHlAbEGEIBERNK0EPKVgUWQYyODM...QlVWBwY9HFELH3cOAjYYFlcGICIjIg", "message": { "data": "eyJtZXRhZGF0YSI6eyJzZXZ0eSI6Il...Dk0OTU2G9nIjoiaGVsbG93b3JsZC5sb2cifQ==", "attributes": { "compute.googleapis.com/resource_type": "instance", "compute.googleapis.com/resource_id": "123456" }, "messageId": "43913662360" } } ] }
dataフィールドを base64
でデコードすると、下記のようなLogEntry オブジェクトが取得できます。Cloud Functionの中では下記のObjectを利用してSlackへ送信するメッセージを構築します。
{ "log": "helloworld.log", "insertId": "2015-04-15|11:41:00.577447-07|10.52.166.198|-1694494956", "textPayload": "Wed Apr 15 20:40:51 CEST 2015 Hello, world!", "timestamp": "2015-04-15T18:40:56Z", "labels": { "compute.googleapis.com\/resource_type": "instance", "compute.googleapis.com\/resource_id": "123456" }, "severity": "WARNING" }
Cloud Functionの実装
Pub/Sub Messageの内容で確認したObjectを利用してSlackに通知するスクリプトは下記のようになります。このとき SlackのTokenやChannel ID については、こちらの公式ドキュメント に記載されている方法で行っていますが、今回は省略させていただきます。
//index.ts export async function slack_reporter(data: any) { const dataBuffer = Buffer.from(data.data, "base64"); const logEntry = JSON.parse(dataBuffer.toString("ascii")); const client = await SlackClient.create(); await client.post(`cloud function: ${logEntry.resource.labels.function_name} textPayload: ${logEntry.textPayload}`); }
// SlackClient.ts import { WebAPICallResult, WebClient } from "@slack/client"; export class SlackClient { public static async create(): Promise<SlackClient> { if (!this.instance) { this.instance = new SlackClient(); } return this.instance; } private static instance: SlackClient; private slackCleint: WebClient; private readonly channel: string = process.env.CHANNEL_ID as string; private readonly token: string = process.env.SlackToken as string; constructor() { this.slackCleint = new WebClient(token); } public async post(text: string): Promise<WebAPICallResult> { return this.slackCleint.chat.postMessage({ username: "ERP-HR Bot", channel: channel, text, }); } }
結果
ということで sample_cloud_functionsの実行完了を検知してSlack通知する仕組みを作ることができました。今後は、severity
のレベルに応じてメッセージの内容を変更し、迅速に対応が必要なものがあれば即座に分かるようなところまで作ろうかなーとか考えたりしています。
所感
Stackdriver Logging、めちゃくちゃ便利です。一度 Stackdriver Loggingに集約することによって、ログの処理を一元管理することができます。一元管理することで、ログの前処理やBigQueryへのインポート等々、様々な処理を共通化できる気配を感じます。今回はこのような使い方をしましたが、RailsのLogなどを全てBigQueryに入れてしまって、BigQueryで分析することなどもできるのではないかな〜と思ったりしてます(昔Amazon Athenaでやってて、それなりに便利だった)。
もっとガシガシ使って、ポテンシャルを引き出していこう。