現代のPython開発では、Dockerを活用した環境構築とデプロイが必須のスキルとなっています。本記事では、FastAPIを使用したマイクロサービスの開発から本格的なCI/CDパイプラインの構築まで、実際のプロジェクトで活用できる実践的な技術を解説します。
プロジェクト構成
今回構築するのは、ユーザー管理APIサービスです:
python-microservice/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── models/
│ │ ├── __init__.py
│ │ └── user.py
│ ├── routers/
│ │ ├── __init__.py
│ │ └── users.py
│ ├── services/
│ │ ├── __init__.py
│ │ └── user_service.py
│ └── database/
│ ├── __init__.py
│ └── connection.py
├── tests/
│ ├── __init__.py
│ ├── test_users.py
│ └── conftest.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── .env.example
└── .github/workflows/ci-cd.yml
FastAPI アプリケーションの実装
メインアプリケーション
# app/main.py
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import logging
import uvicorn
from app.routers import users
from app.database.connection import create_tables, close_db_connection
# Logging configuration: root logger at INFO, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup work before the app serves and cleanup when it stops.

    Startup creates the database tables; shutdown closes the database
    connection.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    # Startup
    logger.info("Starting up...")
    await create_tables()
    try:
        yield
    finally:
        # Shutdown. Placed in ``finally`` so the connection is released even
        # when an exception is thrown into the generator at the yield point.
        logger.info("Shutting down...")
        await close_db_connection()
app = FastAPI(
    title="User Management API",
    description="Python + Docker マイクロサービス実践例",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS configuration.
# Credentials stay disabled while origins/methods/headers are wildcards:
# combining "*" with allow_credentials=True lets any site issue credentialed
# cross-origin requests. To enable credentials, list explicit origins instead.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register routers
app.include_router(users.router, prefix="/api/v1", tags=["users"])
@app.get("/health")
async def health_check():
    """Health endpoint: report a fixed healthy status and the API version."""
    payload = {"status": "healthy", "version": "1.0.0"}
    return payload
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8000,
    # with auto-reload enabled.
    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)
ユーザーモデル
# app/models/user.py
from pydantic import BaseModel, EmailStr, validator
from datetime import datetime
from typing import Optional
from enum import Enum
class UserStatus(str, Enum):
    """Account status values.

    Also subclasses ``str``, so each member compares equal to its value.
    """

    ACTIVE = "active"
    INACTIVE = "inactive"
    SUSPENDED = "suspended"
class UserBase(BaseModel):
    """Fields shared by the user request and response models."""

    username: str
    email: EmailStr
    full_name: str
    status: UserStatus = UserStatus.ACTIVE

    @validator('username')
    def validate_username(cls, v):
        """Validate and normalise the username.

        Returns:
            The username converted to lower case.

        Raises:
            ValueError: If the name is shorter than 3 characters or contains
                anything other than ASCII letters and digits.
        """
        if len(v) < 3:
            raise ValueError('ユーザー名は3文字以上である必要があります')
        # str.isalnum() alone also accepts non-ASCII letters and digits
        # (e.g. kana or full-width digits), which contradicts the error
        # message below, so require ASCII explicitly.
        if not (v.isascii() and v.isalnum()):
            raise ValueError('ユーザー名は英数字のみ使用できます')
        return v.lower()
class UserCreate(UserBase):
    """Creation payload: the shared user fields plus a plaintext password."""

    password: str

    @validator('password')
    def validate_password(cls, v):
        """Return the password unchanged, rejecting values under 8 characters."""
        too_short = len(v) < 8
        if too_short:
            raise ValueError('パスワードは8文字以上である必要があります')
        return v
class UserUpdate(BaseModel):
    """Partial-update payload; every field is optional and defaults to None."""

    # Each field may be omitted by the caller.
    full_name: Optional[str] = None
    email: Optional[EmailStr] = None
    status: Optional[UserStatus] = None
class UserResponse(UserBase):
    """User representation returned by the API (no password field)."""

    id: int
    created_at: datetime
    updated_at: Optional[datetime] = None

    class Config:
        # NOTE(review): presumably the pydantic v2 option that allows building
        # the model from attribute-bearing objects — confirm against the
        # installed pydantic version.
        from_attributes = True
