BASEプロダクトチームブログ

ネットショップ作成サービス「BASE ( https://thebase.in )」、ショッピングアプリ「BASE ( https://thebase.in/sp )」のプロダクトチームによるブログです。

LocalStack の EventBridge Scheduler にある制約とその対処法

はじめに

この記事はBASEアドベントカレンダーの9日目の記事です。

devblog.thebase.in

基盤グループの @okinaka です。最近は、メール配信基盤の構築を担当しています。

今回は LocalStack の EventBridge Scheduler にある制約と、その対処法についてお話しします。

LocalStack と AWS EventBridge Scheduler

私が担当しているメール配信基盤は、AWS のサービスを組み合わせて作られています。

開発には Docker 上で AWS サービスをエミュレートした LocalStack を活用していて、私のお気に入りのツールです。特に Lambda 関数は、AWS サービスとの連携を前提としているため、ローカルでの動作確認には必須と言っていいかもしれません。

それに加えて最近のお気に入りの一つに AWS EventBridge Scheduler というサービスがあります。

EventBridge というサービスがありますが、それとは別のものです。

AWS EventBridge Scheduler には以下の特徴があります。

  • フルマネージドのサーバーレスなスケジューラーです。
  • AWSサービスや標準HTTP/Sエンドポイントを自動的に起動するスケジュールタスクを簡単に作成・管理できます。
  • Lambda、SQS、SNS、Step Functionsなど、200以上のAWSサービスを直接ターゲットとして呼び出すことができます。

メール配信では、日時を指定してメール配信するスケジュール機能として採用することにしました。一度限りのスケジュールを設定するのにとても有用です。(定期実行にも対応しています)

LocalStack にある制約

ありがたいことに LocalStack は、EventBridge Scheduler にも対応しています。

LocalStack は、よくできたエミュレーターですが完全に本物のAWS の挙動に対応しているわけではありません。初めのうちは喜んで開発を進めていたのですが、実装を進めているうちに以下の制約があることに気づきました。

EventBridge Scheduler in LocalStack only provides mocked functionality. It does not emulate actual features such as schedule execution or target triggering for Lambda functions or SQS queues. (LocalStack の EventBridge Scheduler はモック機能のみを提供します。スケジュール実行や Lambda 関数や SQS キューのターゲットトリガーといった実際の機能はエミュレートされません。)

https://docs.localstack.cloud/aws/services/scheduler/#current-limitations

肝心のスケジュール実行ができないなんて困ってしまいました。ただ、これで諦めてしまうのはもったいないです。

開発環境なので、実装方法はこだわらなくても動いてくれればよいので、足りない部分を補うような仕組みを用意してみました。

制約の対処方法

LocalStack は EventBridge (rule の方) にも対応しているので、これで Lambda 関数を定期実行することでスケジュール実行の代わりをさせます。

今回は、ターゲットとして SQS のキューに Input の内容を送る仕組みを作ってみます。

構成 (シーケンス図)

本来は EventBridge Scheduler にスケジュール作成すれば、SQS に送ってくれるのですが、LocalStack では、間に EventBridge と Lambda を挟む構成になっています。

  1. 本来のアプリから Scheduler にスケジュールを作成
  2. EventBridge Rule が毎分 Lambda を起動
  3. Lambda は Scheduler から情報を取得し、期限超過なら SQS へ投入後、スケジュールを削除

実装

Go 言語の Lambda 関数コードの例です。

このコードは一回限りの実行 (at 式)のみに対応しています。(cronrate などの繰り返しには未対応です)

package main
import (
    "context"
    "errors"
    "strings"
    "time"

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/scheduler"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)
var (
    schClient *scheduler.Client
    sqsClient *sqs.Client
)
func init() {
    // aws クライアントの初期化
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatalf("unable to load SDK config, %v", err)
    }
    schClient = scheduler.NewFromConfig(cfg, func(o *scheduler.Options) {
        o.BaseEndpoint = aws.String("http://localhost.localstack.cloud:4566")
    })
    sqsClient = sqs.NewFromConfig(cfg, func(o *sqs.Options) {
        o.BaseEndpoint = aws.String("http://localhost.localstack.cloud:4566")
    })
}
func handleRequest(ctx context.Context) error {
    var maxResults int32 = 10
    var nextToken *string

    // 有効なすべてのスケジュールを取得
    for {
        resp, err := schClient.ListSchedules(ctx, &scheduler.ListSchedulesInput{
            MaxResults: &maxResults,
            NextToken:  nextToken,
        })
        if err != nil {
            return err
        }
        for _, sch := range resp.Schedules {
            // スケジュールの詳細を取得
            s, err := schClient.GetSchedule(ctx, &scheduler.GetScheduleInput{
                Name:      sch.Name,
                GroupName: sch.GroupName,
            })
            if err != nil {
                return err
            }
            // スケジュールの必須フィールドが存在することを確認
            if s.ScheduleExpression == nil || s.ScheduleExpressionTimezone == nil || s.Target == nil || s.Target.Arn == nil {
                return errors.New("schedule is missing required fields")
            }
            loc, err := time.LoadLocation(*s.ScheduleExpressionTimezone)
            if err != nil {
                return err
            }
            t, err := timeFromAt(*s.ScheduleExpression, loc)
            if err != nil {
                return err
            }
            // スケジュールの実行時間が過ぎている場合、ジョブを実行
            if t.Before(time.Now()) {
                // ジョブの実行 (SQS にメッセージを送信)
                queueUrl := queueUrlFromArn(*s.Target.Arn)
                _, err = sqsClient.SendMessage(ctx, &sqs.SendMessageInput{
                    QueueUrl:    &queueUrl,
                    MessageBody: s.Target.Input,
                })
                if err != nil {
                    return err
                }
                // ジョブの実行後、スケジュールを削除
                _, err = schClient.DeleteSchedule(ctx, &scheduler.DeleteScheduleInput{
                    Name:      s.Name,
                    GroupName: s.GroupName,
                })
                if err != nil {
                    return err
                }
            }
        }
        if resp.NextToken == nil {
            break
        }
        nextToken = resp.NextToken
    }
    return nil
}
// "at(2025-12-24T12:00:00)" のような at 式から時間を抽出する関数
func timeFromAt(expr string, loc *time.Location) (time.Time, error) {
    expr = strings.TrimSpace(expr)
    if !strings.HasPrefix(expr, "at(") || !strings.HasSuffix(expr, ")") {
        return time.Time{}, errors.New("not an at expression")
    }
    body := strings.TrimSuffix(strings.TrimPrefix(expr, "at("), ")")
    t, err := time.ParseInLocation("2006-01-02T15:04:05", body, loc)
    if err != nil {
        return time.Time{}, err
    }
    return t, nil
}
// ARN からキュー名とリージョンを抽出し、QueueUrl を生成する関数
func queueUrlFromArn(arn string) string {
    parts := strings.Split(arn, ":")
    if len(parts) < 6 {
        return ""
    }
    region := parts[3]
    queueName := parts[len(parts)-1]
    // LocalStack用のURL
    return "http://sqs." + region + ".localhost.localstack.cloud:4566/000000000000/" + queueName
}
func main() {
    lambda.Start(handleRequest)
}

LocalStack を利用するための Docker の compose.yml の例です。

services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}"
    image: localstack/localstack
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
    environment:
      - DEBUG=1                           # トラブルシューティングに役立つため、DEBUGログをonに設定
      - SERVICES=events,lambda,scheduler,sqs
    volumes:
      - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"

