selmertsxの素振り日記

ひたすら日々の素振り内容を書き続けるだけの日記

functions-framework を利用したGoogle Cloud Functionsにおいてpubsubのテストをする方法

このドキュメントに書いてあること

これまで Google Cloud Functionsをローカル環境でテストするときは、cloud-functions-emulator という公式で提供されているツールが一般的に利用されていました。しかしながらこのツールは現在archiveされており、作者が2019年5月16日に作成した issueによると functions-framework という新しいツールの利用を推奨しています。

このドキュメントでは、functions-framework を利用して Google Cloud FunctionsをLocalから実行する方法、特に公式では提供されていない eventsトリガーを利用してpubsubのメッセージを読み込ませる方法について記載します。

※ なお、2019年6月26日になっても公式ドキュメントでは @google-cloud/functions-emulatorを利用するようにと書かれています。 https://cloud.google.com/functions/docs/emulator?hl=ja#getting_started

スクリーンショット 2019-06-26 16.07.17.png

functions-frameworkとは?

Node.jsを利用してFaaSを書くためのOSSフレームワークです。このフレームワークを利用して書かれたコードは Cloud Functionsだけでなく、Cloud Runなどでも利用することができます。

そのようなことを目的としていると公式ドキュメントには書かれているものの、Cloud Run複数のAPIを持つ場合のケースに対応しておらず、現状では Cloud Functionsを Localで起動するためのツールとして使われることがメインになりそうです。Cloud Functionsでの利用であれば、Localの開発環境のみinstallするだけですぐに利用することができます。

npm i -D @google-cloud/functions-framework

このあたりの実装を読んでみると、内部でexpressサーバーを起動して、利用者が作成した functionをラッピングしていることが見て取れます。実行は簡単で、ラッピングしたいfunctionを下記のように指定してコマンドを実行すれば動きます。

npx functions-framework --target=helloWorld

実際に動かしてみる

プログラムの用意

僕が実際に利用しているプログラムからの抜粋です。cloud pubsubのmessageを受け取った後、その中身を見てSlackに通知しています。

export async function slack_reporter(data: any) {
  const dataBuffer = Buffer.from(data.data, "base64");
  const body = dataBuffer.toString("ascii");
  const client = await SlackClient.create();
  await client.post(body);
}
import { WebAPICallResult, WebClient } from "@slack/client";

export class SlackClient {
  public static async create(): Promise<SlackClient> {
    if (!this.instance) {
      const token = process.env.SLACK_TOKEN as string;
      const channel = process.env.SLACK_CHANNEL_ID as string;
      this.instance = new SlackClient(token, channel);
    }

    return this.instance;
  }

  private static instance: SlackClient;

  private slackCleint: WebClient;
  private channel: string;

  constructor(token: string, channel: string) {
    this.slackCleint = new WebClient(token);
    this.channel = channel;
  }

  public async post(text: string): Promise<WebAPICallResult> {
    return this.slackCleint.chat.postMessage({
      username: "ERP-HR Bot",
      channel: this.channel,
      text,
    });
  }
}

pubsubのmessageを作成する

pubsubで送信されるmessageのフォーマットについては、公式ドキュメントによると下記のように指定されています。

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string
}

このように色々なパラメータが存在していますが、今回のケースで利用するのは data のみです。理由については公式ドキュメントに記載されています。ということで messageを作成していきます。この dataパラメータはbase64エンコードする必要があるため、下記のコマンドでエンコードします。

$ echo -n "hogehoge" | base64
aG9nZWhvZ2U=

これをmessageで送信するjsonに組み込むと下記のようになります。

{
  "data": "aG9nZWhvZ2U="
}

Local環境でCloud Functionsを起動する

公式のドキュメントによると、functions frameworkを起動する際のoptionは --port, --target, --signature-typeの3点です。ここでは実行したい functionは slack_reporterであり、トリガーはpubsubにしたいので、下記のように設定をしました。

$ npx functions-framework --target=slack_reporter --signature-type=event

Serving function...
Function: slack_reporter
URL: http://localhost:8080/

呼び出し

Cloud FunctionもLocalで起動したので、次は起動しているCloud Functionを実行していきます。ここで Functions Frameworkのissueを読んでいくと次のようなissueが見つかります。

https://github.com/GoogleCloudPlatform/functions-framework-nodejs/issues/37

そしてこちらのPR上で、どのようにmessageを送信することが適切なのか議論されています。ということで、これから記述する方法は将来正しくない方法になってしまう可能性がありますが、とはいえ今試す必要がある人がいるとも思うので書いておきます。

curlを使って下記のように実行すると、Localで動いているCloud Functionsが pubsubのメッセージを読み込むことができます。

$ curl -X POST -H 'Content-Type:application/json; charset=utf-8' \
  -H 'ce-type: xxx' \
  -H 'ce-specversion: xxx' \
  -H 'ce-source: xxx' \
  -H 'ce-id: xxx' \
  -d "$(cat mock_pubsub.json)" http://localhost:8080

