こんにちは。エンジニアのnobushiです。
以前PythonフレームワークのFastAPIを紹介しました。

今回 FastAPI で Relational Database を使ってみたいと思います。
使用するORMライブラリはSQLAlchemyです。
SQLAlchemy はPythonのORMライブラリの中では最もポピュラーなものの一つです。
Python Software Foundationの開発者アンケートではTOPに位置付けられています。

出典:Python Software Foundation、JetBrains「2020 年度 Python 開発者アンケートの結果」
https://www.jetbrains.com/ja-jp/lp/python-developers-survey-2020/

導入

以前のFastAPIのブログ

をベースに構築します。

今回はDBのコンテナとアプリのコンテナの複数のコンテナで稼働させるため、 docker-compose を使用します。
docker-compose が使える環境で、任意のディレクトリに以下の4ファイルを配置してください。

/
  docker-compose.yml
  Dockerfile
  pyproject.toml
  main.py

docker-compose.yml

version: '3.5'
services:
  db:
    image: mysql:5.7
    command: >
      --default-authentication-plugin=mysql_native_password
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_bin
    environment:
      MYSQL_ROOT_PASSWORD: secret
      MYSQL_DATABASE: app
      MYSQL_USER: johndoe
      MYSQL_PASSWORD: secret
    ports:
      - 3306:3306
    restart: always
  app:
    build:
      context: .
    depends_on:
      - db
    ports:
      - 80:80
    restart: always

mysqlのコンテナとアプリのコンテナを定義しています。

Dockerfile

FROM python:3.10-alpine3.15

WORKDIR /app

CMD ["hypercorn", "main:app", "--bind", "0.0.0.0:80", "--access-logfile", "-", "--error-logfile", "-"]

ENV PYTHONPYCACHEPREFIX=/var/cache/python
ENV PYTHONPATH=/app
ENV SQLALCHEMY_WARN_20=1

EXPOSE 80

ARG POETRY_VERSION=1.1.12

