Techに戻る

AWS Step Functionsを利用するでの個人的ヒント

AWSのサーバーレス開発の強い味方Step Functionsのまとめ

Step Functionsとは?

分散アプリケーション、マイクロサービスのコンポーネント疎結合化を可能にするAWSマネージドサービスのひとつです。各コンポーネントを独立させることが可能のため、アプリケーションのスケールや変更が比較的容易になります。

もっとも簡単なイメージは複数のLambda関数を連続して呼び出していく機構を手軽に作れるものですが、Lambda以外にもSNSやDynamoDB、SNSなど複数のAWSサービスと連携することが可能です。(Step Functions でサポートされる AWS サービス統合

ワークフローやステップに関して

Step FunctionsではワークフローをStatesで定義します。個別のStateでは、入力に基づいて決定を行い、アクションを実行して、出力を他の状態に渡すことが可能です。

AWS Step Functionsでは、ワークフローをAmazon States Language(JSON形式)で定義し、Step Functionsのコンソールにてグラフィカルに表示されます。

States内では以下に示すようなさまざまな関数を実行できます。

  • ステートマシンで何らかの作業をする (Task 状態)。
  • 実行の選択肢間で選択する (Choice 状態)
  • 失敗または成功で実行を停止する (Fail または Succeed 状態)
  • 入力を単純に出力に渡す、または一部の固定データを出力する (Pass 状態)
  • 一定時間または指定された時刻/日付まで遅延を提供する (Wait 状態)
  • 実行の並列ブランチを開始する (Parallel 状態)
  • ステップを動的に反復する (Map 状態)

ステートマシンの構造

要素内容
CommentASL(Amazon States Language)についてのコメント
StartAt最初に実行するStateを定義
TimeoutSecondsタイムアウト時間の指定
VersionASLのバージョン
Statesステートマシンを構成するStateを定義する。ワークフローを定義

Stateタイプ

要素内容
Taskひとつの処理単位
Wait処理をストップする時間
Pass入力値をそのまま出力
Parallel並列処理
Choice条件分岐
Map配列の要素ごとに動的並列処理を実行する
Fail実行結果を失敗とする
Succeed実行結果を成功とする

Task内で定義可能な要素

フィールド内容
NameTags
ResouceURI/呼び出すサービスのARN
InputPath値をStateに渡す
ParametersJSON形式でStateに値を渡す
ResultPathStateの実行結果をどのようなフィールド名で受け取るかを指定する
OutputPath次のStateに渡す値を指定する(未指定の場合、ResultPathの結果が採用される)
RetryStateでランタイムエラーが発生した場合の再試行ポリシー
Catch再試行ポリシーが使い果たされたか定義されていない場合に実行される処理
TimeoutSeconds処理をタイムアウトさせる時間
HeartbeatSecondsハートビートの間隔が指定した時間を超えた場合に失敗させる

State MachineのサンプルテンプレートがAWSから多数提供されているため、実現したい処理に近いものを参考にできます。
テンプレートからもわかるように、かなり多くのAWSサービスと連携でき、柔軟なワークフローが作成できます。

sample_template

Serverless FrameworkによるStep Functionsアプリのデプロイ

本記事ではStep FunctionsのStates作成をServerless Frameworkにて実施します。
ここではServerless Frameworkの詳細に関しては触れません。詳細に関してはServerless Framework公式をご覧ください。

Serverless Frameworkの導入

bash
$ sls create --template aws-python3 --path sls_sample_project
Serverless: Generating boilerplate...
Serverless: Generating boilerplate in "/mnt/c/Users/Yuta/git/sls_sample_project"
 _______                             __
|   _   .-----.----.--.--.-----.----|  .-----.-----.-----.
|   |___|  -__|   _|  |  |  -__|   _|  |  -__|__ --|__ --|
|____   |_____|__|  \___/|_____|__| |__|_____|_____|_____|
|   |   |             The Serverless Application Framework
|       |                           serverless.com, v2.41.2
 -------'

Serverless: Successfully generated boilerplate for template: "aws-python3"

今回はStep Functionsを使用したいため、Step Functions Pluginをインストールします。

bash
$ npm install --save-dev serverless-step-functions

Step Functionsの定義

Step Functionsの定義を行う場合、Serverless FrameworkではYAMLで記述する必要があります。はじめて作成したい場合などで、すべてをコードのみで書いていくのは非常に難しいと思います。
そこで、AWS StepFunctionsコンソールにてGUI操作でステップを作成し、CloudFormationのデザイナーを使用してJSONをYAML形式に変換するのがオススメです。

今回は以下の構成にしました。 CreateListにて処理を実施したいUser IDリストを作成し、MAP処理に渡します。MAPではインプットのリストを自動認識し、配列に合わせた並列を動的に行います。 MAP処理ではUser IDをもとにDynamoDBからGetItemにてデータを取得します。そしてLambda Processにて取得したデータを処理し、単純に出力します。その際、DynamoDBに指定したName IDがなかった場合、Lambdaの返り値をNullにし、次のJudgeSucceedInMapで判定を行います。

bash
.
├── serverless.yml
└── src
    ├── create_list.py
    └── process.py
  • create_list.py
python
def lambda_handler(event, context):
    
    process_list = [
        {
            'name_id': '1'
        },
        {
            'name_id': '2'        
        },
        {
            'name_id': '3'
            
        }
    ]
    
    return process_list
  • process.py
python
def lambda_handler(event, context):

    if 'Item' in event['DynamoDB']:
        item = event['DynamoDB']['Item']

    else:
        item = None

    return item
  • serverless.yml
yaml
service: sls-sample-project

frameworkVersion: '2'

provider:
  name: aws
  runtime: python3.8
  stage: dev
  region: us-east-1
  lambdaHashingVersion: 20201221
  memorySize: 128
  timeout: 10
  logRetentionInDays: 7

plugins:
  - serverless-step-functions


package:
  patterns:
    - '!./**/**'
    - '!./**'
  individually: true


functions:
  CreateList:
    handler: src/create_list.lambda_handler
    package:
      patterns:
        - src/**
  MainProcess:
    handler: src/process.lambda_handler
    package:
      patterns:
        - src/**


stepFunctions:
  stateMachines:
    SampleState:
      role:
        Fn::GetAtt: ["StateMachineRole", "Arn"]
      definition:
        Comment: >-
          Sample StepFunctions
        StartAt: StartProcess?
        States:
          StartProcess?:
            Type: Choice
            Choices:
              - Variable: $
                StringEquals: No messages
                Next: Finish
            Default: Create List

          Create List:
            Type: Task
            Resource: 
              Fn::GetAtt: [CreateList, Arn]
            InputPath: $
            ResultPath: $
            Next: MAP Process
          
          MAP Process:
            Type: Map
            Next: Finish
            Iterator:
              StartAt: Get Item from DynamoDB
              States:
                Get Item from DynamoDB:
                  Type: Task
                  Resource: arn:aws:states:::dynamodb:getItem
                  ResultPath: $.DynamoDB
                  Parameters:
                    TableName: sample-dynamodb
                    Key:
                      NameID:
                        S.$: $.name_id
                  Next: Lambda Process
                Lambda Process:
                  Type: Task
                  Resource: 
                    Fn::GetAtt: [MainProcess, Arn]
                  InputPath: $
                  ResultPath: $.result
                  Next: JudgeSucceedInMap
                JudgeSucceedInMap:
                  Type: Choice
                  Choices:
                    - Variable: $.result
                      IsNull: true
                      Next: NoItems
                  Default: SuccessInMap
                SuccessInMap:
                  Type: Pass
                  Result:
                    is_succeed: true
                  ResultPath: $.is_succeed
                  End: true
                NoItems:
                  Type: Pass
                  Result:
                    is_succeed: False
                  ResultPath: $.is_succeed
                  End: true

          Finish:
            Type: Succeed

resources:
  Resources:
    # DynamoDBの構築
    DynamoDbTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          -
            AttributeName: NameID
            AttributeType: S

        # プロビジョニングするキャパシティーユニットの設定
        KeySchema:
          -
            AttributeName: NameID
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        # テーブル名の指定
        TableName: sample-dynamodb
    StateMachineRole:
      Type: AWS::IAM::Role
      Properties:
        RoleName: RoleName
        Path: /path_of_state_machine_roles/
        AssumeRolePolicyDocument:
          Statement:
          - Effect: Allow
            Principal:
              Service:
                - states.amazonaws.com
            Action:
              - sts:AssumeRole
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/PowerUserAccess # テストなので権限をかなり緩めにする

なお、コードはGitHubにもアップしました。

実行

DynamoDBに適当な項目を追加しておきます。

Stateマシーンを開き、実行を行います。

しっかり結果が得られていることが確認できました。

MAP利用時の注意

MAP内でのエラー発生

MAP内でエラーが発生したしまった場合、Stateマシーン全体が停止してしまいます。
そこで、MAP内にエラーをキャッチした場合に受けるType: PassのStateを用意することで回避できます。

本記事の例でもこの方法を採用しました。

並列数の注意

MAPはインプットから動的に並列数を決定し、実行します。
MAP内でAthenaなどを呼び出す際、API実行の上限があるため何かしらの対策が必要です。

一番良いのはSQSを利用する手だと思いますが、MAPに用意されているMaxConcurrencyRetryを使う方法もあります。

  • MaxConcurrency: 反復子の呼び出しを並列実行できる上限数を指定
  • Retry: 状態でランタイムエラーが発生した場合の再試行ポリシーを定義

まとめ

Step Functionsの基本的な利用方法と参考になるサイトをまとめました。
今回、Step Functionsに関して詳しくドキュメントを見ましたが、AWSのさまざまなサービスを呼び出すことができ、 これらを簡単にワークフロー化できるため、Serverlessなワークフローを作成する際にとても有用なサービスだと再認識しました。
Step Functionsを利用する機会は今後も増えると思いますので、新たな知見を今後もまとめていきます。

ここでの開発はServerless Frameworkを利用しましたが、AWSのサポートツールも充実してきているようですので、これらも触れてみたいと思います。

参考