selmertsxの素振り日記

ひたすら日々の素振り内容を書き続けるだけの日記

Azitに入社しました

TL;DR

  • 2019年7月にSpeeeを退職し、Azitに入社しました
  • Azitではインフラエンジニア・スクラムマスターをやってます
  • Speee、とても良い会社なので色んな人に勧めたい
  • Azit、とても良い会社だけど、死ぬほど人足りない

Azitに転職しました

2019年7月にSpeeeを退職し、Azitに転職しました。

転職の理由はプライベートの状況の変化で、自分の課題意識が変化したからです。 2018年くらいに父親が体を崩しまして、ちょいちょい見舞いとか病院の送迎とかをやっていました。 地元は公共の交通網が完全に死んでいてとても車なしで通院できる状況ではなかったので、 ぼくが2〜3時間掛けて実家に帰って、そこから車を1時間運転して病院に送迎していました。 ( ちなみに、今はもう父親は退院して元気にやってます。)

そんなことがありまして、田舎の交通事情は今すでに危機的だし、これからはもっとやばいことになるなーとか色々考えていたところ、 ちょうど地方の交通課題の解消に取り組んでいるAzitの人に会って話す機会がありまして、意気投合した結果、自分の技術力をこの課題の解消に使いたいと思ってAzitへ転職することにしました。

Azitでやってること

www.wantedly.com

Azitではだいたいここに書いてあることをやっています。 インフラをいちから作り直す仕事と、Railsコードの負債解消、将来マイクロサービス化を見据えた準備と、スクラムマスターっぽい諸々をやってます。

インフラエンジニアは僕1人しかいないので不安だらけですが、前前職同期の太田くんに技術顧問的に相談に乗って貰いながらなんとかやってます。

スクラムマスターぽい業務については、正しいものを正しくつくるカイゼン・ジャーニーユーザーストーリーマッピングの3つの書籍をベースに、様々な部署と協力しながらいいものが作れるように進めています。 この書籍をベースに僕たちのチームを作りたいぞ!って話をしたら、デザイナー、PM、データ分析、さらには広報、人事まで本を読んでくれたりしているので、 この機会をちゃんと掴んで良いチームにしていきたいな〜と思っとります。

Speee とても良い会社なので色んな人に勧めたい

まぁそんなこんなでAzitで楽しくチームづくりに関わっているわけですが、こういったふるまいの大部分は前職上司の大場さんに学びました。 面談等で諸々フィードバックを受けたとき、僕に知識がなくて言われたことを全然理解できなかったことが多々あったのですが、そういうときは理解するために必要な本を色々勧めて貰いました。 同じ本を読み、共通理解を持って物事を進めていくというやり方は、今の会社でもめちゃくちゃ使わせて貰っています。 正しい知識、HRTな態度、実行力の3つが揃えば、たいていの物事はちゃんと進んでいくことを大場さんの仕事を見ながら学ばせていただきました。

上長の荻原さんにも色んな面で助けられました。尊敬するエンジニアの一人です。 IDaaS関連システムについて、中々良い仕上がりになった ( 運用フェーズに関われてないので、本当に良かったのか後で教えてほしい...!! ) のは完全に荻原さんのおかげなので、感謝しかないです。今の現場でも、荻原さんならどう振る舞うかを考えながら開発しています。

tech.speee.jp

他にも pataiji と一緒にwebapp-revieeeをワクワクしながら一緒に作って、それをクリアコードの須藤さんにレビューして貰ったり、 井原さん、gfxさん、yhattさん、飯田さん、nisshieeさん、kohtaro24さんとかとか色んなタイプの優秀なエンジニアの人たちと一緒に働かせて貰いました。

Speeeには退職の意思を2018年末から伝えていました。 けれども、それ以降も今までと何も変わらずに、AWS re:inventに送り出して貰ったり、 いろんな業界の第一人者と呼ばれるエンジニアの方々にお会いさせて貰ったり、一緒に仕事する機会をもらいました。 そういう訳で、最後まで最高の会社でした。まじ感謝! そんな人たちと働きたいという人はこちらから応募して貰えるといいかと思います!

www.wantedly.com

Azitも人を募集しているよ!

Azitは若い人たちばかりで、エンジニアは30歳の僕が最年長です。 メンバーの学習意欲はとても高く、社内勉強会を開けば職種を問わずみんな来てくれて、 自分たちのサービスを改善するために勉強会で学んだ知識をどう活かすのかワイワイガヤガヤしながら楽しく開発しています。

そんなチームですが、エンジニアが本当に足りなくて困ってます! もし興味を持ってくれた人がいましたら、Wantedly、もしくは森岡個人にDMに連絡をください〜。 軽くお話だけでもぜひぜひ!

恒例のあれ

Amazon ほしいものリスト

AWS System Manager Sessions Manager のPort Forwardingを利用して踏み台を経由せずに手元からitamaeを実行する

AWS System Manager Session Managerとか、ちょっと冗談みたいな名前ですよね。 おいおいEKS化とかやってく!!

このドキュメントに書いてあること。

このドキュメントには、AWS System Manager Sessions Manager のPort Forwardingを利用して踏み台を経由せずに手元の端末からitamaeを実行するための設定方法 が記載されています。 具体的には、下記の3点になります。

  • 手元の端末にSession Manager Plugin をインストールする方法
  • インフラを構成するterraformのコードの一部(iamの設定部分)
  • itamaeを実行する手順

AWS System Manager Sessions Manager のPort Forwardingとは

aws.amazon.com