# Install Poetry
RUN apk update --no-cache &&\
    apk add --no-cache curl autoconf g++ libtool make libffi-dev &&\
    curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py |\
    POETRY_HOME=/opt/poetry POETRY_VERSION=${POETRY_VERSION} python &&\
    cd /usr/local/bin &&\
    ln -s /opt/poetry/bin/poetry &&\
    poetry config virtualenvs.create false &&\
    apk del --no-cache curl autoconf g++ libtool make &&\
    rm -rf /tmp/*

# Install Libraries
COPY ./pyproject.toml ./poetry.lock* /app/
RUN apk update --no-cache &&\
    apk add --no-cache autoconf g++ libtool make &&\
    poetry install --no-root &&\
    apk del --no-cache autoconf g++ libtool make &&\
    rm -rf /tmp/*

# Uninstall Poetry
RUN apk update --no-cache &&\
    apk add --no-cache curl autoconf g++ libtool make libffi-dev &&\
    unlink /usr/local/bin/poetry &&\
    curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py |\
    POETRY_HOME=/opt/poetry POETRY_VERSION=${POETRY_VERSION} python - --uninstall &&\
    apk del --no-cache curl autoconf g++ libtool make &&\
    rm -rf /tmp/*

COPY . /app

以前のFastAPIのブログのサンプル
と同じくpythonのコンテナをベースにして、FastAPIも含め、必要なライブラリはPoetryでインストールするようにしています。

さらに、今回SQLAlchemy用の環境変数として、

ENV SQLALCHEMY_WARN_20=1

を追加しています。

この環境変数はSQLAlchemyの次バージョンリリースで廃止になるようなコードに対してWarningを発生させるためのものです。
使えなくなるようなコードを書くのを防ぐために設定しておいた方が良いと思います。

pyproject.toml

[tool.poetry]
name = "fastapi-sample"
version = "0.1.0"
description = ""
authors = []

[tool.poetry.dependencies]
python = "3.10.*"
fastapi = "0.73.*"
Hypercorn = "0.13.*"
PyMySQL = "1.0.*"
python-multipart = "0.0.*"
SQLAlchemy = "1.4.*"

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core=1.1.*"]
build-backend = "poetry.core.masonry.api"

以前のブログと同じ FastAPI と Hypercorn に加え、
SQLAlchemy と、MySQLドライバーの PyMySQL 、
FastAPIでのFormデータ処理に必要な python-multipart を追加しています。

サンプルではpoetry.lockを省略していますが、実運用ではpoetry.lockも作成しておくべきです。

main.py

from functools import wraps
from typing import Any
from typing import Callable

from fastapi import FastAPI
from fastapi import Form
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.future import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

engine = create_engine("mysql+pymysql://johndoe:secret@db/app")

Base = declarative_base()

class SomeInfo(Base):
    __tablename__ = "some_infos"
    id = Column(Integer, primary_key=True)
    desc = Column(String(255), nullable=True)

Base.metadata.create_all(engine)

ScopedSession = scoped_session(
    sessionmaker(bind=engine, future=True),
)

app = FastAPI()

def entrypoint(func: Callable[..., Any]) -> Callable[..., Any]:
    @wraps(func)
    def _entry_point(*args: Any, **keywords: Any) -> Any:
        session: Session = ScopedSession()
        try:
            result = func(*args, **keywords)
        except Exception:
            session.rollback()
            raise
        else:
            session.commit()
        finally:
            ScopedSession.remove()
        return result
    return _entry_point

@app.post(
    "/info",
)
@entrypoint
def post_info(
    desc: str = Form(...),
) -> dict:
    session: Session = ScopedSession()
    with session.begin_nested():
        some_info = SomeInfo(
            desc=desc,
        )
        session.add(some_info)
        session.flush()
        return {
            "id": some_info.id,
            "desc": some_info.desc,
        }

以下の部分でmysqlへ接続しています。

engine = create_engine("mysql+pymysql://johndoe:secret@db/app")

引数のURIは以下の形式となっています。

{db-type}+{db-api}://{user}:{password}@{host}/{database}

今回はmysqlにpymysqlドライバーを使って接続するので上記の文字列になります。
ユーザー、パスワード、ホスト名、データベースは
docker-compose.yml
でmysqlコンテナに設定した内容です。

次にModelを定義して、テーブルを作成しています。

Base = declarative_base()

class SomeInfo(Base):
    __tablename__ = "some_infos"
    id = Column(Integer, primary_key=True)
    desc = Column(String(255), nullable=True)

Base.metadata.create_all(engine)

通常マイグレーションツール等使うかと思いますが、それは別の機会に。

次にSession定義です。

ScopedSession = scoped_session(
    sessionmaker(bind=engine, future=True),
)

Sessionは、SQLAlchemyにおけるデータベースとの接続コンテキストのようなものです。
アプリケーションはSessionに対してModelの登録等の操作、またトランザクションの操作を行います。Sessionを生成している関数が sessionmaker() です。
future=True は今後のアップデートで使えなくなる使用法でエラーを出すために設定しています。
上記コードでは生成したSessionをベースに scoped_session() でScopedSessionを生成しています。ScopedSessionは実行スレッドに紐づけられた共有のSessionです。
アプリケーション内のどこからでも ScopedSession() で共有のSessionを取得することができるようになります。
一つのアプリケーション内で複数のSessionが必要なケースはあまり無いと思われるので通常このパターンで良いと思います。

次はそのScopedSessionに関するトランザクション定義です。

def entrypoint(func: Callable[..., Any]) -> Callable[..., Any]:
    @wraps(func)
    def _entry_point(*args: Any, **keywords: Any) -> Any:
        session: Session = ScopedSession()
        try:
            result = func(*args, **keywords)
        except Exception:
            session.rollback()
            raise
        else:
            session.commit()
        finally:
            ScopedSession.remove()
        return result
    return _entry_point

これはSQLAlchemyで必ず必要という構造では無いんですが、こういう風にしてみました、という話です。
サーバーの処理は all-or-nothing が基本だと思いますので、
トランザクション管理はWebのエントリーポイントで握っておきたいところです。
そこで、FastAPIの全エントリーポイント(コントローラ)をこのデコレータでラップすることを前提として、
例外が発生した場合は全てをロールバックするようにしています。

その代わり、ここ以外(この内部)のコードで扱うトランザクションはMySQLの SAVEPOINT に相当する begin_nested() を使い、
入れ子構造とします。
また、ScopedSessionの破棄 ScopedSession.remove() もここで行います。
ちなみにFastAPIに詳しい方なら、デコレータではなく
Global Dependencies
を使えば良いのでは?と思われたかもしれません。
しかし、これには明確にダメな理由があります。その理由は 後述 します。

次はコントローラ本体の定義です。

@app.post(
    "/info",
)
@entrypoint
def post_info(
    desc: str = Form(...),
) -> dict:
    session: Session = ScopedSession()
    with session.begin_nested():
        some_info = SomeInfo(
            desc=desc,
        )
        session.add(some_info)
        session.flush()
        return {
            "id": some_info.id,
            "desc": some_info.desc,
        }

前述の entrypoint でラップしています。
また、これも前述の ScopedSession() で共有のSessionを取り出していて、
begin_nested() で入れ子のトランザクションを生成しています。
session.add() でModelを追加していますが、その後に session.flush() を呼び出しています。
SQLが実際に発行されるのは session.flush() のタイミングです。
このケースではレスポンスで id を返していますので、これを得るためには session.flush() の実行が必要です。
(実行する前は None

実行

実行してみましょう。

> docker-compose build
> docker-compose up -d
> curl -X POST -d 'desc=hoge' http://localhost/info
{"id":1,"desc":"hoge"}

DBに登録されました。
* mysqlの起動に少し時間がかかるのでcurlを行うのはしばらく待ってからの方が良さそうです。

所感

後述するスレッドに気を付ければ特に利用に問題はないと思います。
ただ、SQLAlchemy自体が今2.0へのバージョンアップの途中にあり、
1.Xとは仕様が変わってしまうのでその点は気をつけたいところです。

スレッドで苦労した話

前述の entrypoint はデコレータで実装していますが、当初は
Global Dependencies
を使って実装していました。
その方がわざわざエントリーポイントで @entrypoint と記述する必要もなく、
強制的に呼び出されることになるので好都合です。
しかし、そこに大きな落とし穴がありました。
問題はFastAPIの以下の仕様です。

The same applies for dependencies. If a dependency is a standard def function instead of async def, it is run in the external threadpool.

https://fastapi.tiangolo.com/async/#dependencies

Depends() で指定した関数がasyncではない通常の関数の場合、その関数はthreadpoolから取得したスレッド上で実行されます。
一方、SQLAlchemyはasyncioに対応していません( ベータレベルでの対応 はあるようです)。そのため、 entrypoint はasyncでは無い通常の関数であり、threadpoolから取得したスレッドで実行されることになります。
ここで困るのが、Sessionの問題です。

Sessionはそれ自体はマルチスレッドでの呼び出しには対応せず、その代わりにScopedSessionでスレッド毎に別のSessionを使用するアプローチが取られています。つまりSessionはスレッド毎に独立しています。
https://docs.sqlalchemy.org/en/14/orm/contextual.html#thread-local-scope

では、Global Dependenciesを使って実装した場合にどうなるか調査してみます。

コードを以下のように変更しました。

def _entrypoint(
    request: Request,
) -> Generator[None, None, None]:
    request.state.id = uuid.uuid4().hex
    session: Session = ScopedSession()
    try:
        print(f"before yield: id:{request.state.id}: thid:{threading.get_ident()}")
        yield
        print(f"after yield: id:{request.state.id}: thid:{threading.get_ident()}")
    except Exception:
        session.rollback()
        raise
    else:
        session.commit()
    finally:
        ScopedSession.remove()

app = FastAPI(dependencies=[Depends(_entrypoint)])

@app.post(
    "/info",
)
def post_info(
    request: Request,
    desc: str = Form(...),
) -> dict:
    print(f"post_info: id:{request.state.id}: thid:{threading.get_ident()}")
    session: Session = ScopedSession()
    with session.begin_nested():
        some_info = SomeInfo(
            desc=desc,
        )
        session.add(some_info)
        session.flush()
        return {
            "id": some_info.id,
            "desc": some_info.desc,
        }

entrypoint をGlobal Dependenciesで行うように変更しています。
また、それぞれのコードがどのスレッドで実行されたか分かるようにスレッドIDを出力するようにしました。
その結果、通常単独で実行する分には問題ないのですが、複数リクエストを同時に呼び出すようなことをすると、以下のようなログが出ます。

before yield: id:f390dfb6cb3a44cb95de2dab2fff785c: thid:139675703495480
post_info: id:f390dfb6cb3a44cb95de2dab2fff785c: thid:139675704556344
after yield: id:f390dfb6cb3a44cb95de2dab2fff785c: thid:139675705617208

それぞれのコードが別スレッドで実行されてしまっています。
またデータベースを確認するとデータは保存されていません。
Depends() を使用する場合はスレッドに気を付けないと思わぬところで足を掬われることになるので気を付けましょう。

SHARE

  • facebook
  • twitter

SQRIPTER

AGEST Engineers

AGEST

記事一覧

AGESTのエンジニアが情報発信してます!
AGESTのサービスやソリューションのお問い合わせページはこちらです。

株式会社AGEST

  • 新規登録/ログイン
  • 株式会社AGEST
#TAGS人気のタグ
RANKINGアクセスランキング
NEWS最新のニュース

Sqriptsはシステム開発における品質(Quality)を中心に、エンジニアが”理解しやすい”Scriptに変換して情報発信するメディアです

  • 新規登録/ログイン
  • 株式会社AGEST