feat(mcp): 5 outils todos + tests

Ajoute mcp_server.py avec get_todos, create_todo, update_todo, postpone_todo, delete_todo.
Ajoute test_mcp.py (7 tests). Corrige conftest pour injecter NullPool dans AsyncSessionLocal des outils MCP (évite les conflits d'event loop entre tests).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 22:58:20 +02:00
parent 24598c836b
commit 05db49f27a
3 changed files with 250 additions and 0 deletions
+180
View File
@@ -0,0 +1,180 @@
import json
import uuid
from datetime import datetime, timedelta, timezone, date as date_type
from decimal import Decimal
from mcp.server.fastmcp import FastMCP
from sqlalchemy import select, and_
from app.core.database import AsyncSessionLocal
from app.models.todos import TodoItem
from app.models.notes import NoteItem
from app.models.shopping import ShoppingList, ListItem, Product
mcp = FastMCP("HomeHub", stateless_http=True)
def _serialize(obj):
if isinstance(obj, uuid.UUID):
return str(obj)
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, date_type):
return obj.isoformat()
if isinstance(obj, Decimal):
return float(obj)
raise TypeError(f"Type non sérialisable : {type(obj)}")
def _dumps(data) -> str:
return json.dumps(data, default=_serialize)
def _iso_week_label() -> str:
now = datetime.now(tz=timezone.utc)
iso = now.isocalendar()
return f"S{iso[1]} {iso[0]}"
# ── TODOS ──────────────────────────────────────────────────────────────────────
@mcp.tool()
async def get_todos(
status: str = "pending",
domain: str | None = None,
priority: str | None = None,
) -> str:
"""Liste filtrée des tâches. status: pending/done/cancelled. priority: low/medium/high."""
async with AsyncSessionLocal() as session:
conditions = [TodoItem.status == status]
if domain:
conditions.append(TodoItem.domains.contains([domain]))
if priority:
conditions.append(TodoItem.priority == priority)
stmt = (
select(TodoItem)
.where(and_(*conditions))
.order_by(TodoItem.due_date.asc().nulls_last(), TodoItem.created_at.desc())
.limit(100)
)
result = await session.execute(stmt)
items = result.scalars().all()
return _dumps([{
"id": item.id,
"title": item.title,
"status": item.status,
"priority": item.priority,
"domains": item.domains,
"tags": item.tags,
"due_date": item.due_date,
"postponed_count": item.postponed_count,
} for item in items])
@mcp.tool()
async def create_todo(
title: str,
due_date: str | None = None,
priority: str | None = None,
domain: str | None = None,
) -> str:
"""Crée une tâche. due_date en ISO 8601 (ex: 2025-06-01T10:00:00Z). priority: low/medium/high."""
async with AsyncSessionLocal() as session:
parsed_due = None
if due_date:
try:
parsed_due = datetime.fromisoformat(due_date.replace("Z", "+00:00"))
except ValueError:
return _dumps({"error": f"Format de date invalide : {due_date}"})
item = TodoItem(
title=title,
due_date=parsed_due,
priority=priority or "medium",
domains=[domain] if domain else [],
)
session.add(item)
await session.commit()
await session.refresh(item)
return _dumps({
"id": item.id,
"title": item.title,
"status": item.status,
"priority": item.priority,
"domains": item.domains,
"due_date": item.due_date,
})
@mcp.tool()
async def update_todo(
id: str,
title: str | None = None,
status: str | None = None,
priority: str | None = None,
) -> str:
"""Modifie une tâche. status: pending/done/cancelled. priority: low/medium/high."""
async with AsyncSessionLocal() as session:
try:
todo_id = uuid.UUID(id)
except ValueError:
return _dumps({"error": f"UUID invalide : {id}"})
item = await session.get(TodoItem, todo_id)
if not item:
return _dumps({"error": f"Tâche introuvable : {id}"})
if title is not None:
item.title = title
if status is not None:
item.status = status
if priority is not None:
item.priority = priority
item.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(item)
return _dumps({
"id": item.id,
"title": item.title,
"status": item.status,
"priority": item.priority,
})
@mcp.tool()
async def postpone_todo(id: str, days: int) -> str:
"""Reporte une tâche de N jours à partir de sa date d'échéance (ou maintenant si nulle)."""
async with AsyncSessionLocal() as session:
try:
todo_id = uuid.UUID(id)
except ValueError:
return _dumps({"error": f"UUID invalide : {id}"})
item = await session.get(TodoItem, todo_id)
if not item:
return _dumps({"error": f"Tâche introuvable : {id}"})
now = datetime.now(timezone.utc)
base = item.due_date if item.due_date else now
item.due_date = base + timedelta(days=days)
item.postponed_count += 1
item.updated_at = now
await session.commit()
await session.refresh(item)
return _dumps({
"id": item.id,
"title": item.title,
"due_date": item.due_date,
"postponed_count": item.postponed_count,
})
@mcp.tool()
async def delete_todo(id: str) -> str:
"""Supprime une tâche définitivement."""
async with AsyncSessionLocal() as session:
try:
todo_id = uuid.UUID(id)
except ValueError:
return _dumps({"error": f"UUID invalide : {id}"})
item = await session.get(TodoItem, todo_id)
if not item:
return _dumps({"error": f"Tâche introuvable : {id}"})
await session.delete(item)
await session.commit()
return _dumps({"deleted": id})
+14
View File
@@ -5,6 +5,7 @@ from sqlalchemy.pool import NullPool
from app.main import app
from app.core import database
from app.core.config import settings
import app.api.mcp_server as mcp_server_module
def make_test_engine():
@@ -12,6 +13,19 @@ def make_test_engine():
return create_async_engine(settings.database_url, poolclass=NullPool)
@pytest.fixture(autouse=True)
async def mcp_nullpool_session():
"""Remplace AsyncSessionLocal dans mcp_server par un sessionmaker NullPool
pour éviter les conflits d'event loop entre tests."""
engine = make_test_engine()
test_session_factory = async_sessionmaker(engine, expire_on_commit=False)
original = mcp_server_module.AsyncSessionLocal
mcp_server_module.AsyncSessionLocal = test_session_factory
yield
mcp_server_module.AsyncSessionLocal = original
await engine.dispose()
@pytest.fixture
async def db_session():
engine = make_test_engine()
+56
View File
@@ -0,0 +1,56 @@
import json
import pytest
from app.api.mcp_server import (
get_todos, create_todo, update_todo, postpone_todo, delete_todo,
)
async def test_get_todos_retourne_liste_json():
result = await get_todos(status="pending")
data = json.loads(result)
assert isinstance(data, list)
async def test_create_todo_outil_cree_une_tache():
result = await create_todo(title="TEST_MCP_todo_create")
data = json.loads(result)
assert data["title"] == "TEST_MCP_todo_create"
assert data["status"] == "pending"
assert data["priority"] == "medium"
# Cleanup
await delete_todo(id=str(data["id"]))
async def test_update_todo_outil():
created = json.loads(await create_todo(title="TEST_MCP_todo_update"))
result = await update_todo(id=str(created["id"]), status="done")
data = json.loads(result)
assert data["status"] == "done"
await delete_todo(id=str(created["id"]))
async def test_postpone_todo_outil():
created = json.loads(await create_todo(title="TEST_MCP_todo_postpone"))
result = await postpone_todo(id=str(created["id"]), days=3)
data = json.loads(result)
assert data["postponed_count"] == 1
await delete_todo(id=str(created["id"]))
async def test_delete_todo_outil():
created = json.loads(await create_todo(title="TEST_MCP_todo_delete"))
result = await delete_todo(id=str(created["id"]))
data = json.loads(result)
assert "deleted" in data
async def test_update_todo_id_invalide():
result = await update_todo(id="pas-un-uuid", title="x")
data = json.loads(result)
assert "error" in data
async def test_delete_todo_introuvable():
result = await delete_todo(id="00000000-0000-0000-0000-000000000000")
data = json.loads(result)
assert "error" in data