2019年8月28日に、AWS System Manager Sessions Manager の Port Forwarding という機能が発表されました。 これは、プライベートサブネットにデプロイされたEC2インスタンスと自分のPC間に、トンネルを作成してくれる機能です。 この機能を利用すると、踏み台サーバーを経由せずにEC2インスタンスにアクセスすることが可能になります。 今回はこの機能を利用して、踏み台を経由せずに、EC2インスタンスにitamaeを実行していきます。

手元端末に Session Manager Plugin をインストールする

https://docs.aws.amazon.com/ja_jp/systems-manager/latest/userguide/session-manager-working-with-install-plugin.html#install-plugin-macos

上記のドキュメントにaws cliに Session Manager Pluginをインストールする方法が記載されています。 実際実行するコマンドは下記のようになります。

$ curl "https://s3.amazonaws.com/session-manager-downloads/plugin/latest/mac/sessionmanager-bundle.zip" -o "sessionmanager-bundle.zip"
$ unzip sessionmanager-bundle.zip
$ sudo ./sessionmanager-bundle/install -i /usr/local/sessionmanagerplugin -b /usr/local/bin/session-manager-plugin
$ session-manager-plugin

Terraform設定

EC2 instanceに対してSSM Port Forwardingができるように権限を付与する

AWS SSM Port Forwardingを利用して EC2 instanceにアクセスするためには、 EC2 instanceの IAM Roleに対して、AmazonSSMManagedInstanceCoreのポリシーを付与する必要があります。 僕は下記のTerraformのコードを利用して、ポリシーの付与を行いました。

resource "aws_iam_instance_profile" "default" {
  name = "${var.name}-profile"
  role = "${aws_iam_role.default.name}"
}

resource "aws_iam_role" "default" {
  name               = "${var.name}"
  path               = "/"
  assume_role_policy = file("${path.module}/assume_role_policy.json")
}

// SSMのポリシーを付与する
data "aws_iam_policy" "ssm_core" {
  arn = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
}

resource "aws_iam_role_policy" "default" {
  name   = "${var.name}-policy"
  role   = "${aws_iam_role.default.id}"
  policy = "${data.aws_iam_policy.ssm_core.policy}"
}
// assume_role_policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}

EC2 instanceにssm agentをインストールする

EC2インスタンスにおいてAWS SSMを利用するには System Manager Agentをインストールして、起動する必要があります。 今回はその設定をEC2インスタンスuser_data にて行うことにしました。 そのterraformの設定が下記のようになります。

resource "aws_instance" "ap-northeast-1a" {
  ami                         = "xxx"
  associate_public_ip_address = true
  availability_zone           = "ap-northeast-1a"
  instance_type               = "t3.medium"
  key_name                    = "xxx"
  monitoring                  = true
  ebs_optimized               = true
  disable_api_termination     = false
  source_dest_check           = true
  subnet_id                          = "$var.subnet_id"
  vpc_security_group_ids      = "${var.security_group_ids}"
  iam_instance_profile        = "${var.instance_profile_name}"

  tags = {
    Env  = "${var.env}"
    Name = "${var.name}"
  }

  user_data = file("${path.module}/install.sh")
}
#! /bin/bash

sudo yum install -y https://s3.amazonaws.com/ec2-downloads-windows/SSMAgent/latest/linux_amd64/amazon-ssm-agent.rpm
sudo systemctl enable amazon-ssm-agent
sudo systemctl start amazon-ssm-agent

itamaeの実行

上記でSystem Manager Port Forwardingを実行する環境が整ったので、あとはitamaeを実行するだけです。 itamaeの実行は .ssh/config にちょっとした設定を追加して、aws ssm start-session コマンドを実行すれば、後は itamae コマンドを実行するだけです。 ということで下記の手順で進めていきましょう。

.ssh/configの設定

Host xxx-staging-app-1
  HostName 127.0.0.1
  User ec2-user
  Port 9999

itamaeの実行

aws ssm start-sessionコマンドを実行して、手元の端末とEC2インスタンスの間にトンネルを作成しましょう。

$ aws ssm start-session --target i-xxx \
  --document-name AWS-StartPortForwardingSession \
  --parameters '{"portNumber":["22"],"localPortNumber":["9999"]}'

Starting session with SessionId: shuhei.morioka-xxx
Port 9999 opened for sessionId shuhei.morioka-xxx
Connection accepted for session shuhei.morioka-xxx

トンネルの作成が完了したら、あとはいつものようにitamaeのコマンドを実行していきます。

bundle exec itamae ssh -h xxx-staging-app-1 --node-yaml roles/xxx-staging-api/node.yml roles/xxx-staging-api/default.rb
 INFO : Starting Itamae... 
 INFO : Loading node data from /Users/xxx/node.yml...
 INFO : Recipe: /Users/xxx/default.rb
 INFO :   Recipe: /Users/xxx/itamae/cookbooks/git/default.rb
 INFO :     package[git] installed will change from 'false' to 'true'

ということで無事に、踏み台を経由せずに itamaeを実行することができました! 今の時代あんまり需要ないかもですが、この記事が誰かの手助けになるといいっすな。

SREについてDevOpsの違いや各種用語についてのまとめ

自分用メモです。

DevOpsとSREの違い

DevOpsとは開発(Development)と運用(Operations)を組み合わせた言葉であり、開発担当者と運用担当者が連携して協力し、さらには両担当者の境目も曖昧にする開発手法を指します。 厳密な定義は存在しておらず、抽象的な概念にとどまっています。 その目的から、DevOpsは組織(より具体的には任意のプロダクトに関わる全ての人達)の課題解決にフォーカスしています。 しかしながら、解決するための具体的な方法について定めるようなものではなく、大きな方針や文化を大切にしています。

