Alfred — Architecture du flux & Refactoring

Représentation du flux logiciel actuel + 2 options de refactorisation comparées

Architecture actuelle
Option A — Channel Adapter
Option B — Shared Helpers
Comparaison

Flux actuel avec 3 points d'entrée (Telegram, WebSocket, Heartbeat) qui convergent vers un noyau LLM partagé, puis divergent vers leurs sorties respectives. Les étapes en violet sont partagées entre les canaux.

Points d'entrée
POST /webhook/telegramtelegram_webhook
WS /api/v1/chat/{chat_id}websocket_chat_endpoint
background taskheartbeat_loop
process_telegram_updateextrait chat_id, text, media
process_ws_requestextrait chat_id, text, image/audio b64
trigger_heartbeatlit heartbeat.md
Logique partagée (dupliquée)
ensure_user_workspace(chat_id)crée arborescence + copie templates si absents
handle_command(chat_id, text)/reset /context /load_xxx /voice_xxx /link_xxx…
cmd trouvée →
send reply & return earlyWS → send_ws_message · TG → send_telegram_message
aucune commande → continue
Gestion des médias
save_telegram_file()télécharge depuis Telegram API · convertit ogg→wav
save_ws_file()decode base64 · détecte ext · écrit disque
Logique partagée (dupliquée)
mcp.transcribe_audio(path)si audio : whisper → injecte [TRANSCRIPTION DU VOCAL : …] dans text
Noyau LLM partagé
process_request(chat_id, text, image_b64, rel_path, stream)prépare contexte · get_llm_config · get_system_prompt · gère NOTES/HISTORY · enregistre ACTIVE_TASKS
stream=True (Telegram / Heartbeat)
stream=False (WebSocket)
call_llm(stream=True)SSE → générateur d'events
process_request_legacy → call_llm(stream=False)JSON direct
stream_telegram_response()gère events : message.delta · message.end · tool_call.* · chat.end
update_status · send_telegram_message
return reply (str)le caller envoie via send_ws_message
finalize_request(chat_id, state)LAST_STATS · HISTORY[chat_id] = response_id · ou NOTES si image
CancelledError →
NOTES[chat_id] = (interrupted_context, image_b64)contexte d'interruption injecté au prochain tour
HISTORYresponse_id par chat_id
NOTEScontexte vision / interruption
LAST_STATStokens, vitesse, latence
Boucle Heartbeat (fond)
heartbeat_loopsleep aléatoire 15–45 min
→ pour chaque user dir → trigger_heartbeat → process_request(stream=True) → ↑ noyau LLM
Entrée Telegram
Entrée WebSocket
Heartbeat
Utilitaire partagé
Gestion commande
Gestion média
Noyau LLM
Stream Telegram
Finalisation
Sortie anticipée (cmd)

Option A Channel Adapter Pattern. Chaque canal expose un adaptateur minimaliste qui normalise le message entrant en un objet ChannelRequest standard, puis un pipeline unifié gère toute la logique (workspace, commandes, média, LLM). La sortie est aussi déléguée à un OutputAdapter par canal. Élimine ~150 lignes de duplication.