すると、こんな感じでslackに通知されました。めでたしめでたし。

スクリーンショット 2019-06-26 18.47.51.png

この理由については、このあたりのコードに書いてあるのですが、もうちょっと追加調査したいことがあるので、またの機会に書こうと思います〜。

SAM Localを利用してLocalで動かしているAWS Lambda からdynamodb-localにアクセスする方法

この記事に書かれていること

  • SAM CLIの環境構築方法
  • SAM CLIを使ってLocalでLambdaを起動する方法
  • SAM CLIを使ってLocalで起動しているLambdaから、Localで用意したDynamoDB containerにアクセスする方法
  • これらの処理を僕が趣味で作っているAWS Lambdaを例に説明します。

この記事に書かれていないこと

  • SAM CLIとは何か?
  • Lambdaを利用する際のwebpackの設定

利用環境

  • nodejs8.10
  • TypeScript 3.4.5
  • SAM CLI 0.15.0
  • python 3.7.2

事前準備

aws-sam-cliのinstall

Installing the AWS SAM CLI on macOS というAWS公式の手順に則ってinstallします。

aws-sam-cliは、pythonのバージョン 2.7、3.6、3.7 に対応しています。もし手元の環境がそれらのバージョンに一致していないのであれば、対応しているバージョンのpythonをinstallしましょう。なお2.7は2020年の1月にはメンテナンスが終了されますので、今から入れるのであれば 3以上にすると良いでしょう。

$ brew install pyenv
$ brew install pyenv-virtualenv
$ pyenv install 3.7.2
$ pyenv local 3.7.2
$ brew tap aws/tap
$ brew install aws-sam-cli
$ sam --version
SAM CLI, version 0.15.0

dynamodb-localのdocker imageをpull

こちらもamazon公式のdocker imageを利用します。下記のコマンドを実行してdocker imageをpullしましょう

docker pull amazon/dynamodb-local

SAM Localテスト用データ作成

aws-sam-cliを使ってLocalからLambdaを起動するためのデータを作成します。今回は、シンプルにAPI Gatewayから起動することにします。

sam local generate-event \
  apigateway aws-proxy \
  --path datadog_report \
  --method GET > events/event_apigateway.json

このコマンドによって作成されたjsonこちらになります。

実装

docker-composeの設定

# docker-compose.yml
version: "3"

services:
  dynamodb-local:
    container_name: dynamodb
    image: amazon/dynamodb-local
    build: ./
    ports:
      - 8000:8000
    command: -jar DynamoDBLocal.jar -dbPath /data -sharedDb
    volumes:
      - ./data:/data
    networks:
      - lambda-local
networks:
  lambda-local:
    external: true

この設定において重要な点は3点あります。

1点目は、DynamoDB localのコマンドオプションに -dbPath /data を指定している点です。-dbPathでdockerがマウントしているvolumeに書き出すことによって、指定したディレクトリにデータを吐き出させるようにしています。こうすることで、データを永続化しています。-inMemoryオプションを使ってしまうと、毎回データが削除されてしまうので、開発時にそのオプションを利用するのは少し手間が掛かってしまうでしょう。(テストのときはあると良さそうです)

2点目は、DynamoDB localのコマンドオプションに、 -sharedDbオプションを指定しているところです。-sharedDbオプションを指定しない場合、データはmyaccesskeyid_region.db というフォーマットで格納されます。これはこれで、毎回起動するときにそのあたりのパラメータをちゃんと設定できていればよいのですが、今回は簡単のため-sharedDbオプションを指定しています。

3点目は、networksを指定しているところです。aws-sam-localによってlocalで実行されるLambdaは、起動時にdockerのnetworkを指定することができます。ここで指定したnetworksを aws-sam-localの起動時にも利用することによって、localで起動しているLambdaから、このdocker containerにアクセスすることができるようになります。

これらDynamoDB localのオプション内容については、公式ドキュメントに記載があるので参照してください。ということで設定ができたので、下記コマンドを実行してDynamoDB Localの環境を構築しましょう。

docker network create lambda-local
docker-compose up

typescript

ぼくが趣味で作っている、AWS Lambdaのコードから取ってきたやつです。 https://github.com/selmertsx/datadog_slack_report

今思えばちょっと設計に改善の余地ありですな...。この後新しい機能を追加予定なので、そのときにでもリファクタリングしようと思います。一旦必要そうなもののみ引っ張ってきました。

// https://github.com/selmertsx/datadog_slack_report/blob/c4e59fdb60b2e190bd58f7e823268d8b697e3dfb/src/index.ts
import { APIGatewayEvent, Callback, Context } from "aws-lambda";
import moment from "moment-timezone";
import "source-map-support/register";
import { Billing } from "./Billing";
import { SlackClient } from "./SlackClient";