SREとはGoogleのVPoEであるBen Taylor Slossが作成した言葉であり、Googleが実践しているシステム管理とサービス運用の方法論です。 事業責任者とService Level Objective(SLOs)を定め、それを守ることに責任を持ちます。 その他にもトイルに対する対処方法であったり、ポストモーテム、自動化などなど様々な内容に関して基準や、具体的な解決の手順を定めています。 DevOpsと親しい概念であり、class SREは interface DevOpsをimplementsしているとも言えます。

SREもDevOpsもサービスをより良くするという目的については一致しています。しかしながら、そのアプローチ方法が異なります。 DevOpsは組織的な課題解決にフォーカスすることによってその目的を達成しようとするのに対して、 SREは様々なオペレーションをソフトウェアで解決できる形に置き換え、ソフトウェアエンジニアリングのアプローチでもって課題に対処していくことでその目的を達成しようとします。

DevOpsは幅広い文化や哲学を指すのに対して、SREは厳密に定義された手法を持っていると言えます。

SLO/SLI/SLA/Error Budget

サービスを管理するためには、継続的に計測可能で具体的な指標が必要です。 その指標は、ユーザーにとって価値のあるサービスの振る舞いになります。 Googleではこのサービスレベルを管理するために、SLI(Service Level Indicators)、SLO (Service Level Objectives )、SLA (Service Level Agreements) の3つの概念を用いています。

SLO

SLOsとは、Service Level Objectivesの略称で、サービスの信頼性の目標のことを指します。 SLOを定める際は、ユーザーがそのサービスを利用する上で何が大切なのかを考え、それを数値化して指標とすることが大切です。 なお指標は平均を利用するのではなく、パーセンタイルを利用することが望ましいです。具体的には下記2つのような例があります。

  • eg1. gRPCのリクエストレイテンシの99%が100 ms以下であること
  • eg2. 可用率 99.9%

サービスには適切な信頼性というものがあります。過度な信頼性はコストとしてサービスに跳ね返ります。 また、明示的にSLOを定めなければ、ユーザーはサービスに対して過度な信頼を寄せることもあります。 障害の大部分はサービスの更新時において発生します。 なので、サービスの成長・拡大と信頼性のトレードオフから適正な値を定める必要があります。

サービスのSLOは最初から完璧に決められるものではありません。 最も大切なことは、計測することによって得たものでサービスとそのユーザーに対する理解を深め、よりよいSLOを模索し続けることです。 そのため、一度決めて終わりではなく、継続的に見直していく必要性があります。

SLO Document: https://landing.google.com/sre/workbook/chapters/slo-document/

SLI

SLIとはサービスの品質を決める計測可能な指標です。 サービスの品質を決める指標なので、ユーザーの視点に最も近い指標を利用することをおすすめします。 良いイベント数 / 全体のイベント数という形で定義することをおすすめします。 一般的によく使われる例は下記のようになります。

  • 成功したリクエストの数 / 全体のリクエスト数
  • 100ms以内に完了するgRPCのリクエスト数 / 全体のgRPCリクエスト数

こちらは、Google Workbookに記載される指標の一覧です。

このように定義すると、SLIは必ず 0 ~ 100 %の値を取ります。 SLIを一貫したスタイルで設定することによって、アラートツールを作るとき、エラーバジェットを計算するとき、レポートを作成するときに、同じようなINPUTを望むことができるようになります。 最初のSLIは、最小の工数でできる物を選びましょう。 調査に1週間かかってJSの搭載に数ヶ月かかるぐらいなら、すでにWebサーバのログがあるのであれば、ログを使いましょう。

Error Budget

Error Budgetの概念は、下記の数式で表されます。

SLIの計測値 - SLO = Error Budget

具体的な例を使って説明します。 例えば、とあるサービスのSLI を HTTP Responseを返したリクエスト数/ Load Balancerに到達したリクエスト数 としたとして、SLOを99%と置いたとします。 ここで SLIの計測値が 10000/10000 であった場合、100リクエスト分追加で失ってしまったとしても、SLOを満たしていると言えます。

この失うことのできる100リクエストをError Budget(予算)と言います。

エラーバジェットが残る限り、システムのリリースは継続が可能です。 逆に言えば、エラーバジェットが尽きそうになったら、リリースの頻度を下げる or ロールバックする必要があります。 エラーバジェットが多ければプロダクト開発者はリスクを取ることが出来ますし、エラーバジェットが少なければプロダクト開発者はリスクを下げる仕組みづくりに取り組むことになります。 例え外部要因によって障害が起きたとしても、エラーバジェットは消費されます。 プロダクトマネージャーは、イノベーションを取るならSLOを下げ、安定性を取るならSLOを上げることになります。

Error Budgetがなくなったときの取り決めをしなければ、SLOは何の実効性も持ちません。 そのためこの取り決めはとても重要です。全てのステークホルダーが合意するまで、SLI、SLOは調整する必要があります。

Error Budget: https://landing.google.com/sre/workbook/chapters/error-budget-policy/

SLOのTime Window

SLOsは様々時間幅で定義されます。代表的なものは Rolling windowと Calendar Windowの2つです。 ウィンドウを選択するとき、考慮するべき要素がいくつかあります。

Rolling Windows移動平均でSLOを導出します。そのためSLOはユーザー体験とより近しい指標になります。 Calendar Windowsは期間を決めてSLOを導出します。そのため、ビジネス上の戦略を立てやすくなります。

時間窓の期間によって、プロジェクトが取る戦略は変わります。 短い時間窓は迅速な意思決定を可能にし、長い時間窓は戦略的な意思決定を可能にします。 Googleでは4週間のローリングウィンドウを採用しています。

functions-framework を利用したGoogle Cloud Functionsにおいてpubsubのテストをする方法

このドキュメントに書いてあること