データベース接続
# app/database/connection.py
import asyncpg
import os
from typing import Optional
class DatabaseConnection:
    """Holder for a single asyncpg connection pool."""

    def __init__(self):
        # None until connect() succeeds, and None again after disconnect().
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create the pool if there is none yet and return it.

        The connection string is read from the DATABASE_URL environment
        variable, falling back to a built-in default. If a pool is already
        open it is returned as-is, so a repeated call does not leak the
        previous pool.

        Returns:
            The open connection pool.
        """
        if self.pool is not None:
            return self.pool
        database_url = os.getenv(
            "DATABASE_URL",
            "postgresql://user:password@postgres:5432/userdb",
        )
        self.pool = await asyncpg.create_pool(database_url)
        return self.pool

    async def disconnect(self):
        """Close the pool, if any, and forget it.

        The reference is cleared before closing so later code never sees a
        closed pool in ``self.pool``.
        """
        pool, self.pool = self.pool, None
        if pool is not None:
            await pool.close()
# Global instance used by the module-level helper functions below.
db = DatabaseConnection()
async def get_db_pool():
    """Return the pool held by ``db``, connecting first if it has none."""
    if db.pool:
        return db.pool
    await db.connect()
    return db.pool
async def create_tables():
    """Create the ``users`` table if it does not exist yet."""
    ddl = '''
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(50) UNIQUE NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            full_name VARCHAR(100) NOT NULL,
            password_hash VARCHAR(255) NOT NULL,
            status VARCHAR(20) DEFAULT 'active',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP
        )
    '''
    pool = await get_db_pool()
    async with pool.acquire() as connection:
        await connection.execute(ddl)
async def close_db_connection():
    """Disconnect the global ``db`` instance."""
    await db.disconnect()
ユーザーサービス
# app/services/user_service.py
import asyncpg
import bcrypt
from typing import List, Optional
from app.models.user import UserCreate, UserUpdate, UserResponse, UserStatus
from app.database.connection import get_db_pool
class UserService:
    """CRUD operations on the ``users`` table.

    Every method is static and obtains the connection pool through
    ``get_db_pool()``. Each operation uses exactly one pooled connection;
    write queries use ``RETURNING`` so no second connection is acquired
    while the first is still held.
    """

    # Columns needed to build a UserResponse. password_hash is left out on
    # purpose so the stored hash is never read back into response objects.
    _COLUMNS = "id, username, email, full_name, status, created_at, updated_at"

    @staticmethod
    def _to_response(row) -> Optional[UserResponse]:
        """Convert a fetched row into a UserResponse; ``None`` stays ``None``."""
        if row is None:
            return None
        return UserResponse(**dict(row))

    @staticmethod
    async def create_user(user_data: UserCreate) -> UserResponse:
        """Insert a new user and return the stored record.

        Args:
            user_data: Validated creation payload including the plaintext
                password, which is bcrypt-hashed before storage.

        Returns:
            The newly created user.

        Raises:
            ValueError: If the username or the email is already taken.
            asyncpg.UniqueViolationError: For a unique violation that matches
                neither the username nor the email.
        """
        pool = await get_db_pool()

        # Hash the password; only the hash is stored.
        password_hash = bcrypt.hashpw(
            user_data.password.encode('utf-8'),
            bcrypt.gensalt(),
        ).decode('utf-8')

        async with pool.acquire() as connection:
            try:
                row = await connection.fetchrow(
                    f'''
                    INSERT INTO users (username, email, full_name, password_hash, status)
                    VALUES ($1, $2, $3, $4, $5)
                    RETURNING {UserService._COLUMNS}
                    ''',
                    user_data.username,
                    user_data.email,
                    user_data.full_name,
                    password_hash,
                    user_data.status.value,
                )
            except asyncpg.UniqueViolationError as e:
                # Prefer the violated constraint's name; fall back to the
                # message text when the attribute is unavailable.
                violated = getattr(e, "constraint_name", None) or str(e)
                if 'username' in violated:
                    raise ValueError("このユーザー名は既に使用されています") from e
                if 'email' in violated:
                    raise ValueError("このメールアドレスは既に使用されています") from e
                raise
        return UserService._to_response(row)

    @staticmethod
    async def get_user_by_id(user_id: int) -> Optional[UserResponse]:
        """Return the user with the given id, or ``None`` if there is none."""
        pool = await get_db_pool()
        async with pool.acquire() as connection:
            row = await connection.fetchrow(
                f'SELECT {UserService._COLUMNS} FROM users WHERE id = $1',
                user_id,
            )
        return UserService._to_response(row)

    @staticmethod
    async def get_users(skip: int = 0, limit: int = 100) -> List[UserResponse]:
        """Return one page of users, newest first.

        Args:
            skip: Number of rows to skip (SQL OFFSET).
            limit: Maximum number of rows to return (SQL LIMIT).
        """
        pool = await get_db_pool()
        async with pool.acquire() as connection:
            # id is a tie-breaker so rows sharing a created_at value keep a
            # stable order across pages.
            rows = await connection.fetch(
                f'''
                SELECT {UserService._COLUMNS} FROM users
                ORDER BY created_at DESC, id DESC
                LIMIT $1 OFFSET $2
                ''',
                limit,
                skip,
            )
        return [UserResponse(**dict(row)) for row in rows]

    @staticmethod
    async def update_user(user_id: int, user_data: UserUpdate) -> Optional[UserResponse]:
        """Apply the provided fields to a user and return the updated record.

        Fields that were not sent, or were sent as ``null``, are left
        untouched: the updatable columns are NOT NULL (or carry a default),
        so writing NULL would only produce a database error.

        Returns:
            The updated user, the unchanged user when nothing was provided,
            or ``None`` when no user has the given id.
        """
        pool = await get_db_pool()

        # Collect the fields to change. Keys come from the UserUpdate model's
        # own field names, never from raw user input, so they are safe to
        # interpolate as column names; values are always bound as parameters.
        changes = {
            field: (value.value if isinstance(value, UserStatus) else value)
            for field, value in user_data.dict(exclude_unset=True).items()
            if value is not None
        }
        if not changes:
            return await UserService.get_user_by_id(user_id)

        assignments = [
            f"{field} = ${position}"
            for position, field in enumerate(changes, start=1)
        ]
        assignments.append("updated_at = CURRENT_TIMESTAMP")

        query = f'''
            UPDATE users
            SET {", ".join(assignments)}
            WHERE id = ${len(changes) + 1}
            RETURNING {UserService._COLUMNS}
        '''
        async with pool.acquire() as connection:
            row = await connection.fetchrow(query, *changes.values(), user_id)
        return UserService._to_response(row)

    @staticmethod
    async def delete_user(user_id: int) -> bool:
        """Delete a user; return True only if exactly one row was removed."""
        pool = await get_db_pool()
        async with pool.acquire() as connection:
            result = await connection.execute(
                'DELETE FROM users WHERE id = $1', user_id
            )
        # execute() returns the command status string, e.g. "DELETE 1".
        return result == "DELETE 1"
API エンドポイント
# app/routers/users.py
from fastapi import APIRouter, HTTPException, Query, Path
from typing import List
import logging
from app.models.user import UserCreate, UserUpdate, UserResponse
from app.services.user_service import UserService
# Router collecting the user endpoints defined in this module, plus a
# module-level logger.
router = APIRouter()
logger = logging.getLogger(__name__)
@router.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user_data: UserCreate):
    """Create a user and return it with status 201.

    Raises:
        HTTPException: 400 when the service raises ValueError (its message
            becomes the detail); 500 for any other failure.
    """
    try:
        user = await UserService.create_user(user_data)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error creating user: %s", e)
        raise HTTPException(status_code=500, detail="ユーザー作成に失敗しました") from e
    logger.info("Created user: %s", user.username)
    return user
@router.get("/users", response_model=List[UserResponse])
async def get_users(
    skip: int = Query(0, ge=0, description="スキップするレコード数"),
    limit: int = Query(100, ge=1, le=1000, description="取得するレコード数")
):
    """Return one page of users.

    Args:
        skip: Number of records to skip (>= 0).
        limit: Number of records to return (1-1000).

    Raises:
        HTTPException: 500 when the lookup fails.
    """
    try:
        return await UserService.get_users(skip=skip, limit=limit)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error fetching users: %s", e)
        raise HTTPException(status_code=500, detail="ユーザー一覧の取得に失敗しました") from e
@router.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int = Path(..., gt=0, description="ユーザーID")
):
    """Return a single user by id.

    Raises:
        HTTPException: 404 when no such user exists; 500 when the lookup fails.
    """
    try:
        user = await UserService.get_user_by_id(user_id)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error fetching user %s: %s", user_id, e)
        raise HTTPException(status_code=500, detail="ユーザー取得に失敗しました") from e
    # Raised outside the try block so the 404 never passes through the
    # generic handler above.
    if not user:
        raise HTTPException(status_code=404, detail="ユーザーが見つかりません")
    return user
@router.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
    user_data: UserUpdate,
    user_id: int = Path(..., gt=0, description="ユーザーID")
):
    """Update a user and return the updated record.

    Raises:
        HTTPException: 404 when no such user exists; 500 when the update fails.
    """
    try:
        user = await UserService.update_user(user_id, user_data)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error updating user %s: %s", user_id, e)
        raise HTTPException(status_code=500, detail="ユーザー更新に失敗しました") from e
    # Raised outside the try block so the 404 never passes through the
    # generic handler above.
    if not user:
        raise HTTPException(status_code=404, detail="ユーザーが見つかりません")
    logger.info("Updated user: %s", user.username)
    return user
@router.delete("/users/{user_id}", status_code=204)
async def delete_user(
    user_id: int = Path(..., gt=0, description="ユーザーID")
):
    """Delete a user; responds 204 with no body on success.

    Raises:
        HTTPException: 404 when no such user exists; 500 when the delete fails.
    """
    try:
        success = await UserService.delete_user(user_id)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error deleting user %s: %s", user_id, e)
        raise HTTPException(status_code=500, detail="ユーザー削除に失敗しました") from e
    # Raised outside the try block so the 404 never passes through the
    # generic handler above.
    if not success:
        raise HTTPException(status_code=404, detail="ユーザーが見つかりません")
    logger.info("Deleted user ID: %s", user_id)
Docker設定
Dockerfile(マルチステージビルド)
# Dockerfile
# ---- Build stage: install the Python dependencies ----
FROM python:3.11-slim AS builder

# System build dependency (presumably needed to compile wheels with C
# extensions); it stays in this stage only.
RUN apt-get update && apt-get install -y --no-install-recommends \
        gcc \
    && rm -rf /var/lib/apt/lists/*

# Install into /root/.local (--user) so the result can be copied as one tree.
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# ---- Production image ----
FROM python:3.11-slim

# No runtime system packages are installed here: the previous
# `apt-get install -y` step listed no packages and was a no-op.

# Create an unprivileged user (security hardening).
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Copy the installed Python packages, owned by the runtime user.
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local

# Application directory
WORKDIR /app

# Application code
COPY --chown=appuser:appuser . .

# Make the copied console scripts and the app package importable.
ENV PATH=/home/appuser/.local/bin:$PATH
ENV PYTHONPATH=/app

# Drop privileges
USER appuser

# Health check. Uses only the standard library (no dependency on `requests`
# being installed); urlopen raises on connection errors and on HTTP error
# statuses, so the command exits non-zero when the service is unhealthy.
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)"

# Exposed port
EXPOSE 8000

# Start the application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose 設定
# docker-compose.yml
# Local development stack. Credentials below are development-only values.
version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    # Development override: the image's default command runs uvicorn without
    # --reload, so the source bind mount below would otherwise have no effect.
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/userdb
      - ENVIRONMENT=development
    depends_on:
      postgres:
        condition: service_healthy
    volumes:
      - ./app:/app/app  # source mount for hot reload during development
    networks:
      - app-network
    restart: unless-stopped

  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=userdb
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      # ./init.sql must exist on the host before `up`; mounted read-only.
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql:ro
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - app-network
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5
    networks:
      - app-network
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      # ./nginx.conf and ./ssl must exist on the host before `up`;
      # both are mounted read-only.
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - api
    networks:
      - app-network
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

networks:
  app-network:
    driver: bridge
テスト実装
pytest設定
# tests/conftest.py
import pytest
import asyncio
import asyncpg
from httpx import AsyncClient
from app.main import app
from app.database.connection import db
@pytest.fixture(scope="session")
def event_loop():
    """Provide one event loop for the whole test session, closed afterwards."""
    session_loop = asyncio.new_event_loop()
    yield session_loop
    session_loop.close()
@pytest.fixture(scope="session")
async def test_db():
    """Session-wide test database pool with a freshly created ``users`` table.

    The connection string comes from TEST_DATABASE_URL, falling back to the
    local test instance. A dedicated variable is used rather than
    DATABASE_URL because this fixture DROPs the ``users`` table. While the
    fixture is active the pool is installed as the application's global pool.

    Yields:
        The asyncpg pool connected to the test database.
    """
    import os  # local import: this module's import block does not include os

    test_db_url = os.getenv(
        "TEST_DATABASE_URL",
        "postgresql://postgres:password@localhost:5433/testdb",
    )
    pool = await asyncpg.create_pool(test_db_url)
    previous_pool = db.pool
    try:
        # Recreate the table so every session starts from an empty state.
        async with pool.acquire() as connection:
            await connection.execute('''
                DROP TABLE IF EXISTS users;
                CREATE TABLE users (
                    id SERIAL PRIMARY KEY,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    full_name VARCHAR(100) NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    status VARCHAR(20) DEFAULT 'active',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP
                );
            ''')
        # Point the application at the test pool.
        db.pool = pool
        yield pool
    finally:
        # Restore the previous value so the global never holds a closed pool,
        # and close the test pool even if setup or a test failed.
        db.pool = previous_pool
        await pool.close()
@pytest.fixture
async def client(test_db):
    """HTTP client that calls the FastAPI app in-process.

    Depends on ``test_db`` so the application's pool points at the test
    database before any request is made.
    """
    # The ``app=`` shortcut on AsyncClient is deprecated and removed in newer
    # httpx releases; an explicit ASGI transport is the supported form.
    from httpx import ASGITransport

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
API テスト
# tests/test_users.py
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
    """Creating a user returns 201 and echoes the fields without the password."""
    payload = {
        "username": "testuser",
        "email": "test@example.com",
        "full_name": "テスト ユーザー",
        "password": "password123",
        "status": "active",
    }

    response = await client.post("/api/v1/users", json=payload)
    assert response.status_code == 201

    body = response.json()
    assert body["username"] == "testuser"
    assert body["email"] == "test@example.com"
    # The password must not be part of the response.
    assert "password" not in body
@pytest.mark.asyncio
async def test_get_users(client: AsyncClient):
    """Listing users returns 200 with a JSON array."""
    response = await client.get("/api/v1/users")
    assert response.status_code == 200
    listing = response.json()
    assert isinstance(listing, list)
@pytest.mark.asyncio
async def test_get_user_by_id(client: AsyncClient):
    """A created user can be fetched by its id."""
    # Create a user first.
    user_data = {
        "username": "getuser",
        "email": "get@example.com",
        "full_name": "取得 ユーザー",
        "password": "password123"
    }
    create_response = await client.post("/api/v1/users", json=user_data)
    # Fail here with the real cause rather than with a KeyError on "id" below.
    assert create_response.status_code == 201, create_response.text
    user_id = create_response.json()["id"]

    # Fetch the created user.
    response = await client.get(f"/api/v1/users/{user_id}")
    assert response.status_code == 200
    data = response.json()
    assert data["username"] == "getuser"
@pytest.mark.asyncio
async def test_update_user(client: AsyncClient):
    """Updating a user changes only the provided fields."""
    # Create a user.
    user_data = {
        "username": "updateuser",
        "email": "update@example.com",
        "full_name": "更新 ユーザー",
        "password": "password123"
    }
    create_response = await client.post("/api/v1/users", json=user_data)
    # Fail here with the real cause rather than with a KeyError on "id" below.
    assert create_response.status_code == 201, create_response.text
    user_id = create_response.json()["id"]

    # Update the user.
    update_data = {
        "full_name": "更新済み ユーザー",
        "status": "inactive"
    }
    response = await client.put(f"/api/v1/users/{user_id}", json=update_data)
    assert response.status_code == 200
    data = response.json()
    assert data["full_name"] == "更新済み ユーザー"
    assert data["status"] == "inactive"
@pytest.mark.asyncio
async def test_delete_user(client: AsyncClient):
    """Deleting a user returns 204 and the user can no longer be fetched."""
    # Create a user.
    user_data = {
        "username": "deleteuser",
        "email": "delete@example.com",
        "full_name": "削除 ユーザー",
        "password": "password123"
    }
    create_response = await client.post("/api/v1/users", json=user_data)
    # Fail here with the real cause rather than with a KeyError on "id" below.
    assert create_response.status_code == 201, create_response.text
    user_id = create_response.json()["id"]

    # Delete the user.
    response = await client.delete(f"/api/v1/users/{user_id}")
    assert response.status_code == 204

    # Confirm the user is gone.
    get_response = await client.get(f"/api/v1/users/{user_id}")
    assert get_response.status_code == 404
@pytest.mark.asyncio
async def test_duplicate_username(client: AsyncClient):
    """A second user with the same username is rejected with 400."""
    payload = {
        "username": "duplicate",
        "email": "first@example.com",
        "full_name": "最初のユーザー",
        "password": "password123",
    }

    # First creation succeeds.
    first = await client.post("/api/v1/users", json=payload)
    assert first.status_code == 201

    # Same username, different email: must be rejected.
    payload["email"] = "second@example.com"
    second = await client.post("/api/v1/users", json=payload)
    assert second.status_code == 400
    assert "既に使用されています" in second.json()["detail"]
CI/CD パイプライン
GitHub Actions 設定
# .github/workflows/ci-cd.yml
name: CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: password
          POSTGRES_DB: testdb
        ports:
          - 5433:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest pytest-asyncio httpx

      - name: Run tests
        env:
          DATABASE_URL: postgresql://postgres:password@localhost:5433/testdb
        # auto mode lets pytest-asyncio drive the async fixtures that are
        # declared with plain @pytest.fixture; the default strict mode does not.
        run: |
          pytest tests/ -v --tb=short --asyncio-mode=auto

      - name: Run linting
        run: |
          pip install flake8 black isort
          flake8 app/ --max-line-length=88
          black --check app/
          isort --check-only app/

  build:
    needs: test
    runs-on: ubuntu-latest
    # GITHUB_TOKEN needs package write access to push to ghcr.io.
    permissions:
      contents: read
      packages: write

    steps:
      - uses: actions/checkout@v4

      # Buildx is required for the GitHub Actions cache backend (type=gha).
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      # Pull requests only verify that the image builds: no login, no push.
      - name: Log in to Container Registry
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha
            type=raw,value=latest,enable={{is_default_branch}}

      - name: Build and push Docker image
        uses: docker/build-push-action@v5
        with:
          context: .
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  deploy:
    if: github.ref == 'refs/heads/main'
    needs: build
    runs-on: ubuntu-latest
    environment: production

    steps:
      - name: Deploy to production
        run: |
          echo "Production deployment would happen here"
          # kubectl apply -f k8s/
          # or docker-compose up -d on VPS
          # or API call to cloud provider
実行とデプロイ
ローカル開発環境の起動
# Set up the environment variables from the example file
cp .env.example .env
# Start the services with Docker Compose (detached)
docker-compose up -d
# Follow the API service logs (blocks until interrupted)
docker-compose logs -f api
# Try the API: create a user
curl -X POST http://localhost:8000/api/v1/users \
-H "Content-Type: application/json" \
-d '{
"username": "testuser",
"email": "test@example.com",
"full_name": "テスト ユーザー",
"password": "password123"
}'
本番環境デプロイ
# Build the image
docker build -t user-api:latest .
# Start the container for production: detached, port 8000 published,
# restarted automatically unless explicitly stopped
docker run -d \
--name user-api \
-p 8000:8000 \
-e DATABASE_URL="postgresql://user:pass@db:5432/userdb" \
-e ENVIRONMENT="production" \
--restart unless-stopped \
user-api:latest
まとめ
PythonとDockerを組み合わせることで、以下のメリットを実現できます:
- 環境の一貫性: 開発からプロダクションまで同じ環境
- スケーラビリティ: コンテナオーケストレーションによる水平スケーリング
- メンテナンス性: マイクロサービスアーキテクチャによる独立したデプロイ
- CI/CD統合: 自動化されたテストとデプロイメント
このアーキテクチャをベースに、認証機能やキャッシュ機能、ログ集約システムなどを追加することで、より本格的なプロダクション環境を構築できます。
参考文献
- FastAPI公式ドキュメント - FastAPIの包括的な公式ガイド
- Docker公式ドキュメント - Dockerfile ベストプラクティス - Dockerコンテナ構築のベストプラクティス
- asyncpg公式ドキュメント - Python用高性能非同期PostgreSQLクライアント
- GitHub Actions公式ドキュメント - CI/CDパイプライン構築ガイド
- Pydantic公式ドキュメント - Pythonデータ検証ライブラリの詳細仕様