本日も乙

ただの自己満足な備忘録。

Stackdriver Logging, Cloud Pub/Sub, Cloud Functionsで特定のイベント時にSlack通知する

GCPのCloud FunctionsはHTTPリクエスト、Cloud Pub/Sub、Firebase、GCS *1 をトリガーに呼び出すことができる、イベント駆動型のサーバレスサービスです。よくある例として、Cloud Storage(GCS)にファイルを置いた場合にCloud Functionsを実行して何かしらの処理をするといったことができます。上手く使えば色んなことができる楽しいサービスなのですが、トリガーとして連携できるサービスがまだまだ少ないのが実情です。

例えば、

  • Firewall Rules を変更した場合に通知したり、あるいは要件として定めている設定に戻す
  • Firewall Rules で危ない設定( 0.0.0.0/0 でSSH接続できる )にしたときに通知する
  • Compute Engine(GCE)インスタンスが起動したときに、ラベルが要件として定めている条件に設定されているかをチェックする(条件に沿わない場合は通知する)

といった、特定のイベント時に何かアクションをしたい場合が出てきます。 AWSだと AWS Config + Amason SNS + AWS Lambda でやるようなことですね。

今回は、例としてGCEインスタンスが停止もしくは削除したときにSlack通知する仕組みを Stackdriver Logging(Cloud Audit Logging)、Cloud Pub/Sub、Cloud Functions で実装してみました。

目次

アーキテクチャ

全体の流れを下図にまとめました。

event-occur-architecture

Stackdriver Logging のなかでも Cloud Audit Logging で取得される、システムイベントログで特定のイベントのフィルタを設定します。フィルタを通過したイベントログは Cloud Pub/Sub 経由で Cloud Functions が呼び出され、Slack通知される、といった仕組みです。

Stackdriver Logging のエクスポートの設定

マネジメントコンソール にアクセスし、「エクスポート」をクリック、「エクスポートを作成」をクリックします。

stackdriver-logging

フィルタの右側(▼)から「高度なフィルタに変換」をクリックし、テキストボックスに以下のようなフィルタを入力します。

resource.type="gce_instance"
protoPayload.response.operationType=("stop" OR "delete")

このフィルタはGCEインスタンスが停止または削除された条件を表しています。
右側の「エクスポートの編集」で下図のように入力します(シンク名は gce_instance_stop_or_delete としています)。

stackdriver-logging

シンクサービスに Cloud Pub/Sub 、シンクのエクスポート先で「新しい Cloud Pub/Sub トピックを作成」を選択し、トピック名を入力します(トピック名は gce_instance_stop_or_delete としています)。

stackdriver-logging

Cloud Functions の設定

次に Cloud Functionsのページ に飛んで Cloud Functionsの関数を作成します。もし、「Cloud Functions API が有効になっていません」と表示された場合は「APIを有効にする」というボタンをクリックしてください。

下図のように入力します(関数名は send-to-slack としています)。

cloudfunctions

ランタイムを Python 3.7 にし、以下のコードをインラインエディタに貼り付けます。

import base64, os, json, time
import urllib.request

channel = os.environ.get('SLACK_CHANNEL')
url = os.environ.get('SLACK_WEBHOOK_URL')

def send_slack(payload):
    headers = {'Content-Type': 'application/json'}
    req = urllib.request.Request(url, data=payload, headers=headers, method='POST')
    try:
        with urllib.request.urlopen(req) as res:
            body = res.read()
    except urllib.error.HTTPError as err:
        print(err.code)
    except urllib.error.URLError as err:
        print(err.reason)

def main_handler(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    pubsub_message = json.loads(base64.b64decode(event['data']).decode('utf-8'))

    message = """
TimeStamp: %s
OperationType: %s
ResourceName: %s
""" % (
    pubsub_message[u'receiveTimestamp'],
    pubsub_message[u'protoPayload'][u'response'][u'operationType'],
    pubsub_message[u'protoPayload'][u'resourceName']
    )

    payload={
    'channel': channel,
    'username': 'GCE Instance Stop/Delete',
    "icon_emoji": ":information_source:",
    'attachments': [{
        'pretext': 'GCE Instance has been stopped or deleted.',
        'color': '#00F35A',
        'text': message,
    }]
}
    send_slack(json.dumps(payload).encode('utf-8'))

設定の続きは下図のようにします。上のコードは環境変数を2つ設定する必要があります。
SLACK_WEBHOOK_URL は Slack Webhook URL、 SLACK_CHANNEL は Slack のチャンネル名を指定してください。

cloudfunctions

動作検証

意図した通りに動作するのかを確認します。適当なGCEインスタンスを停止や削除すると、下図のように Slack チャンネルにポストされていればOKです。

slack

もし動作しなかったら?

以下のポイントを参考にデバッグしていってください。

  1. Stackdriver Logging のフィルタに誤りがないか?フィルタを適用して意図したイベントが拾えているか?
  2. Cloud Functions が正常に実行できているか、Stackdriver Logging の Cloud Functions のログを見てエラー等が出ていないか?

注意点

Cloud Pub/Sub が At-Least-Once であるため、場合によっては2回 Cloud Functions が実行される可能性があります。複数回実行しても同じ結果になる(=冪等性)ように処理を実装するか、複数回実行しても良い処理にする必要があることにご注意ください。

まとめ

今回は例としてGCEインスタンスの停止または削除したときにSlack通知する方法を紹介しました。Stackdriver Logging のフィルタ設定次第では任意のイベントで Cloud Functions を実行できるので色々できそうです。

参考文献

今日の一言

子供ができるとEテレ番組に詳しくなってしまいます。最近のEテレは大人が見ても楽しく作られています。とくに「みいつけた!」は有名歌手が作詞作曲 & 歌 & アニメキャラクター出演しているので誰が出ているのかを当てるのが楽しいです。

*1:今回紹介するStackdriver Logging はCloud Pub/Subを間に挟むため省略しています