はじめに
この記事では、Azure Durable Functionsについて解説します。
Azure Durable Functions
Durable Functionsは、Azure Functionsの拡張機能で、状態を保持する(ステートフルな)ワークフローを実現可能にする機能です。
複数の処理を順番や並列で実行し、状態管理や再実行といった処理をユーザーは実装する必要がなく、Durable Functions側で自動で行うため、開発者はビジネスロジックの実装に集中することができます。
また、HTTPトリガー関数の場合3分50秒のタイムアウト制限があるため、長時間の処理を行うことができませんが、Durable Functionsを使うことで、長時間の処理を行うことが可能になります。
Azure Durable Functionsのアーキテクチャ
Durable Fucntionsは以下の4つの関数で構成されています。
-
クライアント関数 (Client Function)
-
オーケストレーター関数 (Orchestrator Function)
-
アクティビティ関数 (Activity Function)
-
エンティティ関数 (Entity Function)
それぞれの次の項で、それぞれの関数について解説していきます。
クライアント関数
Durable Functionsをスタートさせるトリガー関数です。
通常のFunctionsと同様にHTTPトリガーやタイマートリガーなどイベントドリブンで実装されており、
定義したトリガーからイベントなどを受け取って、オーケストレーター関数を起動するのがこの関数の役目です。
オーケストレーター関数
後述するアクティビティ関数の実行を管理する役割の関数です。
後述するアクティビティ関数や他の関数を記載された通りに起動します。
ただし注意点として、オーケストレーター関数は 決定的(deterministic) である必要があります。
決定的とは、「同じ入力なら、いつ呼んでもまったく同じ動きをする」ように書く必要があることを示します。
現在時刻やランダム値を使った処理など、毎回結果が変わるようなコードをオーケストレーター関数で定義することはできません。
アクティビティ関数
実際のビジネスロジック部分の処理を行う関数です。
例えば、データベースにアクセスしたり、APIを呼んだり、実行時間のかかる処理をしたりするのはこの関数の役目。
この関数は「少なくとも1回は呼ばれること」が保証されていますが、「2回以上呼ばれる可能性」もあります。
そのため、べき等性(同じ処理を何回やっても結果が変わらないこと)を意識して書く必要があります。
エンティティ関数
Durable Functions内で状態を保持・更新するための関数。
例えば、「ユーザーのポイントを加算する」とか、「現在のステータスを記録しておく」といった使い方をします。
オーケストレーター関数と違って、コードの書き方に制限はありません。
また、他の関数と違って、必ずしも実装する必要はありません。
ステータスを保持する必要がある場合にのみ、実装しましょう。
エンティティ関数は、クライアント関数または、オーケストレーター関数から呼び出されます。
※この機能は Durable Functions の バージョン2.0以降で使用可能です。
Durable Functionsのコード例
以下にMicrosoft公式ドキュメントに記載されているDurable FunctionsのPythonコードの例を示します。
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
# クライアント関数: HTTPトリガーで定義され、リクエストURLに応じたオーケストレーター関数の起動および、クライアントへのレスポンスを応答
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
function_name = req.route_params.get('functionName')
instance_id = await client.start_new(function_name)
response = client.create_check_status_response(req, instance_id)
return response
# オーケストレーター関数: アクティビティ関数を呼び出し、管理する
# この例だと3つのアクティビティ関数を実行しており、3つの実行がすべて完了してからレスポンスを応答するようになっています
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
result1 = yield context.call_activity("hello", "Seattle")
result2 = yield context.call_activity("hello", "Tokyo")
result3 = yield context.call_activity("hello", "London")
return [result1, result2, result3]
# アクティビティ関数: 実際のビジネスロジックを定義:この例だと`Hello {都市の名前}`を応答
@myApp.activity_trigger(input_name="city")
def hello(city: str):
return f"Hello {city}"
リクエストフロー
初回リクエスト
はじめにオーケストレーターを起動するため、https:/{Functionsのエンドポイント}/orchestrators/hello_orchestratorにリクエストをします。
するとHTTPステータスコード202で以下のメッセージが応答されます。
{
"id": "a09fbb75133a4e2fa4e251255bc3e0b5",
"statusQueryGetUri": "https://....",
"sendEventPostUri": "https://....",
"terminatePostUri": "https://....",
"rewindPostUri": "https://....",
"purgeHistoryDeleteUri": "https://....",
"restartPostUri": "https://....",
"suspendPostUri": "https://....",
"resumePostUri": "https://...."
}
ここで重要となるのがstatusQueryGetUriの項目です。
2回目のリクエスト(statusQueryGetUri)
statusQueryGetUriのURLにリクエストすると、オーケストレーター関数の状態を取得できます。
{
"name":"hello_orchestrator",
"instanceId":"1a38e3f7860044d1a05885633c9aadc9",
"runtimeStatus":"Pending",
"input":null,
"customStatus":null,
"output": null,
"createdTime":"2025-03-07T01:44:16Z",
"lastUpdatedTime":"2025-03-07T01:44:22Z"
}
ここで、runtimeStatusに注目してください。
runtimeStatusがPendingのときはまだ待機中で、クライアント側のアプリはruntimeStatusがCompletedになるまでポーリングする必要があります。
runtimeStatusのステータス種類は以下の通りです。
ステータス | 内容 |
---|---|
Running | オーケストレーター関数が実行中の状態 |
Completed | オーケストレーター関数の処理正常終了 |
Failed | オーケストレーター関数の処理が失敗 |
Terminated | オーケストレーター関数の処理が完了前に停止 |
Pending | オーケストレーター関数が待機中 |
runtimeStatusがCompletedになったら、outputに結果が入ります。
以下のように3つのレスポンスがリスト形式で応答されており、複数のアクティビティ関数の実行がすべて完了してから、
レスポンスが応答されるようになっていることが分かります。
{
"name":"hello_orchestrator",
"instanceId":"1a38e3f7860044d1a05885633c9aadc9",
"runtimeStatus":"Completed",
"input":null,
"customStatus":null,
"output": ["Hello Seattle","Hello Tokyo", "Hello London"],
"createdTime":"2025-03-07T01:44:16Z",
"lastUpdatedTime":"2025-03-07T01:44:22Z"
}
Durable Fuctionsを使用することで、複数のアクティビティ関数を実行するなど処理時間がかかるケースでも、 非同期的に処理を管理し、結果を効率的に取得することが可能です。これにより、開発者は複雑なワークフローを簡潔に実装でき、アプリケーションのスケーラビリティや信頼性を向上させることができます。
Durable Functionsの用語解説
Durable Functions の「タスクハブ」
タスクハブは、オーケストレーター関数やアクティビティ関数などの進行状況・履歴・メッセージを格納する領域です。
Azure公式ドキュメント:Durable Functions におけるタスク ハブ (Azure Functions)
タスクハブがあるおかげで、アプリケーションが何らかの理由で一時的に停止または中断した後、再起動が必要になった場合でも、中断したところから処理を再開することができます。
タスクハブに使用するプロパイダー
タスクハブを使用するにはデータを保存するためのストレージやDBを選択する必要があります。
プロパイダーとしては、Azure StorageのQueueストレージ、MicrosoftSQLなどが使用可能です。
ワークアイテム
Azure Functionsの関数アプリが処理するタスクハブ内のメッセージの処理単位のことをワークアイテムといいます。
関数アプリは継続的に処理すべきワークアイテムがないか監視します。
ワークアイテムには2種類あり、処理させる関数によって種類が分かれています。
-
アクティビティワークアイテム
アクティビティ関数を実行してメッセージ処理する -
オーケストレーターワークアイテム
オーケストレーター関数またはエンティティ関数を実行する → 処理が終わるとタスク ハブに状態が保存(コミット)
タスクハブでの処理の流
- クライアントがリクエスト
- タスクハブにインスタンス状態と初期メッセージが記録
- Functionsのワーカーがオーケストレーター関数を実行し、アクティビティ関数を呼び出す
- 各アクティビティ関数の処理完了ごとに結果が返り、順次再開
- 最後のアクティビティ関数の処理完了後、runtimeStatusをCompletedに設定
Durable Functionsの活用例
Durable Functionsは、以下のようなシナリオで特に有用です。
1. 長時間実行されるワークフロー
例えば、複数の外部APIを順番に呼び出し、それぞれの結果を集約するような処理に適しています。Durable Functionsを使用することで、タイムアウトの制限を気にせずに実装できます。
2. 状態を持つプロセスの管理
ユーザーの注文処理や、複数ステップに分かれた承認フローなど、状態を保持しながら進行するプロセスを簡単に管理できます。
おわりに
Azure Durable Functionsは、複雑なワークフローや長時間実行される処理を簡潔に実装できる強力なツールです。この記事で紹介した基本的な概念やコード例を参考に、ぜひ実際のプロジェクトで活用してみてください。