POST /webhook/telegram
WS /api/v1/chat/{chat_id}
heartbeat_loop
Input Adapters (thin)
TelegramInputAdapterextrait Update → ChannelRequest
WSInputAdapterextrait dict WS → ChannelRequest
HeartbeatAdapterlit heartbeat.md → ChannelRequest
ChannelRequest(chat_id, text, image_b64, audio_b64, source, stream)objet de données normalisé — toute la logique métier en ignore la source
Pipeline unifié
channel_pipeline(req: ChannelRequest)unique point d'entrée pour la logique métier
1. ensure_user_workspace(req.chat_id)
2. handle_command(req.chat_id, req.text)
→ output_adapter.send(req, reply) · return
3. save_media(req) → (rel_path, image_b64)unique fonction unifiée TG + WS
4. transcribe_if_audio(req.chat_id, rel_path, req.text) → text enrichi
5. process_request(req.chat_id, text, image_b64, rel_path, req.stream)noyau LLM inchangé
6. output_adapter.send(req, reply)dispatch selon req.source
Output Adapters (thin)
TelegramOutputAdaptersend_telegram_message / stream_telegram_response
WSOutputAdaptersend_ws_message
Avantages
Supprime ~150 lignes de code dupliqué entre process_telegram_update et process_ws_request
Ajouter un nouveau canal = 1 adaptateur, 0 modification du pipeline
La logique métier est testable indépendamment du transport
ChannelRequest rend le flux explicite et auto-documenté
Inconvénients
Refactoring plus structurant — nécessite de déplacer du code existant
stream_telegram_response reste spécifique Telegram (logique d'UI bot), pas vraiment abstraitable sans surcoût
Introduce un objet de données (ChannelRequest) à maintenir
@dataclass
class ChannelRequest:
  chat_id: str
  text: str
  image_b64: str | None = None
  audio_b64: str | None = None
  source: str = "ws" # "telegram" | "ws" | "heartbeat"
  stream: bool = False

async def channel_pipeline(req: ChannelRequest) -> None:
  ensure_user_workspace(req.chat_id)
  if reply := await handle_command(req.chat_id, req.text):
    await output_adapter.send(req, reply); return
  rel_path, image_b64 = await save_media(req)
  text = await transcribe_if_audio(req.chat_id, rel_path, req.text)
  reply = await process_request(req.chat_id, text, image_b64, rel_path, req.stream)
  if reply: await output_adapter.send(req, reply)

Option B Shared Helpers (refacto chirurgicale). On garde les deux handlers de canaux existants mais on extrait les fonctions dupliquées : save_media() unifie save_ws_file + save_telegram_file, et transcribe_if_audio() remplace le bloc copié-collé. Moins de disruption, même résultat sur la duplication de logique.

process_telegram_updatehandler Telegram (inchangé structurellement)
process_ws_requesthandler WebSocket (inchangé structurellement)
Helpers partagés extraits
ensure_user_workspace · handle_command (déjà centralisés)
🆕 save_media(chat_id, source, data, text, file_type)unifie save_ws_file + save_telegram_file
source="telegram"|"ws" détermine comment récupérer les bytes
Avant : deux fonctions de ~40 lignes
faisant la même chose (slug, timestamp,
ext detection, write, return rel_path+b64)

Après : 1 fonction + 2 petits adaptateurs
pour la récupération des bytes (TG vs WS)
🆕 transcribe_if_audio(chat_id, rel_path, text) → strunifie le bloc try/except whisper copié dans les deux handlers
Avant : ~15 lignes dupliquées dans
process_telegram_update et process_ws_request

Après : 1 appel propre dans chaque handler
Noyau LLM inchangé
process_request · call_llm · stream_telegram_response · finalize_requestaucune modification requise
send_telegram_message (inchangé)
send_ws_message (inchangé)
Avantages
Refactoring minimal, risque très faible — on extrait sans restructurer
Élimine ~80 lignes de duplication (save_file × 2, transcribe × 2)
Les handlers existants restent lisibles — structure identique
Facile à faire en une PR, sans casser les tests existants
Inconvénients
La structure en double (process_tg / process_ws) reste — si un 3e canal arrive, copier-coller encore
Le couplage stream/output dans process_request n'est pas adressé
Amélioration moindre qu'Option A sur l'architecture globale
# Nouvelle signature unifiée (exemples)

async def save_media(
  chat_id: str, source: str,
  data: bytes | str, # bytes pour TG, b64 str pour WS
  text: str, file_type: str
) -> tuple[str | None, str | None]:
  ...

async def transcribe_if_audio(
  chat_id: str, rel_path: str | None, text: str
) -> str:
  if not rel_path or "/audio/" not in rel_path: return text
  # ... appel whisper, injection transcription ...
  return enriched_text

Comparaison directe entre l'architecture actuelle et les deux options de refactorisation sur les critères clés.

Critère Actuel Option B Shared Helpers Option A Channel Adapter
Duplication de code ~180 lignes dupliquées
process_tg ≈ process_ws ; save_tg ≈ save_ws ; transcribe × 2
~100 lignes restantes
handlers toujours en double structurellement
~30 lignes (adapters thin)
pipeline + helpers centralisés
Ajouter un 3e canal Copier process_xxx
re-implémenter les 5 étapes
Copier handler + appeler helpers
moins de copie mais structure répétée
1 InputAdapter + 1 OutputAdapter
pipeline inchangé
Risque du refactoring Faible
extraction de fonctions, pas de restructuration
Modéré
déplacement de logique entre modules
Testabilité Faible
logique métier mélangée au transport
Meilleure
helpers testables unitairement
Bonne
channel_pipeline testable sans bot/WS
Couplage stream/output Fort
stream_telegram_response hardcodée dans process_request
Inchangé Partiellement résolu
output_adapter isole la livraison, mais stream_tg reste spécifique
Lisibilité des handlers Moyenne
~50 lignes chacun, logique répétée
Bonne
handlers courts, logique dans les helpers
Excellente
adapters de ~10 lignes
Effort d'implémentation ~1h · 1 PR ~3h · 1-2 PR
Recommandation Bon point de départ
à faire en premier, peut évoluer vers A
Cible idéale
après avoir fait B, ou directement si on veut scale

Commencer par Option B : extraire save_media() et transcribe_if_audio() dans une PR rapide. C'est un gain immédiat (−80 lignes, 0 régression) qui prépare le terrain. Ensuite, si un 3e canal (Discord, API REST…) devient nécessaire, migrer vers Option A qui capitalise sur les helpers déjà extraits.