こんにちは。エンジニアのnobushiです。
以前PythonフレームワークのFastAPIを紹介しました。
今回 FastAPI で Relational Database を使ってみたいと思います。
使用するORMライブラリはSQLAlchemyです。
SQLAlchemy はPythonのORMライブラリの中では最もポピュラーなものの一つです。
Python Software Foundationの開発者アンケートではTOPに位置付けられています。
出典:Python Software Foundation、JetBrains「2020 年度 Python 開発者アンケートの結果」
https://www.jetbrains.com/ja-jp/lp/python-developers-survey-2020/
導入
以前のFastAPIのブログ
をベースに構築します。
今回はDBのコンテナとアプリのコンテナの複数のコンテナで稼働させるため、 docker-compose を使用します。
docker-compose が使える環境で、任意のディレクトリに以下の4ファイルを配置してください。
/
docker-compose.yml
Dockerfile
pyproject.toml
main.py
docker-compose.yml
version: '3.5'
services:
db:
image: mysql:5.7
command: >
--default-authentication-plugin=mysql_native_password
--character-set-server=utf8mb4
--collation-server=utf8mb4_bin
environment:
MYSQL_ROOT_PASSWORD: secret
MYSQL_DATABASE: app
MYSQL_USER: johndoe
MYSQL_PASSWORD: secret
ports:
- 3306:3306
restart: always
app:
build:
context: .
depends_on:
- db
ports:
- 80:80
restart: always
mysqlのコンテナとアプリのコンテナを定義しています。
Dockerfile
FROM python:3.10-alpine3.15
WORKDIR /app
CMD ["hypercorn", "main:app", "--bind", "0.0.0.0:80", "--access-logfile", "-", "--error-logfile", "-"]
ENV PYTHONPYCACHEPREFIX=/var/cache/python
ENV PYTHONPATH=/app
ENV SQLALCHEMY_WARN_20=1
EXPOSE 80
ARG POETRY_VERSION=1.1.12
# Install Poetry
RUN apk update --no-cache &&\
apk add --no-cache curl autoconf g++ libtool make libffi-dev &&\
curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py |\
POETRY_HOME=/opt/poetry POETRY_VERSION=${POETRY_VERSION} python &&\
cd /usr/local/bin &&\
ln -s /opt/poetry/bin/poetry &&\
poetry config virtualenvs.create false &&\
apk del --no-cache curl autoconf g++ libtool make &&\
rm -rf /tmp/*
# Install Libraries
COPY ./pyproject.toml ./poetry.lock* /app/
RUN apk update --no-cache &&\
apk add --no-cache autoconf g++ libtool make &&\
poetry install --no-root &&\
apk del --no-cache autoconf g++ libtool make &&\
rm -rf /tmp/*
# Uninstall Poetry
RUN apk update --no-cache &&\
apk add --no-cache curl autoconf g++ libtool make libffi-dev &&\
unlink /usr/local/bin/poetry &&\
curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py |\
POETRY_HOME=/opt/poetry POETRY_VERSION=${POETRY_VERSION} python - --uninstall &&\
apk del --no-cache curl autoconf g++ libtool make &&\
rm -rf /tmp/*
COPY . /app
以前のFastAPIのブログのサンプル
と同じくpythonのコンテナをベースにして、FastAPIも含め、必要なライブラリはPoetryでインストールするようにしています。
さらに、今回SQLAlchemy用の環境変数として、
ENV SQLALCHEMY_WARN_20=1
を追加しています。
この環境変数はSQLAlchemyの次バージョンリリースで廃止になるようなコードに対してWarningを発生させるためのものです。
使えなくなるようなコードを書くのを防ぐために設定しておいた方が良いと思います。
pyproject.toml
[tool.poetry]
name = "fastapi-sample"
version = "0.1.0"
description = ""
authors = []
[tool.poetry.dependencies]
python = "3.10.*"
fastapi = "0.73.*"
Hypercorn = "0.13.*"
PyMySQL = "1.0.*"
python-multipart = "0.0.*"
SQLAlchemy = "1.4.*"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core=1.1.*"]
build-backend = "poetry.core.masonry.api"
以前のブログと同じ FastAPI と Hypercorn に加え、
SQLAlchemy と、MySQLドライバーの PyMySQL 、
FastAPIでのFormデータ処理に必要な python-multipart を追加しています。
サンプルではpoetry.lockを省略していますが、実運用ではpoetry.lockも作成しておくべきです。
main.py
from functools import wraps
from typing import Any
from typing import Callable
from fastapi import FastAPI
from fastapi import Form
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.future import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
engine = create_engine("mysql+pymysql://johndoe:secret@db/app")
Base = declarative_base()
class SomeInfo(Base):
__tablename__ = "some_infos"
id = Column(Integer, primary_key=True)
desc = Column(String(255), nullable=True)
Base.metadata.create_all(engine)
ScopedSession = scoped_session(
sessionmaker(bind=engine, future=True),
)
app = FastAPI()
def entrypoint(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def _entry_point(*args: Any, **keywords: Any) -> Any:
session: Session = ScopedSession()
try:
result = func(*args, **keywords)
except Exception:
session.rollback()
raise
else:
session.commit()
finally:
ScopedSession.remove()
return result
return _entry_point
@app.post(
"/info",
)
@entrypoint
def post_info(
desc: str = Form(...),
) -> dict:
session: Session = ScopedSession()
with session.begin_nested():
some_info = SomeInfo(
desc=desc,
)
session.add(some_info)
session.flush()
return {
"id": some_info.id,
"desc": some_info.desc,
}
以下の部分でmysqlへ接続しています。
engine = create_engine("mysql+pymysql://johndoe:secret@db/app")
引数のURIは以下の形式となっています。
{db-type}+{db-api}://{user}:{password}@{host}/{database}
今回はmysqlにpymysqlドライバーを使って接続するので上記の文字列になります。
ユーザー、パスワード、ホスト名、データベースは
docker-compose.yml
でmysqlコンテナに設定した内容です。
次にModelを定義して、テーブルを作成しています。
Base = declarative_base()
class SomeInfo(Base):
__tablename__ = "some_infos"
id = Column(Integer, primary_key=True)
desc = Column(String(255), nullable=True)
Base.metadata.create_all(engine)
通常マイグレーションツール等使うかと思いますが、それは別の機会に。
次にSession定義です。
ScopedSession = scoped_session(
sessionmaker(bind=engine, future=True),
)
Sessionは、SQLAlchemyにおけるデータベースとの接続コンテキストのようなものです。
アプリケーションはSessionに対してModelの登録等の操作、またトランザクションの操作を行います。Sessionを生成している関数が sessionmaker()
です。future=True
は今後のアップデートで使えなくなる使用法でエラーを出すために設定しています。
上記コードでは生成したSessionをベースに scoped_session()
でScopedSessionを生成しています。ScopedSessionは実行スレッドに紐づけられた共有のSessionです。
アプリケーション内のどこからでも ScopedSession()
で共有のSessionを取得することができるようになります。
一つのアプリケーション内で複数のSessionが必要なケースはあまり無いと思われるので通常このパターンで良いと思います。
次はそのScopedSessionに関するトランザクション定義です。
def entrypoint(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def _entry_point(*args: Any, **keywords: Any) -> Any:
session: Session = ScopedSession()
try:
result = func(*args, **keywords)
except Exception:
session.rollback()
raise
else:
session.commit()
finally:
ScopedSession.remove()
return result
return _entry_point
これはSQLAlchemyで必ず必要という構造では無いんですが、こういう風にしてみました、という話です。
サーバーの処理は all-or-nothing が基本だと思いますので、
トランザクション管理はWebのエントリーポイントで握っておきたいところです。
そこで、FastAPIの全エントリーポイント(コントローラ)をこのデコレータでラップすることを前提として、
例外が発生した場合は全てをロールバックするようにしています。
その代わり、ここ以外(この内部)のコードで扱うトランザクションはMySQLの SAVEPOINT
に相当する begin_nested()
を使い、
入れ子構造とします。
また、ScopedSessionの破棄 ScopedSession.remove()
もここで行います。
ちなみにFastAPIに詳しい方なら、デコレータではなく
Global Dependencies
を使えば良いのでは?と思われたかもしれません。
しかし、これには明確にダメな理由があります。その理由は 後述 します。
次はコントローラ本体の定義です。
@app.post(
"/info",
)
@entrypoint
def post_info(
desc: str = Form(...),
) -> dict:
session: Session = ScopedSession()
with session.begin_nested():
some_info = SomeInfo(
desc=desc,
)
session.add(some_info)
session.flush()
return {
"id": some_info.id,
"desc": some_info.desc,
}
前述の entrypoint
でラップしています。
また、これも前述の ScopedSession()
で共有のSessionを取り出していて、begin_nested()
で入れ子のトランザクションを生成しています。session.add()
でModelを追加していますが、その後に session.flush()
を呼び出しています。
SQLが実際に発行されるのは session.flush()
のタイミングです。
このケースではレスポンスで id
を返していますので、これを得るためには session.flush()
の実行が必要です。
(実行する前は None
)
実行
実行してみましょう。
> docker-compose build
> docker-compose up -d
> curl -X POST -d 'desc=hoge' http://localhost/info
{"id":1,"desc":"hoge"}
DBに登録されました。
* mysqlの起動に少し時間がかかるのでcurlを行うのはしばらく待ってからの方が良さそうです。
所感
後述するスレッドに気を付ければ特に利用に問題はないと思います。
ただ、SQLAlchemy自体が今2.0へのバージョンアップの途中にあり、
1.Xとは仕様が変わってしまうのでその点は気をつけたいところです。
スレッドで苦労した話
前述の entrypoint
はデコレータで実装していますが、当初は
Global Dependencies
を使って実装していました。
その方がわざわざエントリーポイントで @entrypoint
と記述する必要もなく、
強制的に呼び出されることになるので好都合です。
しかし、そこに大きな落とし穴がありました。
問題はFastAPIの以下の仕様です。
The same applies for dependencies. If a dependency is a standard def function instead of async def, it is run in the external threadpool.
https://fastapi.tiangolo.com/async/#dependenciesDepends()
で指定した関数がasyncではない通常の関数の場合、その関数はthreadpoolから取得したスレッド上で実行されます。
一方、SQLAlchemyはasyncioに対応していません( ベータレベルでの対応 はあるようです)。そのため、 entrypoint
はasyncでは無い通常の関数であり、threadpoolから取得したスレッドで実行されることになります。
ここで困るのが、Sessionの問題です。
Sessionはそれ自体はマルチスレッドでの呼び出しには対応せず、その代わりにScopedSessionでスレッド毎に別のSessionを使用するアプローチが取られています。つまりSessionはスレッド毎に独立しています。
https://docs.sqlalchemy.org/en/14/orm/contextual.html#thread-local-scope
では、Global Dependenciesを使って実装した場合にどうなるか調査してみます。
コードを以下のように変更しました。
def _entrypoint(
request: Request,
) -> Generator[None, None, None]:
request.state.id = uuid.uuid4().hex
session: Session = ScopedSession()
try:
print(f"before yield: id:{request.state.id}: thid:{threading.get_ident()}")
yield
print(f"after yield: id:{request.state.id}: thid:{threading.get_ident()}")
except Exception:
session.rollback()
raise
else:
session.commit()
finally:
ScopedSession.remove()
app = FastAPI(dependencies=[Depends(_entrypoint)])
@app.post(
"/info",
)
def post_info(
request: Request,
desc: str = Form(...),
) -> dict:
print(f"post_info: id:{request.state.id}: thid:{threading.get_ident()}")
session: Session = ScopedSession()
with session.begin_nested():
some_info = SomeInfo(
desc=desc,
)
session.add(some_info)
session.flush()
return {
"id": some_info.id,
"desc": some_info.desc,
}
entrypoint
をGlobal Dependenciesで行うように変更しています。
また、それぞれのコードがどのスレッドで実行されたか分かるようにスレッドIDを出力するようにしました。
その結果、通常単独で実行する分には問題ないのですが、複数リクエストを同時に呼び出すようなことをすると、以下のようなログが出ます。
before yield: id:f390dfb6cb3a44cb95de2dab2fff785c: thid:139675703495480
post_info: id:f390dfb6cb3a44cb95de2dab2fff785c: thid:139675704556344
after yield: id:f390dfb6cb3a44cb95de2dab2fff785c: thid:139675705617208
それぞれのコードが別スレッドで実行されてしまっています。
またデータベースを確認するとデータは保存されていません。Depends()
を使用する場合はスレッドに気を付けないと思わぬところで足を掬われることになるので気を付けましょう。