export async function datadog_handler(event: APIGatewayEvent, context: Context, callback: Callback) {
  const fromTime = moment({ hour: 0, minute: 0, second: 0 })
    .tz("Asia/Tokyo")
    .subtract(1, "days")
    .format("X");

  const toTime = moment({ hour: 23, minute: 59, second: 59 })
    .tz("Asia/Tokyo")
    .subtract(1, "days")
    .format("X");

  try {
    const billing = new Billing();
    const report = await billing.calculate(fromTime, toTime);
    const slackClient = new SlackClient();
    await slackClient.post(report.slackMessageDetail());

    callback(null, {
      statusCode: 200,
      headers: {
        "Content-Type": "application/json;charset=UTF-8",
      },
      body: JSON.stringify({ status: 200, message: "OK" }),
    });
  } catch (err) {
    throw new Error(err);
  }
}
// https://github.com/selmertsx/datadog_slack_report/blob/e078d2427806f3f9b402a3af1fbe79c98b0e2a5a/src/DynamoDBClient.ts
import { DynamoDB } from "aws-sdk";
import { ReservedPlan } from "./typings/datadog";

export class DynamoDBClient {
  private client = new DynamoDB.DocumentClient({
    endpoint: "http://dynamodb:8000", // ここが重要!!!!!
    region: "ap-north-east1",
  });

  public getReservedPlans(): Promise<ReservedPlan[]> {
    return new Promise<any>((resolve: any, rejects: any) => {
      this.client.scan({ TableName: "DatadogPlan" }, (error, data) => {
        if (error) {
          rejects(error);
        } else if (data.Items == undefined) {
          resolve([]);
        } else {
          const results: ReservedPlan[] = [];
          data.Items.forEach(item => {
            results.push({ productName: item.Product, plannedHostCount: item.PlannedHostCount });
          });
          resolve(results);
        }
      });
    });
  }
}

さて、長々とコードが書いてあるのであれなのですが、重要なのは1点だけです。DynamoDBのendpointについて http://${dynamodb-localのcontainer名}:8000 としていることです。これによってSAM Localで起動したAWS Lambdaから、LocalのDynamoDBにアクセスすることができます。

  private client = new DynamoDB.DocumentClient({
    endpoint: "http://dynamodb:8000", // ここが重要!!!!!
    region: "ap-north-east1",
  });

起動方法

ということで、ここまでやったら後は起動するだけ。起動する際は、 sam local invoke コマンドの --docker-network オプションに、先程 docker-compose.yml で指定した network名を設定してみましょう。具体的には下記のコマンドになります。

$ npx webpack --config webpack.prod.js
$ sam local invoke --docker-network lambda-local -e events/event_apigateway.json --env-vars .env.json DatadogReport

2019-04-25 10:30:30 Found credentials in environment variables.
2019-04-25 10:30:30 Invoking index.datadog_handler (nodejs8.10)

Fetching lambci/lambda:nodejs8.10 Docker container image......
2019-04-25 10:30:33 Mounting /Users/shuhei.morioka/project/speee/datadog_slack_report as /var/task:ro,delegated inside runtime container
START RequestId: dbefc77e-42dc-1d21-a444-0abc44875df5 Version: $LATEST
END RequestId: dbefc77e-42dc-1d21-a444-0abc44875df5
REPORT RequestId: dbefc77e-42dc-1d21-a444-0abc44875df5  Duration: 4299.35 ms    Billed Duration: 4300 ms        Memory Size: 256 MB     Max Memory Used: 121 MB

ということで、Localで動いているAWS LambdaからDynamoDB Localにアクセスすることができました〜。

TypeScriptで書かれているCloud FunctionsからCloud PubSubのREST APIを叩く

この記事に書いてあること

この記事には、 TypeScriptで書かれているCloud FunctionsからCloud PubSubのAPIを叩く方法 が書かれています。それだけのことなのですが、現在GCPから公式で提供されているライブラリで実現するにはとても大変でした。

僕が把握している限り、firestoreを利用する際も同じ問題が発生しています。そのような問題に対処する際に参考になればと思い書きました。

概要

TypeScriptで書かれたCloud Functionsから、任意の条件を満たしたときに Cloud PubSubの特定のTOPICに対してメッセージを送信しようとしました。Cloud PubSubをnodeで利用する際、公式から提供されているのは @google-cloud/pubsub というライブラリです。しかしながら、このライブラリはCloud Functionsで利用することは難しいです。なぜなら、@google-cloud/pubsub で利用されている node-pre-gyp は、webpackでの利用を想定していないからです。(きっと現状、多くの人が Cloud Functions のbundleにはwebpackを利用していることでしょう! )

No. I designed node-pre-gyp and I've never used webpack nor do I understand what it is. So, its definitely not supposed to work. That said if it is feasible to get it working, I'd review a PR with tests. Until then I'll close this issue to avoid confusion/the assumption that things should work. (Issueのコメントから抜粋)