LocalStack の環境を整えるための初期化スクリプト( init.sh ) の例です。 Lambda 関数のビルド&デプロイと EventBridge (rule) の設定を行います。 LocalStack の設定には awslocal コマンドを使用します。

#!/bin/bash

set -ex

# Lambda 関数の名前
func_name=localstack-schedule-executor

# Lambda 関数のビルドとパッケージング
GOARCH=arm64 GOOS=linux CGO_ENABLED=0 go build -o bootstrap main.go
zip ${func_name}.zip bootstrap

# LocalStack 上にデプロイ
awslocal lambda create-function \
  --function-name ${func_name} \
  --architectures arm64 \
  --runtime provided.al2023 \
  --handler bootstrap \
  --zip-file fileb://${func_name}.zip  \
  --role arn:aws:iam::000000000000:role/lambda-role \
  --timeout 30

# create-function 実行完了まで待つ
sleep 5

# EventBridge ルールの作成とターゲットの設定
awslocal events put-rule \
    --name schedule-execution-rule \
    --schedule-expression 'rate(1 minute)'

awslocal lambda add-permission \
  --function-name ${func_name} \
  --statement-id schedule-execution-permission \
  --action 'lambda:InvokeFunction' \
  --principal events.amazonaws.com \
  --source-arn arn:aws:events:${AWS_DEFAULT_REGION}:000000000000:rule/schedule-execution-rule

awslocal events put-targets \
  --rule schedule-execution-rule \
  --targets '[{"Id":"1","Arn":"arn:aws:lambda:'${AWS_DEFAULT_REGION}':000000000000:function:'${func_name}'"}]'

# SQS のキューを作成 (確認用)
awslocal sqs create-queue --queue-name test-queue

実行してみます。EventBridge Scheduler に値をセットして様子を見ます。(例では日本時間の 12/24 12:00 に設定)

$ docker compose up -d

$ sh init.sh

$ awslocal scheduler create-schedule \
 --name test-schedule \
 --schedule-expression 'at(2025-12-24T12:00:00)' \
 --target '{"RoleArn": "arn:aws:iam::000000000000:role/schedule-role", "Arn":"arn:aws:sqs:us-east-1:000000000000:test-queue", "Input": "test" }' \
 --flexible-time-window '{ "Mode": "OFF"}' \
 --schedule-expression-timezone 'Asia/Tokyo'

実際に SQS キューにメッセージが入るのかを確認します。

$ awslocal sqs receive-message --queue-url http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/test-queue
{
    "Messages": [
        {
            "MessageId": "73db45fd-e1b1-4376-bdaf-348e1a6411cb",
            "ReceiptHandle": "NWVjMDAyZDMtNjlhYi00ZGVlLWE3MjAtNjQ5ZTc1ODlhOGJkIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZSA3M2RiNDVmZC1lMWIxLTQzNzYtYmRhZi0zNDhlMWE2NDExY2IgMTc2NDI0MzIyNC4xNDMzNTgy",
            "MD5OfBody": "098f6bcd4621d373cade4e832627b4f6",
            "Body": "test"
        }
    ]
}

完全なエミュレートではありませんが、これで必要な機能を実現できました。

やったね!

おわりに

LocalStack の足りない機能を、既存のものを組み合わせて補えることが面白いなと思い紹介しました。いずれは公式でサポートされることになるとは思いますが、それまでのつなぎとして参考になれば幸いです。

明日は、BASEアドベントカレンダーは @FujiiMichiro さんの記事です。お楽しみに!

BASE株式会社ではエンジニアを採用募集中ですのでご興味あれば下記の採用ページをご覧ください。 binc.jp