これまで Google Cloud Functionsをローカル環境でテストするときは、cloud-functions-emulator という公式で提供されているツールが一般的に利用されていました。しかしながらこのツールは現在archiveされており、作者が2019年5月16日に作成した issueによると functions-framework という新しいツールの利用を推奨しています。

このドキュメントでは、functions-framework を利用して Google Cloud FunctionsをLocalから実行する方法、特に公式では提供されていない eventsトリガーを利用してpubsubのメッセージを読み込ませる方法について記載します。

※ なお、2019年6月26日になっても公式ドキュメントでは @google-cloud/functions-emulatorを利用するようにと書かれています。 https://cloud.google.com/functions/docs/emulator?hl=ja#getting_started

スクリーンショット 2019-06-26 16.07.17.png

functions-frameworkとは?

Node.jsを利用してFaaSを書くためのOSSフレームワークです。このフレームワークを利用して書かれたコードは Cloud Functionsだけでなく、Cloud Runなどでも利用することができます。

そのようなことを目的としていると公式ドキュメントには書かれているものの、Cloud Run複数のAPIを持つ場合のケースに対応しておらず、現状では Cloud Functionsを Localで起動するためのツールとして使われることがメインになりそうです。Cloud Functionsでの利用であれば、Localの開発環境のみinstallするだけですぐに利用することができます。

npm i -D @google-cloud/functions-framework

このあたりの実装を読んでみると、内部でexpressサーバーを起動して、利用者が作成した functionをラッピングしていることが見て取れます。実行は簡単で、ラッピングしたいfunctionを下記のように指定してコマンドを実行すれば動きます。

npx functions-framework --target=helloWorld

実際に動かしてみる

プログラムの用意

僕が実際に利用しているプログラムからの抜粋です。cloud pubsubのmessageを受け取った後、その中身を見てSlackに通知しています。

export async function slack_reporter(data: any) {
  const dataBuffer = Buffer.from(data.data, "base64");
  const body = dataBuffer.toString("ascii");
  const client = await SlackClient.create();
  await client.post(body);
}
import { WebAPICallResult, WebClient } from "@slack/client";

export class SlackClient {
  public static async create(): Promise<SlackClient> {
    if (!this.instance) {
      const token = process.env.SLACK_TOKEN as string;
      const channel = process.env.SLACK_CHANNEL_ID as string;
      this.instance = new SlackClient(token, channel);
    }

    return this.instance;
  }

  private static instance: SlackClient;

  private slackCleint: WebClient;
  private channel: string;

  constructor(token: string, channel: string) {
    this.slackCleint = new WebClient(token);
    this.channel = channel;
  }

  public async post(text: string): Promise<WebAPICallResult> {
    return this.slackCleint.chat.postMessage({
      username: "ERP-HR Bot",
      channel: this.channel,
      text,
    });
  }
}

pubsubのmessageを作成する

pubsubで送信されるmessageのフォーマットについては、公式ドキュメントによると下記のように指定されています。

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string
}

このように色々なパラメータが存在していますが、今回のケースで利用するのは data のみです。理由については公式ドキュメントに記載されています。ということで messageを作成していきます。この dataパラメータはbase64エンコードする必要があるため、下記のコマンドでエンコードします。

$ echo -n "hogehoge" | base64
aG9nZWhvZ2U=

これをmessageで送信するjsonに組み込むと下記のようになります。

{
  "data": "aG9nZWhvZ2U="
}

Local環境でCloud Functionsを起動する

公式のドキュメントによると、functions frameworkを起動する際のoptionは --port, --target, --signature-typeの3点です。ここでは実行したい functionは slack_reporterであり、トリガーはpubsubにしたいので、下記のように設定をしました。

$ npx functions-framework --target=slack_reporter --signature-type=event

Serving function...
Function: slack_reporter
URL: http://localhost:8080/

呼び出し

Cloud FunctionもLocalで起動したので、次は起動しているCloud Functionを実行していきます。ここで Functions Frameworkのissueを読んでいくと次のようなissueが見つかります。

https://github.com/GoogleCloudPlatform/functions-framework-nodejs/issues/37

そしてこちらのPR上で、どのようにmessageを送信することが適切なのか議論されています。ということで、これから記述する方法は将来正しくない方法になってしまう可能性がありますが、とはいえ今試す必要がある人がいるとも思うので書いておきます。

curlを使って下記のように実行すると、Localで動いているCloud Functionsが pubsubのメッセージを読み込むことができます。

$ curl -X POST -H 'Content-Type:application/json; charset=utf-8' \
  -H 'ce-type: xxx' \
  -H 'ce-specversion: xxx' \
  -H 'ce-source: xxx' \
  -H 'ce-id: xxx' \
  -d "$(cat mock_pubsub.json)" http://localhost:8080

すると、こんな感じでslackに通知されました。めでたしめでたし。

スクリーンショット 2019-06-26 18.47.51.png

この理由については、このあたりのコードに書いてあるのですが、もうちょっと追加調査したいことがあるので、またの機会に書こうと思います〜。

SAM Localを利用してLocalで動かしているAWS Lambda からdynamodb-localにアクセスする方法

この記事に書かれていること

  • SAM CLIの環境構築方法
  • SAM CLIを使ってLocalでLambdaを起動する方法
  • SAM CLIを使ってLocalで起動しているLambdaから、Localで用意したDynamoDB containerにアクセスする方法
  • これらの処理を僕が趣味で作っているAWS Lambdaを例に説明します。

この記事に書かれていないこと

  • SAM CLIとは何か?
  • Lambdaを利用する際のwebpackの設定

利用環境

  • nodejs8.10
  • TypeScript 3.4.5
  • SAM CLI 0.15.0
  • python 3.7.2

事前準備

aws-sam-cliのinstall

Installing the AWS SAM CLI on macOS というAWS公式の手順に則ってinstallします。

