Flux actuel avec 3 points d'entrée (Telegram, WebSocket, Heartbeat) qui convergent vers un noyau LLM partagé,
puis divergent vers leurs sorties respectives. Les étapes en violet sont partagées entre les canaux.
Points d'entrée
POST /webhook/telegramtelegram_webhook
WS /api/v1/chat/{chat_id}websocket_chat_endpoint
background taskheartbeat_loop
process_telegram_updateextrait chat_id, text, media
process_ws_requestextrait chat_id, text, image/audio b64
trigger_heartbeatlit heartbeat.md
Logique partagée (dupliquée)
ensure_user_workspace(chat_id)crée arborescence + copie templates si absents
handle_command(chat_id, text)/reset /context /load_xxx /voice_xxx /link_xxx…
cmd trouvée →
send reply & return earlyWS → send_ws_message · TG → send_telegram_message
aucune commande → continue
Gestion des médias
save_telegram_file()télécharge depuis Telegram API · convertit ogg→wav
save_ws_file()decode base64 · détecte ext · écrit disque
—
Logique partagée (dupliquée)
mcp.transcribe_audio(path)si audio : whisper → injecte [TRANSCRIPTION DU VOCAL : …] dans text
Noyau LLM partagé
process_request(chat_id, text, image_b64, rel_path, stream)prépare contexte · get_llm_config · get_system_prompt · gère NOTES/HISTORY · enregistre ACTIVE_TASKS
stream=True (Telegram / Heartbeat)
stream=False (WebSocket)
call_llm(stream=True)SSE → générateur d'events
process_request_legacy → call_llm(stream=False)JSON direct
stream_telegram_response()gère events : message.delta · message.end · tool_call.* · chat.end
update_status · send_telegram_message
return reply (str)le caller envoie via send_ws_message
finalize_request(chat_id, state)LAST_STATS · HISTORY[chat_id] = response_id · ou NOTES si image
CancelledError →
NOTES[chat_id] = (interrupted_context, image_b64)contexte d'interruption injecté au prochain tour
HISTORYresponse_id par chat_id
NOTEScontexte vision / interruption
LAST_STATStokens, vitesse, latence
Boucle Heartbeat (fond)
heartbeat_loopsleep aléatoire 15–45 min
→ pour chaque user dir → trigger_heartbeat → process_request(stream=True) → ↑ noyau LLM
Option A
Channel Adapter Pattern. Chaque canal expose un adaptateur minimaliste qui normalise le message entrant en un objet
ChannelRequest standard, puis un pipeline unifié gère toute la logique (workspace, commandes, média, LLM).
La sortie est aussi déléguée à un OutputAdapter par canal. Élimine ~150 lignes de duplication.
POST /webhook/telegram
WS /api/v1/chat/{chat_id}
heartbeat_loop
Input Adapters (thin)
TelegramInputAdapterextrait Update → ChannelRequest
WSInputAdapterextrait dict WS → ChannelRequest
HeartbeatAdapterlit heartbeat.md → ChannelRequest
ChannelRequest(chat_id, text, image_b64, audio_b64, source, stream)objet de données normalisé — toute la logique métier en ignore la source
Pipeline unifié
channel_pipeline(req: ChannelRequest)unique point d'entrée pour la logique métier
1. ensure_user_workspace(req.chat_id)
2. handle_command(req.chat_id, req.text)
→ output_adapter.send(req, reply) · return
3. save_media(req) → (rel_path, image_b64)unique fonction unifiée TG + WS
4. transcribe_if_audio(req.chat_id, rel_path, req.text) → text enrichi
5. process_request(req.chat_id, text, image_b64, rel_path, req.stream)noyau LLM inchangé
6. output_adapter.send(req, reply)dispatch selon req.source
Output Adapters (thin)
TelegramOutputAdaptersend_telegram_message / stream_telegram_response
WSOutputAdaptersend_ws_message
Avantages
Supprime ~150 lignes de code dupliqué entre process_telegram_update et process_ws_request
Ajouter un nouveau canal = 1 adaptateur, 0 modification du pipeline
La logique métier est testable indépendamment du transport
ChannelRequest rend le flux explicite et auto-documenté
Inconvénients
Refactoring plus structurant — nécessite de déplacer du code existant
stream_telegram_response reste spécifique Telegram (logique d'UI bot), pas vraiment abstraitable sans surcoût
Introduce un objet de données (ChannelRequest) à maintenir
@dataclass
class ChannelRequest:
chat_id: str
text: str
image_b64: str | None = None
audio_b64: str | None = None
source: str = "ws" # "telegram" | "ws" | "heartbeat"
stream: bool = False
async def channel_pipeline(req: ChannelRequest) -> None:
ensure_user_workspace(req.chat_id)
if reply := await handle_command(req.chat_id, req.text):
await output_adapter.send(req, reply); return
rel_path, image_b64 = await save_media(req)
text = await transcribe_if_audio(req.chat_id, rel_path, req.text)
reply = await process_request(req.chat_id, text, image_b64, rel_path, req.stream)
if reply: await output_adapter.send(req, reply)
Option B
Shared Helpers (refacto chirurgicale). On garde les deux handlers de canaux existants mais on extrait les fonctions dupliquées :
save_media() unifie save_ws_file + save_telegram_file, et transcribe_if_audio() remplace le bloc copié-collé.
Moins de disruption, même résultat sur la duplication de logique.
process_telegram_updatehandler Telegram (inchangé structurellement)
process_ws_requesthandler WebSocket (inchangé structurellement)
Helpers partagés extraits
ensure_user_workspace · handle_command (déjà centralisés)
🆕 save_media(chat_id, source, data, text, file_type)unifie save_ws_file + save_telegram_file
source="telegram"|"ws" détermine comment récupérer les bytes
Avant : deux fonctions de ~40 lignes
faisant la même chose (slug, timestamp,
ext detection, write, return rel_path+b64)
Après : 1 fonction + 2 petits adaptateurs
pour la récupération des bytes (TG vs WS)
🆕 transcribe_if_audio(chat_id, rel_path, text) → strunifie le bloc try/except whisper copié dans les deux handlers
Avant : ~15 lignes dupliquées dans
process_telegram_update et process_ws_request
Après : 1 appel propre dans chaque handler
Noyau LLM inchangé
process_request · call_llm · stream_telegram_response · finalize_requestaucune modification requise
send_telegram_message (inchangé)
send_ws_message (inchangé)
Avantages
Refactoring minimal, risque très faible — on extrait sans restructurer
Élimine ~80 lignes de duplication (save_file × 2, transcribe × 2)
Les handlers existants restent lisibles — structure identique
Facile à faire en une PR, sans casser les tests existants
Inconvénients
La structure en double (process_tg / process_ws) reste — si un 3e canal arrive, copier-coller encore
Le couplage stream/output dans process_request n'est pas adressé
Amélioration moindre qu'Option A sur l'architecture globale
# Nouvelle signature unifiée (exemples)
async def save_media(
chat_id: str, source: str,
data: bytes | str, # bytes pour TG, b64 str pour WS
text: str, file_type: str
) -> tuple[str | None, str | None]:
...
async def transcribe_if_audio(
chat_id: str, rel_path: str | None, text: str
) -> str:
if not rel_path or "/audio/" not in rel_path: return text
# ... appel whisper, injection transcription ...
return enriched_text
Comparaison directe entre l'architecture actuelle et les deux options de refactorisation sur les critères clés.
| Critère |
Actuel |
Option B Shared Helpers |
Option A Channel Adapter |
| Duplication de code |
~180 lignes dupliquées process_tg ≈ process_ws ; save_tg ≈ save_ws ; transcribe × 2 |
~100 lignes restantes handlers toujours en double structurellement |
~30 lignes (adapters thin) pipeline + helpers centralisés |
| Ajouter un 3e canal |
Copier process_xxx re-implémenter les 5 étapes |
Copier handler + appeler helpers moins de copie mais structure répétée |
1 InputAdapter + 1 OutputAdapter pipeline inchangé |
| Risque du refactoring |
— |
Faible extraction de fonctions, pas de restructuration |
Modéré déplacement de logique entre modules |
| Testabilité |
Faible logique métier mélangée au transport |
Meilleure helpers testables unitairement |
Bonne channel_pipeline testable sans bot/WS |
| Couplage stream/output |
Fort stream_telegram_response hardcodée dans process_request |
Inchangé |
Partiellement résolu output_adapter isole la livraison, mais stream_tg reste spécifique |
| Lisibilité des handlers |
Moyenne ~50 lignes chacun, logique répétée |
Bonne handlers courts, logique dans les helpers |
Excellente adapters de ~10 lignes |
| Effort d'implémentation |
— |
~1h · 1 PR |
~3h · 1-2 PR |
| Recommandation |
— |
Bon point de départ à faire en premier, peut évoluer vers A |
Cible idéale après avoir fait B, ou directement si on veut scale |
Recommandation
Commencer par Option B : extraire save_media() et transcribe_if_audio() dans une PR rapide.
C'est un gain immédiat (−80 lignes, 0 régression) qui prépare le terrain.
Ensuite, si un 3e canal (Discord, API REST…) devient nécessaire, migrer vers
Option A qui capitalise sur les helpers déjà extraits.