feat(sse): sync temps réel multi-appareils via Server-Sent Events v0.5.8
- Broadcaster asyncio.Queue avec keepalive 25s (prévient timeout proxy) - Endpoint GET /api/events/stream (StreamingResponse text/event-stream) - Broadcast notes_changed / todos_changed / shopping_changed sur toutes mutations - Hook useServerEvents: EventSource avec reconnexion automatique (3s) - Pages Notes, Todos, Shopping abonnées aux événements SSE - nginx: location SSE dédiée (proxy_buffering off, timeout 24h) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,18 @@
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from app.core.broadcaster import broadcaster
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/stream")
|
||||
async def event_stream():
|
||||
return StreamingResponse(
|
||||
broadcaster.subscribe(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
@@ -5,6 +5,7 @@ from sqlalchemy import select, text, and_
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from app.core.broadcaster import broadcaster
|
||||
from app.core.database import get_session
|
||||
from app.core.redis import enqueue
|
||||
from app.models.notes import NoteItem, NoteAttachment
|
||||
@@ -74,6 +75,7 @@ async def create_note(payload: NoteCreate, session: AsyncSession = Depends(get_s
|
||||
await session.commit()
|
||||
await session.refresh(note, ["attachments"])
|
||||
await enqueue("export_note_markdown", str(note.id))
|
||||
broadcaster.broadcast("notes_changed")
|
||||
return note
|
||||
|
||||
|
||||
@@ -97,6 +99,7 @@ async def update_note(
|
||||
await session.commit()
|
||||
await session.refresh(note, ["attachments"])
|
||||
await enqueue("export_note_markdown", str(note.id))
|
||||
broadcaster.broadcast("notes_changed")
|
||||
return note
|
||||
|
||||
|
||||
@@ -108,6 +111,7 @@ async def delete_note(note_id: uuid.UUID, session: AsyncSession = Depends(get_se
|
||||
await session.delete(note)
|
||||
await session.commit()
|
||||
await enqueue("remove_note_markdown", str(note_id))
|
||||
broadcaster.broadcast("notes_changed")
|
||||
return Response(status_code=204)
|
||||
|
||||
|
||||
@@ -151,6 +155,7 @@ async def add_attachment(
|
||||
await session.commit()
|
||||
await session.refresh(note, ["attachments"])
|
||||
await enqueue("export_note_markdown", str(note_id))
|
||||
broadcaster.broadcast("notes_changed")
|
||||
return note
|
||||
|
||||
|
||||
@@ -177,4 +182,5 @@ async def delete_attachment(
|
||||
await session.delete(att)
|
||||
await session.commit()
|
||||
await enqueue("export_note_markdown", str(note_id))
|
||||
broadcaster.broadcast("notes_changed")
|
||||
return Response(status_code=204)
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# backend/app/api/shopping.py
|
||||
import uuid
|
||||
from datetime import datetime, timezone, date as date_type
|
||||
from decimal import Decimal
|
||||
@@ -8,6 +7,8 @@ from sqlalchemy import select, text, or_
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from app.core.broadcaster import broadcaster
|
||||
|
||||
from app.core.database import get_session
|
||||
from app.models.shopping import ShoppingList, ListItem, Product, Store
|
||||
from app.schemas.shopping import (
|
||||
@@ -172,6 +173,7 @@ async def create_shopping_list(
|
||||
session.add(lst)
|
||||
await session.commit()
|
||||
await session.refresh(lst, ["items"])
|
||||
broadcaster.broadcast("shopping_changed")
|
||||
return ShoppingListDetailResponse(
|
||||
**_list_to_response(lst).model_dump(),
|
||||
items=[],
|
||||
@@ -214,6 +216,7 @@ async def update_shopping_list(
|
||||
for field, value in payload.model_dump(exclude_unset=True).items():
|
||||
setattr(lst, field, value)
|
||||
await session.commit()
|
||||
broadcaster.broadcast("shopping_changed")
|
||||
sorted_items = sorted(lst.items, key=lambda i: (i.sort_order or 999, str(i.id)))
|
||||
return ShoppingListDetailResponse(
|
||||
**_list_to_response(lst).model_dump(),
|
||||
@@ -228,6 +231,7 @@ async def delete_shopping_list(list_id: uuid.UUID, session: AsyncSession = Depen
|
||||
raise HTTPException(404, "Liste introuvable")
|
||||
await session.delete(lst)
|
||||
await session.commit()
|
||||
broadcaster.broadcast("shopping_changed")
|
||||
return Response(status_code=204)
|
||||
|
||||
|
||||
@@ -245,6 +249,7 @@ async def add_item(
|
||||
item = ListItem(list_id=list_id, **payload.model_dump())
|
||||
session.add(item)
|
||||
await session.commit()
|
||||
broadcaster.broadcast("shopping_changed")
|
||||
stmt = (
|
||||
select(ListItem)
|
||||
.where(ListItem.id == item.id)
|
||||
@@ -294,6 +299,7 @@ async def update_item(
|
||||
product.frequency_score += 1
|
||||
|
||||
await session.commit()
|
||||
broadcaster.broadcast("shopping_changed")
|
||||
await session.refresh(item, ["product"])
|
||||
return _item_to_response(item)
|
||||
|
||||
@@ -311,6 +317,7 @@ async def delete_item(
|
||||
raise HTTPException(404, "Article introuvable")
|
||||
await session.delete(item)
|
||||
await session.commit()
|
||||
broadcaster.broadcast("shopping_changed")
|
||||
return Response(status_code=204)
|
||||
|
||||
|
||||
@@ -386,6 +393,7 @@ async def generate_magic_list(session: AsyncSession = Depends(get_session)):
|
||||
))
|
||||
|
||||
await session.commit()
|
||||
broadcaster.broadcast("shopping_changed")
|
||||
|
||||
stmt = (
|
||||
select(ShoppingList)
|
||||
@@ -433,6 +441,7 @@ async def finish_shopping(list_id: uuid.UUID, session: AsyncSession = Depends(ge
|
||||
))
|
||||
|
||||
await session.commit()
|
||||
broadcaster.broadcast("shopping_changed")
|
||||
sorted_items = sorted(lst.items, key=lambda i: (i.sort_order or 999, str(i.id)))
|
||||
return ShoppingListDetailResponse(
|
||||
**_list_to_response(lst).model_dump(),
|
||||
|
||||
@@ -5,6 +5,7 @@ from fastapi.responses import Response
|
||||
from sqlalchemy import select, and_
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.core.broadcaster import broadcaster
|
||||
from app.core.database import get_session
|
||||
from app.models.todos import TodoItem
|
||||
from app.schemas.todos import TodoCreate, TodoUpdate, PostponeRequest, TodoResponse
|
||||
@@ -55,6 +56,7 @@ async def create_todo(
|
||||
session.add(item)
|
||||
await session.commit()
|
||||
await session.refresh(item)
|
||||
broadcaster.broadcast("todos_changed")
|
||||
return item
|
||||
|
||||
|
||||
@@ -74,6 +76,7 @@ async def update_todo(
|
||||
|
||||
await session.commit()
|
||||
await session.refresh(item)
|
||||
broadcaster.broadcast("todos_changed")
|
||||
return item
|
||||
|
||||
|
||||
@@ -87,6 +90,7 @@ async def delete_todo(
|
||||
raise HTTPException(status_code=404, detail="Tâche introuvable")
|
||||
await session.delete(item)
|
||||
await session.commit()
|
||||
broadcaster.broadcast("todos_changed")
|
||||
return Response(status_code=204)
|
||||
|
||||
|
||||
@@ -108,4 +112,5 @@ async def postpone_todo(
|
||||
|
||||
await session.commit()
|
||||
await session.refresh(item)
|
||||
broadcaster.broadcast("todos_changed")
|
||||
return item
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
import asyncio
|
||||
import json
|
||||
from typing import AsyncGenerator
|
||||
|
||||
|
||||
class EventBroadcaster:
|
||||
def __init__(self):
|
||||
self._queues: set[asyncio.Queue] = set()
|
||||
|
||||
async def subscribe(self) -> AsyncGenerator[str, None]:
|
||||
queue: asyncio.Queue[str] = asyncio.Queue(maxsize=32)
|
||||
self._queues.add(queue)
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
msg = await asyncio.wait_for(queue.get(), timeout=25)
|
||||
yield msg
|
||||
except asyncio.TimeoutError:
|
||||
yield ": keepalive\n\n"
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
self._queues.discard(queue)
|
||||
|
||||
def broadcast(self, event_type: str, data: dict | None = None) -> None:
|
||||
if not self._queues:
|
||||
return
|
||||
msg = f"event: {event_type}\ndata: {json.dumps(data or {})}\n\n"
|
||||
for queue in list(self._queues):
|
||||
try:
|
||||
queue.put_nowait(msg)
|
||||
except asyncio.QueueFull:
|
||||
pass
|
||||
|
||||
|
||||
broadcaster = EventBroadcaster()
|
||||
@@ -4,6 +4,7 @@ from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from app.api.admin import router as admin_router
|
||||
from app.api.events import router as events_router
|
||||
from app.api.health import router as health_router
|
||||
from app.api.media import router as media_router
|
||||
from app.api.notes import router as notes_router
|
||||
@@ -36,6 +37,7 @@ app.add_middleware(
|
||||
)
|
||||
|
||||
app.include_router(health_router, prefix="/api")
|
||||
app.include_router(events_router, prefix="/api/events")
|
||||
app.include_router(admin_router, prefix="/api/admin")
|
||||
app.include_router(media_router, prefix="/api/media")
|
||||
app.include_router(notes_router, prefix="/api/notes")
|
||||
|
||||
@@ -14,6 +14,17 @@ server {
|
||||
add_header Cache-Control "public, immutable";
|
||||
}
|
||||
|
||||
location /api/events/stream {
|
||||
proxy_pass http://backend:8000/api/events/stream;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Connection "";
|
||||
proxy_buffering off;
|
||||
proxy_cache off;
|
||||
proxy_read_timeout 86400s;
|
||||
}
|
||||
|
||||
location /api/ {
|
||||
proxy_pass http://backend:8000/api/;
|
||||
proxy_set_header Host $host;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "homehub-frontend",
|
||||
"private": true,
|
||||
"version": "0.5.7",
|
||||
"version": "0.5.8",
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"dev": "vite",
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
import { useEffect, useRef } from 'react'
|
||||
|
||||
export function useServerEvents(handlers: Record<string, () => void>) {
|
||||
const handlersRef = useRef(handlers)
|
||||
handlersRef.current = handlers
|
||||
|
||||
useEffect(() => {
|
||||
let es: EventSource
|
||||
let retryTimeout: ReturnType<typeof setTimeout>
|
||||
|
||||
function connect() {
|
||||
es = new EventSource('/api/events/stream')
|
||||
Object.keys(handlersRef.current).forEach(event => {
|
||||
es.addEventListener(event, () => handlersRef.current[event]?.())
|
||||
})
|
||||
es.onerror = () => {
|
||||
es.close()
|
||||
retryTimeout = setTimeout(connect, 3000)
|
||||
}
|
||||
}
|
||||
|
||||
connect()
|
||||
return () => {
|
||||
es?.close()
|
||||
clearTimeout(retryTimeout)
|
||||
}
|
||||
}, [])
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
import { useState, useEffect, useCallback, useRef } from 'react'
|
||||
import { useServerEvents } from '../hooks/useServerEvents'
|
||||
import type { Note, NoteFilters } from '../api/notes'
|
||||
import { fetchNotes, createNote, updateNote, deleteNote, addAttachment, deleteAttachment } from '../api/notes'
|
||||
import Modal from '../components/Modal'
|
||||
@@ -354,6 +355,7 @@ export default function NotesPage() {
|
||||
}, [filters])
|
||||
|
||||
useEffect(() => { void load() }, [load])
|
||||
useServerEvents({ notes_changed: () => void load() })
|
||||
|
||||
function handleSearchChange(val: string) {
|
||||
setSearchInput(val)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// frontend/src/pages/ShoppingPage.tsx
|
||||
import { useState, useEffect, useCallback, useRef } from 'react'
|
||||
import { useServerEvents } from '../hooks/useServerEvents'
|
||||
import { matchesSearch } from '../utils/search'
|
||||
import type { ShoppingListDetail, ShoppingList, Store, Product, ShoppingItem } from '../api/shopping'
|
||||
import {
|
||||
@@ -127,6 +128,7 @@ export default function ShoppingPage() {
|
||||
}, [])
|
||||
|
||||
useEffect(() => { void loadData() }, [loadData])
|
||||
useServerEvents({ shopping_changed: () => void loadData() })
|
||||
|
||||
async function refreshProducts() {
|
||||
try {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// frontend/src/pages/TodosPage.tsx
|
||||
import { useState, useEffect, useCallback } from 'react'
|
||||
import { useServerEvents } from '../hooks/useServerEvents'
|
||||
import type { Todo, TodoCreate, TodoFilters } from '../api/todos'
|
||||
import { fetchTodos, createTodo, updateTodo, deleteTodo, postponeTodo } from '../api/todos'
|
||||
import SwipeableRow from '../components/todos/SwipeableRow'
|
||||
@@ -73,6 +74,7 @@ export default function TodosPage() {
|
||||
}, [filters])
|
||||
|
||||
useEffect(() => { void load() }, [load])
|
||||
useServerEvents({ todos_changed: () => void load() })
|
||||
|
||||
async function handleCreate(data: TodoCreate) {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user