aws-sam-cliは、pythonのバージョン 2.7、3.6、3.7 に対応しています。もし手元の環境がそれらのバージョンに一致していないのであれば、対応しているバージョンのpythonをinstallしましょう。なお2.7は2020年の1月にはメンテナンスが終了されますので、今から入れるのであれば 3以上にすると良いでしょう。

$ brew install pyenv
$ brew install pyenv-virtualenv
$ pyenv install 3.7.2
$ pyenv local 3.7.2
$ brew tap aws/tap
$ brew install aws-sam-cli
$ sam --version
SAM CLI, version 0.15.0

dynamodb-localのdocker imageをpull

こちらもamazon公式のdocker imageを利用します。下記のコマンドを実行してdocker imageをpullしましょう

docker pull amazon/dynamodb-local

SAM Localテスト用データ作成

aws-sam-cliを使ってLocalからLambdaを起動するためのデータを作成します。今回は、シンプルにAPI Gatewayから起動することにします。

sam local generate-event \
  apigateway aws-proxy \
  --path datadog_report \
  --method GET > events/event_apigateway.json

このコマンドによって作成されたjsonこちらになります。

実装

docker-composeの設定

# docker-compose.yml
version: "3"

services:
  dynamodb-local:
    container_name: dynamodb
    image: amazon/dynamodb-local
    build: ./
    ports:
      - 8000:8000
    command: -jar DynamoDBLocal.jar -dbPath /data -sharedDb
    volumes:
      - ./data:/data
    networks:
      - lambda-local
networks:
  lambda-local:
    external: true

この設定において重要な点は3点あります。

1点目は、DynamoDB localのコマンドオプションに -dbPath /data を指定している点です。-dbPathでdockerがマウントしているvolumeに書き出すことによって、指定したディレクトリにデータを吐き出させるようにしています。こうすることで、データを永続化しています。-inMemoryオプションを使ってしまうと、毎回データが削除されてしまうので、開発時にそのオプションを利用するのは少し手間が掛かってしまうでしょう。(テストのときはあると良さそうです)

2点目は、DynamoDB localのコマンドオプションに、 -sharedDbオプションを指定しているところです。-sharedDbオプションを指定しない場合、データはmyaccesskeyid_region.db というフォーマットで格納されます。これはこれで、毎回起動するときにそのあたりのパラメータをちゃんと設定できていればよいのですが、今回は簡単のため-sharedDbオプションを指定しています。

3点目は、networksを指定しているところです。aws-sam-localによってlocalで実行されるLambdaは、起動時にdockerのnetworkを指定することができます。ここで指定したnetworksを aws-sam-localの起動時にも利用することによって、localで起動しているLambdaから、このdocker containerにアクセスすることができるようになります。

これらDynamoDB localのオプション内容については、公式ドキュメントに記載があるので参照してください。ということで設定ができたので、下記コマンドを実行してDynamoDB Localの環境を構築しましょう。

docker network create lambda-local
docker-compose up

typescript

ぼくが趣味で作っている、AWS Lambdaのコードから取ってきたやつです。 https://github.com/selmertsx/datadog_slack_report

今思えばちょっと設計に改善の余地ありですな...。この後新しい機能を追加予定なので、そのときにでもリファクタリングしようと思います。一旦必要そうなもののみ引っ張ってきました。

// https://github.com/selmertsx/datadog_slack_report/blob/c4e59fdb60b2e190bd58f7e823268d8b697e3dfb/src/index.ts
import { APIGatewayEvent, Callback, Context } from "aws-lambda";
import moment from "moment-timezone";
import "source-map-support/register";
import { Billing } from "./Billing";
import { SlackClient } from "./SlackClient";

export async function datadog_handler(event: APIGatewayEvent, context: Context, callback: Callback) {
  const fromTime = moment({ hour: 0, minute: 0, second: 0 })
    .tz("Asia/Tokyo")
    .subtract(1, "days")
    .format("X");

  const toTime = moment({ hour: 23, minute: 59, second: 59 })
    .tz("Asia/Tokyo")
    .subtract(1, "days")
    .format("X");

  try {
    const billing = new Billing();
    const report = await billing.calculate(fromTime, toTime);
    const slackClient = new SlackClient();
    await slackClient.post(report.slackMessageDetail());

    callback(null, {
      statusCode: 200,
      headers: {
        "Content-Type": "application/json;charset=UTF-8",
      },
      body: JSON.stringify({ status: 200, message: "OK" }),
    });
  } catch (err) {
    throw new Error(err);
  }
}
// https://github.com/selmertsx/datadog_slack_report/blob/e078d2427806f3f9b402a3af1fbe79c98b0e2a5a/src/DynamoDBClient.ts
import { DynamoDB } from "aws-sdk";
import { ReservedPlan } from "./typings/datadog";

export class DynamoDBClient {
  private client = new DynamoDB.DocumentClient({
    endpoint: "http://dynamodb:8000", // ここが重要!!!!!
    region: "ap-north-east1",
  });

  public getReservedPlans(): Promise<ReservedPlan[]> {
    return new Promise<any>((resolve: any, rejects: any) => {
      this.client.scan({ TableName: "DatadogPlan" }, (error, data) => {
        if (error) {
          rejects(error);
        } else if (data.Items == undefined) {
          resolve([]);
        } else {
          const results: ReservedPlan[] = [];
          data.Items.forEach(item => {
            results.push({ productName: item.Product, plannedHostCount: item.PlannedHostCount });
          });
          resolve(results);
        }
      });
    });
  }
}

