はじめに
2024年5月から、Azure FunctionsのPythonでもHTTP Streamingが公式サポートされるようになりました。 本記事では、実装の流れとハマりやすい落とし穴について解説します。
ストリーミングとは
ストリーミングとは、サーバーからのレスポンスを一括で応答せず、小さなデータに分割し、逐次的に配信する通信方式のことを指します。
小規模データに分割して、逐次配信するため、応答時間が短縮されるため、 動画や音声などの大規模データの配信や、チャット通知や株価更新など、リアルタイム性が求められる場面で利用されています。
SSE(Server-Sent Events)とは
SSEは、日本語にするサーバー送信イベント。
クライアントが最初に送信する HTTP GETリクエスト をトリガーに、サーバーが通信を閉じずに継続送信する仕組みです。
サーバーは Content-Type: text/event-stream
ヘッダーを返し、その後は通信を閉じずイベントを逐次的にクライアントへ送信し続けます。
類似の通信方式として、WebSocketがありますが、こちらは双方向の通信方式になります。
- SSE:サーバー → クライアントの一方向通信(クライアントから送信するのは初回リクエストのみ)
- WebSocket:サーバー ↔ クライアントの双方向通信
背景と利用例
SSEは「受信専用のリアルタイム更新」が求められるケースに適しております。たとえば OpenAIのChatGPTをはじめとする生成AIサービス では、返答を一括で返すのではなく、トークン単位でストリーミング表示 する際に用いられることが多く、ユーザーはレスポンスを待たずに順次内容を確認できるため、体感速度が大きく向上いたします。
Azure Functionsでストリーミングを実装する
以前のAzure Functionsでは言語ランタイムがPythonの場合、Streamingがサポートされていませんでしたが、
2024年5月から、Azure FunctionsのPythonでもHTTP Streamingが公式サポートされるようになりました。
Functionsでストリーミングを実装する場合、以下の設定が必要になります。
環境変数設定
ストリーミングを使用する場合は、Azure Functionsの環境変数に以下の設定が必要になります。
この2つの変数の設定がないと、Functionsのリクエストがタイムアウトします。
設定がなくてもエラーログなどは出力されないので、トラブルシューティングに難儀することになることを防ぐため、設定忘れがないよう注意しましょう。
PYTHON_ENABLE_INIT_INDEXING=1
PYTHON_ISOLATE_WORKER_DEPENDENCIES=1
requirements.txt
ストリーミングする際は、拡張機能としてFast-APIを有効にする必要があります。
azure-functions
azurefunctions-extensions-http-fastapi
function_app.py
下記はHello → from → Azure → Functions → Stream!
の順に文字列をストリーミングで応答するコードです。
StreamingResponse
クラスに非同期関数generate_hello_stream()
の戻り値をyield
で指定することでストリーミングで応答されます。
yield
は、return
のように関数の戻り値を指定するキーワードですが、returnと違い、ジェネレーターとして値を順次返します。
Pythonにおけるジェネレータとは、「次のデータが必要になったら、そのときにデータをつくる仕組み」です。
import asyncio
import azure.functions as func
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
async def generate_hello_stream():
messages = ["Hello", "from", "Azure", "Functions", "Stream!"]
for msg in messages:
yield f"data: {msg}\n\n" # SSE形式
await asyncio.sleep(0.5)
yield "data: DONE\n\n"
@app.route(route="chat", methods=[func.HttpMethod.GET])
async def hello_stream_function(req: Request) -> StreamingResponse:
return StreamingResponse(generate_hello_stream(), media_type="text/event-stream")
他機能との併用制限
Fast-APIを使用するので、function_app.py内に通常のHTTPトリガー関数と併用して実装することはできません。
FunctionLoadError: cannot load the http_start function:
'req' binding type "httpTrigger" ... do not match ...
そのため、Durable Functionsと、Fast-APIの併用も実装することはできないため、Functions自体を分離する必要があります。
参考
おわりに
SSEのサポートでPython Functionsのリアルタイム配信がぐっとやりやすくなりました。
ただし、環境変数の設定忘れと併用不可はハマりやすいポイントなので要注意です。