そこで、今回はCloud Functionsの中からCloud PubSub のAPIを直接実行するような方法で実装を行いました。以下、その詳細について記述します。

環境

  • Cloud Functions: runtime=nodejs8
  • TypeScript
  • webpackでbundle
  • Cloud Functionsを実行するサービスアカウントには、任意のPubSub Topicに対してメッセージを送信する権限を付与済み

問題

@google-cloud/pubsub を利用して特定のtopicに対してpubsubをするとき、local環境で ts-node を使って実行すると問題なく動作しました。しかしながら、webpackでbundleして cloud functionsにデプロイしようとしたところ、下記のようなエラーが出ました。

Detailed stack trace: Error: package.json does not exist at /package.json
    at Object.exports.find (webpack:///./node_modules/grpc/node_modules/node-pre-gyp/lib/pre-binding.js?:18:15)
    at Object.eval (webpack:///./node_modules/grpc/src/grpc_extension.js?:29:12)
    at eval (webpack:///./node_modules/grpc/src/grpc_extension.js?:63:30)
    at Object../node_modules/grpc/src/grpc_extension.js (/srv/index.js:11604:1)
    at __webpack_require__ (/srv/index.js:20:30)
    at eval (webpack:///./node_modules/grpc/src/client_interceptors.js?:144:12)
    at Object../node_modules/grpc/src/client_interceptors.js (/srv/index.js:11557:1)
    at __webpack_require__ (/srv/index.js:20:30)
    at eval (webpack:///./node_modules/grpc/src/client.js?:35:27)
    at Object../node_modules/grpc/src/client.js (/srv/index.js:11545:1)

この問題について調査していったところ、node-pre-gypのissue にたどり着きました。

対応方法

node-pre-gyp がwebpackでの利用を想定していないということなので、 @google-cloud/pubsub の利用を諦めて、直接 REST APIでpubsubを実行することにしました。GCPにおいてリソースを操作する方法は REST APIとgRPCの2つあります。nodeでgRPCで実行する際には node-pre-gyp が必須となってしまうため、今回は REST API で Cloud PubSubを操作することにしました。

メッセージ送信

GCPから提供されている Cloud PubSubのREST APIドキュメントは下記になります。

https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/publish https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage

こちらのドキュメントには、Cloud PubSubでメッセージを送信するために必要なパラメータは下記のようになると記載されています。(こちらは必要最小限のデータのみを載せています)

POST https://pubsub.googleapis.com/v1/projects/${project_name}/topics/${topic_name}:publish
{
  "messages": [ { "data": string(Base 64でエンコードされている文字列) } ]
}

REST APIで実施する場合は、認証をする必要があります。認証に関するドキュメントはこちらです。 https://developers.google.com/identity/protocols/OAuth2#serviceaccount

認証はOAuth 2.0で行う必要があります。GoogleのサーバーからAccessTokenを取得し、そのAccessTokenをAPI Requestのbearer トークンタイプとして渡す必要があります。このとき、API Requestは下記のようになります。

POST https://pubsub.googleapis.com/v1/projects/${project_name}/topics/${topic_name}:publish
Content-Type: "application/json"
Authorization: Bearer ${accessToken}

{
  "messages": [ { "data": string(Base 64でエンコードされている文字列) } ]
}

参考: OAuth2.0 rfc アクセストークンを利用したAPI Request

コード

上記 REST API をリクエストするnodeのコードは下記のようになります。googleapis にはアクセストークンを取得するメソッドが存在するので、簡単に実現することができました。

import { google } from "googleapis";
import axios from "axios";
const url = "https://pubsub.googleapis.com/v1/projects/${project_name}/topics/${topic_name}:publish";

async function main() {
  const token = await google.auth.getAccessToken();
  const data = { messages: [ { data: new Buffer("hogehoge").toString("base64") }]}
  const config = {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${token}`,
    },
  };
  return await axios.post(url, data, config);
}

main();

結果

$ npx ts-node sample.ts
{ messageIds: [ '512512244825408' ] }

ということで、Cloud PubSubでメッセージを送信できていることを確認できました!(ちなみに、諸々の事情で載せてはいませんが、Cloud Functionsから実行しても問題なく動きました )

Stackdriver Logging を利用して特定の条件に一致したLogの情報をSlackに通知する

最近、Cloud Functionsを利用してサーバレスでシステムを構築しています。けれども、まだまだサーバレスでシステムを作り慣れていないので、Cloud Functionsが正しく実行されたのか、ちょっと不安なので確認したいと考えていました。そこで、Cloud Functionsが実行完了した際に、それを検知してSlackにメッセージを送信する仕組みを作ってみたので、その説明をします。

TL;DR

  • Cloud Functionsが実行完了した際に通知してくれる仕組みを作りたかった
  • Cloud Functionsの実行ログが流れる stackdriver loggingを利用して、フィルタを設定する
  • フィルタに一致するログを受け取ったら pub/subでメッセージを送信
  • 上のメッセージをフックにしてSlack通知を行う Cloud Functionを実装
  • それによって目的を達成するシステムを実装した

システムの全体像

f:id:selmertsx:20190325202219p:plain

Stackdriver Loggingのログエクスポート機能を用いて、指定したフィルタに一致するログを取得しCloud Pub/Subでメッセージとして流します。そのメッセージを受け取った Cloud FunctionsがSlackに通知します。

前提知識

Stackdriver Loggingについて

https://cloud.google.com/logging/?hl=ja

Stackdriver Loggingはログデータやイベントを格納、検索、分析、モニタリング、通知するためのサービスです。Stackdriver LoggingのAPIを使えば、あらゆるソースからデータを取り込むことができます。GCPのサービスは、基本的にStackdriver Loggingにログが送られるようになっており、すぐに利用することができます。

Stackdriver Loggingではすべてのクラウドログを一箇所に集めて管理します。そのためそれら膨大な種類のデータの中から、価値のあるデータを拾い集め、適切な処理を促す仕組みが充実しています。今回はその中のExport機能を利用してCloud Functionsの実行通知をすることにしました。

Stackdriver LoggingのExport機能について

https://cloud.google.com/logging/docs/export/?hl=ja

Stackdriver Loggingには、ログをエクスポートする機能があります。エクスポートする用途としては下記の項目が挙げられます。

  • ログを長期間保存するため。(通常のログは30日間程度保持されます)
  • ログを分析するため
  • 他のアプリケーションで利用するため

上記の用途で利用できるようにするため、Stackdriver Loggingでは Cloud Storage, BigQuery, Cloud Pub/Subの3つにログをエクスポートすることができます。

StackdriverでLogをエクスポートする設定を理解するためにはsinkと呼ばれるオブジェクトについて理解する必要があります。Sinkオブジェクトは、自身の名前、エクスポートするログを選択するためのフィルタ、そしてフィルタに引っかかったログのエクスポート先の3つの要素から構築されます。

Logging自体には、ログをエクスポートする上で料金や制限は存在しません。けれども、エクスポート割きでのログデータの保存や送信には料金が掛かります。

Sinkはproject, organizations, billing_accounts単位で設定できます。そのため、監査ログを一元管理したいときなどはorganizationsに対してSinkの設定をするなど、用途に応じて適用する範囲を設定する必要があります。

設定と実装

要件

sample_cloud_function という名前の cloud functionsが実行完了した際に、その実行完了のログを検知してSlackに通知する」という要件で Stackdriver LoggingのログをSlackに通知してみます。

フィルタの設定

フィルタの設定については、こちらのページに設定方法が記載されています。設定に利用できるプロパティについては、こちら に記載されています。なお、resource type毎のlabelの値については、資料を見つけることができなかったので記載していません。

上記資料をもとにフィルタを自分で作成してみます。今回はsample_cloud_functionという名前のcloud functionが実行を完了したら、任意のpub/sub topicにメッセージを送信する必要があります。その要件を満たすフィルタは下記のようなります。

resource.type = "cloud_function"
resource.labels.region = "asia-northeast1"
resource.labels.function_name = "sample_cloud_function"
textPayload: "finished"

なおフィルタ設定時のベストプラクティスについては、下記のように記述されています。僕は毎回判断することが手間だったので、必ず引用符を用いるようにしています。

ベスト プラクティス: フィールド値と比較する文字列は引用符で囲むようにしてください。これにより、比較の意味が変わったり、デバッグが困難になるような誤りを防ぐことができます。文字の後に連続した文字、数字、アンダースコア(_)が続く単語の場合は、引用符を省略できます。

エクスポート先の設定 [terraform]

https://www.terraform.io/docs/providers/google/r/logging_project_sink.html

今回はPub/Subの cloud-functions-activity というTOPICに対してメッセージを送信することにしています。上記terraformのドキュメントを見ると、下記のように設定すると書かれています。

destination - (Required) The destination of the sink (or, in other words, where logs are written to). Can be a Cloud Storage bucket, a PubSub topic, or a BigQuery dataset. Examples: "storage.googleapis.com/[GCS_BUCKET]" "bigquery.googleapis.com/projects/[PROJECT_ID]/datasets/[DATASET]" "pubsub.googleapis.com/projects/[PROJECT_ID]/topics/[TOPIC_ID]" The writer associated with the sink must have access to write to the above resource.

unique_writer_identity については、projectをまたいで利用する場合はtrueにする必要があります。将来的に複数のprojectのcloud functionsのlogをここで通知したいと考えているので、ここでは true としました。

resource "google_logging_project_sink" "sample_cloudfunction" {
    name = "sample_cloudfunction_sink"
    destination = "pubsub.googleapis.com/projects/xxx/topics/cloud-functions-activity"
    filter = "resource.type = 'cloud_function' AND resource.labels.region = 'asia-northeast1' AND resource.labels.function_name = 'sample_cloudfunction' AND textPayload: 'finished'"
    unique_writer_identity = true
}

デプロイの設定 [cloudbuild]

cloudbuildを利用してデプロイしているので、その設定を載せておきます。とはいえ、重要なポイントは --trigger-topic のオプションの引数が、エクスポート先の設定で指定した cloud-functions-activity というトピックを指定しているという点のみです。

- name: 'gcr.io/cloud-builders/gcloud'
    args:
    - beta
    - functions
    - deploy
    - slack_reporter
    - --region=asia-northeast1
    - --stage-bucket=cf-bucket-for-xxx
    - --trigger-topic=cloud-functions-activity
    - --runtime=nodejs8

Pub/Sub Messageの内容

https://cloud.google.com/logging/docs/export/using_exported_logs?hl=ja#pubsub-organization

上記ドキュメントによると、Cloud Pub/Subによってストリーミングされるログは、下記のようなフォーマットになります。

{
 "receivedMessages": [
  {
   "ackId": "dR1JHlAbEGEIBERNK0EPKVgUWQYyODM...QlVWBwY9HFELH3cOAjYYFlcGICIjIg",
   "message": {
    "data": "eyJtZXRhZGF0YSI6eyJzZXZ0eSI6Il...Dk0OTU2G9nIjoiaGVsbG93b3JsZC5sb2cifQ==",
    "attributes": {
     "compute.googleapis.com/resource_type": "instance",
     "compute.googleapis.com/resource_id": "123456"
    },
    "messageId": "43913662360"
   }
  }
 ]
}

dataフィールドを base64でデコードすると、下記のようなLogEntry オブジェクトが取得できます。Cloud Functionの中では下記のObjectを利用してSlackへ送信するメッセージを構築します。

{
  "log": "helloworld.log",
  "insertId": "2015-04-15|11:41:00.577447-07|10.52.166.198|-1694494956",
  "textPayload": "Wed Apr 15 20:40:51 CEST 2015 Hello, world!",
  "timestamp": "2015-04-15T18:40:56Z",
  "labels": {
    "compute.googleapis.com\/resource_type": "instance",
    "compute.googleapis.com\/resource_id": "123456"
  },
  "severity": "WARNING"
}

Cloud Functionの実装

Pub/Sub Messageの内容で確認したObjectを利用してSlackに通知するスクリプトは下記のようになります。このとき SlackのTokenやChannel ID については、こちらの公式ドキュメント に記載されている方法で行っていますが、今回は省略させていただきます。

//index.ts
export async function slack_reporter(data: any) {
  const dataBuffer = Buffer.from(data.data, "base64");
  const logEntry = JSON.parse(dataBuffer.toString("ascii"));
  const client = await SlackClient.create();
  await client.post(`cloud function: ${logEntry.resource.labels.function_name} textPayload: ${logEntry.textPayload}`);
}
// SlackClient.ts
import { WebAPICallResult, WebClient } from "@slack/client";

export class SlackClient {
  public static async create(): Promise<SlackClient> {
    if (!this.instance) {
      this.instance = new SlackClient();
    }
    return this.instance;
  }

  private static instance: SlackClient;

  private slackCleint: WebClient;
  private readonly channel: string = process.env.CHANNEL_ID as string;
  private readonly token: string = process.env.SlackToken as string;

  constructor() {
    this.slackCleint = new WebClient(token);
  }

  public async post(text: string): Promise<WebAPICallResult> {
    return this.slackCleint.chat.postMessage({
      username: "ERP-HR Bot",
      channel: channel,
      text,
    });
  }
}

結果

f:id:selmertsx:20190325202416p:plain

ということで sample_cloud_functionsの実行完了を検知してSlack通知する仕組みを作ることができました。今後は、severity のレベルに応じてメッセージの内容を変更し、迅速に対応が必要なものがあれば即座に分かるようなところまで作ろうかなーとか考えたりしています。

所感

Stackdriver Logging、めちゃくちゃ便利です。一度 Stackdriver Loggingに集約することによって、ログの処理を一元管理することができます。一元管理することで、ログの前処理やBigQueryへのインポート等々、様々な処理を共通化できる気配を感じます。今回はこのような使い方をしましたが、RailsのLogなどを全てBigQueryに入れてしまって、BigQueryで分析することなどもできるのではないかな〜と思ったりしてます(昔Amazon Athenaでやってて、それなりに便利だった)。

もっとガシガシ使って、ポテンシャルを引き出していこう。

SlackのBlock Kitをjsxの記法で書ける jsx-slackを試してみた

TL;DR

  • 僕は個人的に datadog_slack_reporterというものを作成して、datadogで監視しているサービスの台数をslackに通知しています
  • Slackのメッセージ作成部分を、SlackのBlock Kitをjsxの方式で記述できる jsx-slack に置き換えてみました
  • 面倒なjsonの作成部分がReactっぽく書けるので、めちゃくちゃ便利でした
  • この資料にはjsx-slackをTypeScriptで導入する上で必要な設定方法を記載します

jsx-slackとは何か

Slackのメッセージ作成をjsxのフォーマットで行うことができるnpmのpackageです。これを利用すれば、Reactのcomponentを書くような書き味で複雑なjsonの作成をすることができます。

スクリーンショット 2019-03-04 12.07.34.png

上の図はjsx-slack内で生成されるjsonが確認できるページです。こちらの右側のjsonがslackが要求するjsonのフォーマットとなっており、左側がjsx-slackを利用して記述するコードになります。jsx-slackの動作確認ページはこちらになります。

成果物

最初に、今回作成したものの全体像をざっくり説明します。

Slackへのメッセージ出力

スクリーンショット 2019-03-04 11.22.05.png

こちらが、今回僕が作成したbotが実際にslackへ投稿したメッセージです。

ソースコード (一部抜粋)

jsx-slackを使った slackへのメッセージ送信ボットのプログラムがこちらになります。全体像を掴むために、さらっと眺めてもらえれば大丈夫です。

Cloud Functionsのhandler

import moment from "moment-timezone";
import "source-map-support/register";
import { DatadogHostMetrics } from "./datadog";
import { DatadogClient } from "./DatadogClient";
import { SlackClient } from "./SlackClient";
import { slackMessageBlock } from "./SlackMessageBlock";

const datadogClient = new DatadogClient();
const slackClient = new SlackClient();

export async function datadog_handler(data: any): Promise<void> {
  const fromTime = moment({ hour: 0, minute: 0, second: 0 })
    .tz("Asia/Tokyo")
    .subtract(1, "days")
    .format("X");

  const toTime = moment({ hour: 23, minute: 59, second: 59 })
    .tz("Asia/Tokyo")
    .subtract(1, "days")
    .format("X");
  // datadogのAPIを叩いてデータを取ってくる
  const hostMetrics: DatadogHostMetrics[] = await datadogClient.countHosts(fromTime, toTime);
  // 送信するメッセージを生成する
  const blocks = slackMessageBlock(fromTime, toTime, hostMetrics);
  // メッセージを送信する
  await slackClient.post(blocks);
}

messageの作成部分 (jsx-slackで書き直された場所)

/** @jsx JSXSlack.h */
import JSXSlack, { Block, Section } from "@speee-js/jsx-slack";
import moment from "moment-timezone";
import { DatadogHostMetrics } from "./datadog";
import { ProductMetrics } from "./ProductMetrics";

export function slackMessageBlock(fromTime: string, toTime: string, hostMetrics: DatadogHostMetrics[]) {
  const messages = [];
  for (const metrics of hostMetrics) {
    const productMetrics = new ProductMetrics(metrics);
    const message = (
      <blockquote>
        <b> {productMetrics.name} </b>
        <br />
        min:${productMetrics.minHostCount()} ~ max:${productMetrics.maxHostCount()}
        sum(host*hours):${productMetrics.sum()}
      </blockquote>
    );
    messages.push(message);
  }

  return JSXSlack(
    <Block>
      <Section>
        <p>datadog monitoring daily report</p>
        {moment.unix(parseInt(fromTime, 10)).toString()} ~ {moment.unix(parseInt(toTime, 10)).toString()}
        {messages}
      </Section>
    </Block>
  );
}

messageの送信部分

import { WebClient } from "@slack/client";

export class SlackClient {
  private static username: string = "Datadog按分計算Bot";
  private readonly channelID: string = process.env.CHANNEL_ID as string;
  private readonly token: string = process.env.SlackToken as string;
  private readonly client: WebClient;

  constructor() {
    this.client = new WebClient(this.token);
  }

  public post(blocks: any) {
    return this.client.chat.postMessage({
      channel: this.channelID,
      text: "",
      blocks,
      username: SlackClient.username,
    });
  }
}

実際のコードはこちらを参照してください。 https://github.com/selmertsx/datadog_slack_report/pull/4

基本的な書き方

  • 最初にslack本家のblock-kit-builderで、サンプルを見ながらどのようなblock kitが作りたいのか考えます。
  • jsx-slackのリポジトリ内にはサンプルがいくつかあるので、そちらを参照しても良いでしょう
  • その後、jsx-slackのサイトで目的のjsonが生成できる記法を探します。

jsx-slackの導入設定

TypeScriptで導入する際は少しだけ設定をする必要があります。公式ドキュメントを見ると下記のような記載があります。

https://github.com/speee/jsx-slack

A prgama would work in Babel (@babel/plugin-transform-react-jsx) and TypeScript >= 2.8 with --jsx react.

ということで、jsx-slackをTypeScriptで利用するにはtsconfigの設定が必要であることが分かります。以降、それらの設定を行っていきましょう。

@speee-js/jsx-slackのinstall

npm i @speee-js/jsx-slack

今回の設定に必要なpackageを上記コマンドで全部installします。

tsconfigの設定

TypeScriptのjsxに関するドキュメントを読んでみましょう。

https://www.typescriptlang.org/docs/handbook/jsx.html#factory-functions

The exact factory function used by the jsx: react compiler option is configurable. It may be set using either the jsxFactory command line option, or an inline @jsx comment pragma to set it on a per-file basis.

上記コメントで示されているように、TypeScriptでjsxを利用するためには、ファイルごとのpragmaの設定と、コンパイラオプションの指定が必要になります。pragmaの設定とは、サンプルコードで示されていた /** @jsx JSXSlack.h */ の部分を指します。コンパイラオプションは、下記のように設定します。

{
  "compilerOptions": {
    "target": "es2018",
    "module": "commonjs",
    "lib": [
      "es5",
      "es2015",
      "es2016.array.include",
      "esnext.asynciterable"
    ],
    "strict": true,
    "outDir": "./",
    "esModuleInterop": true,
    "noImplicitAny": true,
    "allowJs": true,
    "jsx": "react" // <= 今回追加した設定
  },
  "include": [
    "src/**/*.ts"
  ],
  "exclude": [
    "node_modules",
    "__tests__",
    "**/__mocks__/*.ts"
  ]
}

実際のコードはこちら

最後に

jsx-slack 導入の手順は以上になります。今回の例はちょっとシンプルすぎるのであまり有り難みを感じにくいかも知れませんが、もうちょっと複雑になってくると jsxを利用してReactっぽく書ける jsx-slackに大きなメリットを感じられるかと思います。

Sidekiqのjobの信頼性向上方法と Sidekiq Proの検討について

自分のための覚書

TL;DR

  • sidekiq proでは、server processが死んでも jobの復活がサポートされる
  • sidekiq proにおいて、redis が死んでも、1000件程度のジョブならclientが保持し続けて、redisが復活したタイミングでenqueue してくれる
  • ↑の状況において、client processが死ねば、蓄積された1000件のジョブは全て消える
  • sidekiq enterpriseでは、unique な jobになるように諸々やってくれるが、完全に保証してくれるものでは無いので、そこんところを考えたjob設計にすること。

Pricing

  • Pro
    • $9,500 / yr
  • Enterprise
    • 250 thread $19,500 / yr
    • 500 thread $23,400 / yr

Jobが失われないようにする仕組みについて

sidekiqにおいてjobが失われるケースは下記の3点

  • redisに接続できないケース
  • client processが死んだケース
  • server processが死んだケース

この中でSidekiq Proが対策をしているのは redisに接続できないserver processが死んだ の2つのケース。

redisに接続できない場合

基本的に、下記の資料に書いてある内容。 https://github.com/mperham/sidekiq/wiki/Pro-Reliability-Client

通常版だと、redisが死んでいたらjobがpush出来ない。しかしPro版だとpushできなかったjobをclientのプロセスが保持しており、特定のjobがpushできるようになったタイミングで残っているjobも一緒にpushしてしまう。なお、client が保持できるjobの数は最新の1000件のみ。1000件を越えた場合は、永遠に失われてしまうので、そこは気をつけること。

server processが死んだ場合

  • sidekiq basicは、Redis queueからのjobのフェッチにBRPOPを使っている
  • sidekiqがjobを実行している最中にクラッシュしたら、そのjobは永遠に失われてしまう
  • jobが失われない様に保証するための唯一の方法は、jobが完了するまでredisから削除しないこと
  • sidekiq proはRedisのPROPLPUSH コマンドを利用してそれを実現している
  • BROPLPUSH を使う場合は下記のように記述する必要がある
Sidekiq::Client.reliable_push! unless Rails.env.test?

Sidekiq.configure_server do |config|
  config.super_fetch!
  config.reliable_scheduler!
end

なお、super_fetchをする際は、redisデータベースの全走査を行うので、cache dataとjob dataで保持するredisを分けるのが望ましい。

JobのUnique制約について

大前提として jobは (class, 引数, queue)を組み合わせてuniqueであることが求められる。同じ引数のjobを異なるqueueに入れることができる。そのため、同一引数のjobで違う結果を期待してはいけない。基本的に unique な制約は時間に掛けることになる。

class MyWorker
  include Sidekiq::Worker
  sidekiq_options unique_for: 10.minutes

  def perform(...)
  end
end

上記のような設定をした場合、jobが成功するか、jobが失敗して10分が経過するまで同一引数のjobを実行することは出来ない。