さて、長々とコードが書いてあるのであれなのですが、重要なのは1点だけです。DynamoDBのendpointについて http://${dynamodb-localのcontainer名}:8000 としていることです。これによってSAM Localで起動したAWS Lambdaから、LocalのDynamoDBにアクセスすることができます。

  private client = new DynamoDB.DocumentClient({
    endpoint: "http://dynamodb:8000", // ここが重要!!!!!
    region: "ap-north-east1",
  });

起動方法

ということで、ここまでやったら後は起動するだけ。起動する際は、 sam local invoke コマンドの --docker-network オプションに、先程 docker-compose.yml で指定した network名を設定してみましょう。具体的には下記のコマンドになります。

$ npx webpack --config webpack.prod.js
$ sam local invoke --docker-network lambda-local -e events/event_apigateway.json --env-vars .env.json DatadogReport

2019-04-25 10:30:30 Found credentials in environment variables.
2019-04-25 10:30:30 Invoking index.datadog_handler (nodejs8.10)

Fetching lambci/lambda:nodejs8.10 Docker container image......
2019-04-25 10:30:33 Mounting /Users/shuhei.morioka/project/speee/datadog_slack_report as /var/task:ro,delegated inside runtime container
START RequestId: dbefc77e-42dc-1d21-a444-0abc44875df5 Version: $LATEST
END RequestId: dbefc77e-42dc-1d21-a444-0abc44875df5
REPORT RequestId: dbefc77e-42dc-1d21-a444-0abc44875df5  Duration: 4299.35 ms    Billed Duration: 4300 ms        Memory Size: 256 MB     Max Memory Used: 121 MB

ということで、Localで動いているAWS LambdaからDynamoDB Localにアクセスすることができました〜。

TypeScriptで書かれているCloud FunctionsからCloud PubSubのREST APIを叩く

この記事に書いてあること

この記事には、 TypeScriptで書かれているCloud FunctionsからCloud PubSubのAPIを叩く方法 が書かれています。それだけのことなのですが、現在GCPから公式で提供されているライブラリで実現するにはとても大変でした。

僕が把握している限り、firestoreを利用する際も同じ問題が発生しています。そのような問題に対処する際に参考になればと思い書きました。

概要

TypeScriptで書かれたCloud Functionsから、任意の条件を満たしたときに Cloud PubSubの特定のTOPICに対してメッセージを送信しようとしました。Cloud PubSubをnodeで利用する際、公式から提供されているのは @google-cloud/pubsub というライブラリです。しかしながら、このライブラリはCloud Functionsで利用することは難しいです。なぜなら、@google-cloud/pubsub で利用されている node-pre-gyp は、webpackでの利用を想定していないからです。(きっと現状、多くの人が Cloud Functions のbundleにはwebpackを利用していることでしょう! )

No. I designed node-pre-gyp and I've never used webpack nor do I understand what it is. So, its definitely not supposed to work. That said if it is feasible to get it working, I'd review a PR with tests. Until then I'll close this issue to avoid confusion/the assumption that things should work. (Issueのコメントから抜粋)

そこで、今回はCloud Functionsの中からCloud PubSub のAPIを直接実行するような方法で実装を行いました。以下、その詳細について記述します。

環境

  • Cloud Functions: runtime=nodejs8
  • TypeScript
  • webpackでbundle
  • Cloud Functionsを実行するサービスアカウントには、任意のPubSub Topicに対してメッセージを送信する権限を付与済み

問題

@google-cloud/pubsub を利用して特定のtopicに対してpubsubをするとき、local環境で ts-node を使って実行すると問題なく動作しました。しかしながら、webpackでbundleして cloud functionsにデプロイしようとしたところ、下記のようなエラーが出ました。

Detailed stack trace: Error: package.json does not exist at /package.json
    at Object.exports.find (webpack:///./node_modules/grpc/node_modules/node-pre-gyp/lib/pre-binding.js?:18:15)
    at Object.eval (webpack:///./node_modules/grpc/src/grpc_extension.js?:29:12)
    at eval (webpack:///./node_modules/grpc/src/grpc_extension.js?:63:30)
    at Object../node_modules/grpc/src/grpc_extension.js (/srv/index.js:11604:1)
    at __webpack_require__ (/srv/index.js:20:30)
    at eval (webpack:///./node_modules/grpc/src/client_interceptors.js?:144:12)
    at Object../node_modules/grpc/src/client_interceptors.js (/srv/index.js:11557:1)
    at __webpack_require__ (/srv/index.js:20:30)
    at eval (webpack:///./node_modules/grpc/src/client.js?:35:27)
    at Object../node_modules/grpc/src/client.js (/srv/index.js:11545:1)

この問題について調査していったところ、node-pre-gypのissue にたどり着きました。

対応方法

node-pre-gyp がwebpackでの利用を想定していないということなので、 @google-cloud/pubsub の利用を諦めて、直接 REST APIでpubsubを実行することにしました。GCPにおいてリソースを操作する方法は REST APIとgRPCの2つあります。nodeでgRPCで実行する際には node-pre-gyp が必須となってしまうため、今回は REST API で Cloud PubSubを操作することにしました。

メッセージ送信

GCPから提供されている Cloud PubSubのREST APIドキュメントは下記になります。

https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/publish https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage

こちらのドキュメントには、Cloud PubSubでメッセージを送信するために必要なパラメータは下記のようになると記載されています。(こちらは必要最小限のデータのみを載せています)

POST https://pubsub.googleapis.com/v1/projects/${project_name}/topics/${topic_name}:publish
{
  "messages": [ { "data": string(Base 64でエンコードされている文字列) } ]
}

REST APIで実施する場合は、認証をする必要があります。認証に関するドキュメントはこちらです。 https://developers.google.com/identity/protocols/OAuth2#serviceaccount

認証はOAuth 2.0で行う必要があります。GoogleのサーバーからAccessTokenを取得し、そのAccessTokenをAPI Requestのbearer トークンタイプとして渡す必要があります。このとき、API Requestは下記のようになります。

POST https://pubsub.googleapis.com/v1/projects/${project_name}/topics/${topic_name}:publish
Content-Type: "application/json"
Authorization: Bearer ${accessToken}

{
  "messages": [ { "data": string(Base 64でエンコードされている文字列) } ]
}

参考: OAuth2.0 rfc アクセストークンを利用したAPI Request

コード

上記 REST API をリクエストするnodeのコードは下記のようになります。googleapis にはアクセストークンを取得するメソッドが存在するので、簡単に実現することができました。

import { google } from "googleapis";
import axios from "axios";
const url = "https://pubsub.googleapis.com/v1/projects/${project_name}/topics/${topic_name}:publish";

async function main() {
  const token = await google.auth.getAccessToken();
  const data = { messages: [ { data: new Buffer("hogehoge").toString("base64") }]}
  const config = {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${token}`,
    },
  };
  return await axios.post(url, data, config);
}

main();

結果

$ npx ts-node sample.ts
{ messageIds: [ '512512244825408' ] }

ということで、Cloud PubSubでメッセージを送信できていることを確認できました!(ちなみに、諸々の事情で載せてはいませんが、Cloud Functionsから実行しても問題なく動きました )

Stackdriver Logging を利用して特定の条件に一致したLogの情報をSlackに通知する

最近、Cloud Functionsを利用してサーバレスでシステムを構築しています。けれども、まだまだサーバレスでシステムを作り慣れていないので、Cloud Functionsが正しく実行されたのか、ちょっと不安なので確認したいと考えていました。そこで、Cloud Functionsが実行完了した際に、それを検知してSlackにメッセージを送信する仕組みを作ってみたので、その説明をします。

TL;DR

  • Cloud Functionsが実行完了した際に通知してくれる仕組みを作りたかった
  • Cloud Functionsの実行ログが流れる stackdriver loggingを利用して、フィルタを設定する
  • フィルタに一致するログを受け取ったら pub/subでメッセージを送信
  • 上のメッセージをフックにしてSlack通知を行う Cloud Functionを実装
  • それによって目的を達成するシステムを実装した

システムの全体像

f:id:selmertsx:20190325202219p:plain

Stackdriver Loggingのログエクスポート機能を用いて、指定したフィルタに一致するログを取得しCloud Pub/Subでメッセージとして流します。そのメッセージを受け取った Cloud FunctionsがSlackに通知します。

前提知識

Stackdriver Loggingについて

https://cloud.google.com/logging/?hl=ja

Stackdriver Loggingはログデータやイベントを格納、検索、分析、モニタリング、通知するためのサービスです。Stackdriver LoggingのAPIを使えば、あらゆるソースからデータを取り込むことができます。GCPのサービスは、基本的にStackdriver Loggingにログが送られるようになっており、すぐに利用することができます。

Stackdriver Loggingではすべてのクラウドログを一箇所に集めて管理します。そのためそれら膨大な種類のデータの中から、価値のあるデータを拾い集め、適切な処理を促す仕組みが充実しています。今回はその中のExport機能を利用してCloud Functionsの実行通知をすることにしました。

Stackdriver LoggingのExport機能について

https://cloud.google.com/logging/docs/export/?hl=ja

Stackdriver Loggingには、ログをエクスポートする機能があります。エクスポートする用途としては下記の項目が挙げられます。

  • ログを長期間保存するため。(通常のログは30日間程度保持されます)
  • ログを分析するため
  • 他のアプリケーションで利用するため

上記の用途で利用できるようにするため、Stackdriver Loggingでは Cloud Storage, BigQuery, Cloud Pub/Subの3つにログをエクスポートすることができます。

StackdriverでLogをエクスポートする設定を理解するためにはsinkと呼ばれるオブジェクトについて理解する必要があります。Sinkオブジェクトは、自身の名前、エクスポートするログを選択するためのフィルタ、そしてフィルタに引っかかったログのエクスポート先の3つの要素から構築されます。

Logging自体には、ログをエクスポートする上で料金や制限は存在しません。けれども、エクスポート割きでのログデータの保存や送信には料金が掛かります。

Sinkはproject, organizations, billing_accounts単位で設定できます。そのため、監査ログを一元管理したいときなどはorganizationsに対してSinkの設定をするなど、用途に応じて適用する範囲を設定する必要があります。

設定と実装

要件

sample_cloud_function という名前の cloud functionsが実行完了した際に、その実行完了のログを検知してSlackに通知する」という要件で Stackdriver LoggingのログをSlackに通知してみます。

フィルタの設定

フィルタの設定については、こちらのページに設定方法が記載されています。設定に利用できるプロパティについては、こちら に記載されています。なお、resource type毎のlabelの値については、資料を見つけることができなかったので記載していません。

上記資料をもとにフィルタを自分で作成してみます。今回はsample_cloud_functionという名前のcloud functionが実行を完了したら、任意のpub/sub topicにメッセージを送信する必要があります。その要件を満たすフィルタは下記のようなります。

resource.type = "cloud_function"
resource.labels.region = "asia-northeast1"
resource.labels.function_name = "sample_cloud_function"
textPayload: "finished"

なおフィルタ設定時のベストプラクティスについては、下記のように記述されています。僕は毎回判断することが手間だったので、必ず引用符を用いるようにしています。

ベスト プラクティス: フィールド値と比較する文字列は引用符で囲むようにしてください。これにより、比較の意味が変わったり、デバッグが困難になるような誤りを防ぐことができます。文字の後に連続した文字、数字、アンダースコア(_)が続く単語の場合は、引用符を省略できます。

エクスポート先の設定 [terraform]

https://www.terraform.io/docs/providers/google/r/logging_project_sink.html

今回はPub/Subの cloud-functions-activity というTOPICに対してメッセージを送信することにしています。上記terraformのドキュメントを見ると、下記のように設定すると書かれています。

destination - (Required) The destination of the sink (or, in other words, where logs are written to). Can be a Cloud Storage bucket, a PubSub topic, or a BigQuery dataset. Examples: "storage.googleapis.com/[GCS_BUCKET]" "bigquery.googleapis.com/projects/[PROJECT_ID]/datasets/[DATASET]" "pubsub.googleapis.com/projects/[PROJECT_ID]/topics/[TOPIC_ID]" The writer associated with the sink must have access to write to the above resource.

unique_writer_identity については、projectをまたいで利用する場合はtrueにする必要があります。将来的に複数のprojectのcloud functionsのlogをここで通知したいと考えているので、ここでは true としました。

resource "google_logging_project_sink" "sample_cloudfunction" {
    name = "sample_cloudfunction_sink"
    destination = "pubsub.googleapis.com/projects/xxx/topics/cloud-functions-activity"
    filter = "resource.type = 'cloud_function' AND resource.labels.region = 'asia-northeast1' AND resource.labels.function_name = 'sample_cloudfunction' AND textPayload: 'finished'"
    unique_writer_identity = true
}

デプロイの設定 [cloudbuild]

cloudbuildを利用してデプロイしているので、その設定を載せておきます。とはいえ、重要なポイントは --trigger-topic のオプションの引数が、エクスポート先の設定で指定した cloud-functions-activity というトピックを指定しているという点のみです。

- name: 'gcr.io/cloud-builders/gcloud'
    args:
    - beta
    - functions
    - deploy
    - slack_reporter
    - --region=asia-northeast1
    - --stage-bucket=cf-bucket-for-xxx
    - --trigger-topic=cloud-functions-activity
    - --runtime=nodejs8

Pub/Sub Messageの内容

https://cloud.google.com/logging/docs/export/using_exported_logs?hl=ja#pubsub-organization

上記ドキュメントによると、Cloud Pub/Subによってストリーミングされるログは、下記のようなフォーマットになります。

{
 "receivedMessages": [
  {
   "ackId": "dR1JHlAbEGEIBERNK0EPKVgUWQYyODM...QlVWBwY9HFELH3cOAjYYFlcGICIjIg",
   "message": {
    "data": "eyJtZXRhZGF0YSI6eyJzZXZ0eSI6Il...Dk0OTU2G9nIjoiaGVsbG93b3JsZC5sb2cifQ==",
    "attributes": {
     "compute.googleapis.com/resource_type": "instance",
     "compute.googleapis.com/resource_id": "123456"
    },
    "messageId": "43913662360"
   }
  }
 ]
}

dataフィールドを base64でデコードすると、下記のようなLogEntry オブジェクトが取得できます。Cloud Functionの中では下記のObjectを利用してSlackへ送信するメッセージを構築します。

{
  "log": "helloworld.log",
  "insertId": "2015-04-15|11:41:00.577447-07|10.52.166.198|-1694494956",
  "textPayload": "Wed Apr 15 20:40:51 CEST 2015 Hello, world!",
  "timestamp": "2015-04-15T18:40:56Z",
  "labels": {
    "compute.googleapis.com\/resource_type": "instance",
    "compute.googleapis.com\/resource_id": "123456"
  },
  "severity": "WARNING"
}

Cloud Functionの実装

Pub/Sub Messageの内容で確認したObjectを利用してSlackに通知するスクリプトは下記のようになります。このとき SlackのTokenやChannel ID については、こちらの公式ドキュメント に記載されている方法で行っていますが、今回は省略させていただきます。

//index.ts
export async function slack_reporter(data: any) {
  const dataBuffer = Buffer.from(data.data, "base64");
  const logEntry = JSON.parse(dataBuffer.toString("ascii"));
  const client = await SlackClient.create();
  await client.post(`cloud function: ${logEntry.resource.labels.function_name} textPayload: ${logEntry.textPayload}`);
}
// SlackClient.ts
import { WebAPICallResult, WebClient } from "@slack/client";

export class SlackClient {
  public static async create(): Promise<SlackClient> {
    if (!this.instance) {
      this.instance = new SlackClient();
    }
    return this.instance;
  }

  private static instance: SlackClient;

  private slackCleint: WebClient;
  private readonly channel: string = process.env.CHANNEL_ID as string;
  private readonly token: string = process.env.SlackToken as string;

  constructor() {
    this.slackCleint = new WebClient(token);
  }

  public async post(text: string): Promise<WebAPICallResult> {
    return this.slackCleint.chat.postMessage({
      username: "ERP-HR Bot",
      channel: channel,
      text,
    });
  }
}

結果

f:id:selmertsx:20190325202416p:plain

ということで sample_cloud_functionsの実行完了を検知してSlack通知する仕組みを作ることができました。今後は、severity のレベルに応じてメッセージの内容を変更し、迅速に対応が必要なものがあれば即座に分かるようなところまで作ろうかなーとか考えたりしています。

所感

Stackdriver Logging、めちゃくちゃ便利です。一度 Stackdriver Loggingに集約することによって、ログの処理を一元管理することができます。一元管理することで、ログの前処理やBigQueryへのインポート等々、様々な処理を共通化できる気配を感じます。今回はこのような使い方をしましたが、RailsのLogなどを全てBigQueryに入れてしまって、BigQueryで分析することなどもできるのではないかな〜と思ったりしてます(昔Amazon Athenaでやってて、それなりに便利だった)。

もっとガシガシ使って、ポテンシャルを引き出していこう。