automation
DiscordOllamaSQLitePython
Bot Discord avec Ollama et SQLite
Un bot Discord complet et asynchrone qui s'intègre à une instance locale d'Ollama et suit l'historique des utilisateurs dans SQLite.
Ce fragment fournit une implémentation robuste et basée sur des classes d’un bot Discord qui converse en utilisant un LLM local via Ollama. Il propose un traitement asynchrone des messages, des limites de concurrence (sémaphores), des réponses en streaming et un suivi persistant de l’historique des utilisateurs avec SQLite.
Fonctionnalités
- Asynchrone et Concurrent : Utilise les files d’attente et les sémaphores `asyncio` pour gérer plusieurs canaux efficacement sans surcharger le LLM local.
- Réponses en Streaming : Transmet les tokens d’Ollama vers Discord en temps réel (ou par morceaux) pour une expérience réactive.
- Conscience du Contexte : Maintient l’historique des conversations par utilisateur/canal, limité à une taille configurable.
- Persistance : Enregistre l’activité des utilisateurs (dernière visite, nom d’utilisateur) dans une base de données SQLite locale.
- Architecture Propre : Refactorisé en classes (`Config`, `DatabaseManager`, `OllamaClient`, `WorkerManager`, `DiscordBot`) en suivant les principes SOLID.
Le Code
Enregistrez ce fichier sous discord_ollama_bot.py.
import asyncio
import json
import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set, Tuple

import discord
import httpx
# Logging setup: timestamped, level-tagged records at INFO and above.
_LOG_SETTINGS = {
    "level": logging.INFO,
    "format": "%(asctime)s [%(levelname)s] %(message)s",
    "datefmt": "%Y-%m-%d %H:%M:%S",
}
logging.basicConfig(**_LOG_SETTINGS)

# Module-wide logger shared by every class in this file.
logger = logging.getLogger(__name__)
class Config:
    """Application configuration constants.

    Every value is a plain class attribute read by the rest of the module.
    """

    # Bot token. Read from the DISCORD_TOKEN environment variable when it is
    # set, so the real secret never has to be written into this file. The
    # placeholder is kept as the fallback and must be replaced otherwise.
    DISCORD_TOKEN = os.environ.get("DISCORD_TOKEN", "YOUR_DISCORD_BOT_TOKEN")

    # Ollama Configuration
    OLLAMA_MODEL = "llama3"  # Change to your preferred model
    OLLAMA_URL = "http://localhost:11434/v1/chat/completions"

    # Bot Settings
    MAX_HISTORY_LENGTH = 6  # messages kept per conversation history
    MAX_CONCURRENT_WORKERS = 2  # worker tasks / semaphore slots per channel
    DB_NAME = "bot_data.db"  # SQLite file opened by DatabaseManager
class DatabaseManager:
    """Handles SQLite database interactions.

    A new connection is opened for every operation and closed again as soon
    as the operation finishes.
    """

    def __init__(self, db_name: str):
        """Store the database path and create the schema if it is missing."""
        self.db_name = db_name
        self._init_db()

    def _get_connection(self) -> sqlite3.Connection:
        """Open a new connection to the configured database file."""
        return sqlite3.connect(self.db_name)

    def _init_db(self) -> None:
        """Initializes the database schema."""
        # closing() is what actually closes the connection: using a sqlite3
        # connection as a context manager only commits or rolls back.
        with closing(self._get_connection()) as conn:
            with conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS users (
                        user_id TEXT PRIMARY KEY,
                        user_name TEXT,
                        last_message TEXT,
                        last_seen TIMESTAMP
                    )
                """)

    def register_user(self, user_id: str, user_name: str, last_message: str) -> None:
        """Upserts user data into the database.

        Inserts a row for ``user_id`` or, if one already exists, overwrites
        its name, last message and last-seen time. SQLite errors are logged
        and swallowed so that a database problem never interrupts the caller.
        """
        # Bind the timestamp as text. Passing a datetime object relies on
        # sqlite3's default adapter, deprecated since Python 3.12;
        # isoformat(" ") produces the same stored representation.
        last_seen = datetime.now(timezone.utc).isoformat(" ")
        try:
            with closing(self._get_connection()) as conn:
                with conn:
                    conn.execute("""
                        INSERT INTO users (user_id, user_name, last_message, last_seen)
                        VALUES (?, ?, ?, ?)
                        ON CONFLICT(user_id) DO UPDATE SET
                            user_name=excluded.user_name,
                            last_message=excluded.last_message,
                            last_seen=excluded.last_seen
                    """, (user_id, user_name, last_message, last_seen))
        except sqlite3.Error as e:
            logger.error(f"Database error: {e}")
class OllamaClient:
    """Handles communication with the Ollama API."""

    def __init__(self, model: str, url: str):
        """Store the model name and the chat-completions endpoint URL."""
        self.model = model
        self.url = url

    @staticmethod
    def _extract_delta(line: str) -> Optional[str]:
        """Return the text fragment carried by one streamed response line.

        Returns ``None`` for the ``[DONE]`` end-of-stream marker and ``""``
        for any line that carries no usable text (blank lines, non-``data:``
        lines, malformed JSON, chunks without choices or with null content).
        """
        line = line.strip()
        if not line.startswith("data:"):
            return ""
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            return None
        try:
            content = json.loads(data)["choices"][0]["delta"].get("content")
        except (json.JSONDecodeError, KeyError, IndexError, TypeError, AttributeError):
            # A chunk of unexpected shape is skipped rather than allowed to
            # abort the whole response.
            return ""
        # "content" may be present but null; only real text is accumulated.
        return content if isinstance(content, str) else ""

    async def generate_response(self, messages: List[Dict[str, str]]) -> Tuple[str, str]:
        """
        Generates a response from Ollama.

        Posts ``messages`` to the configured URL with streaming enabled and
        concatenates the streamed text fragments.

        Returns a tuple of (full_text, error_message). On success
        ``error_message`` is empty; on a non-200 status or a connection
        error ``full_text`` is empty and ``error_message`` describes it.
        """
        headers = {"Content-Type": "application/json"}
        payload = {
            "model": self.model,
            "messages": messages,
            "stream": True,
            "keep_alive": -1,
        }
        fragments: List[str] = []
        try:
            # timeout=None: generation by a local model may take arbitrarily long.
            async with httpx.AsyncClient(timeout=None) as client:
                async with client.stream("POST", self.url, json=payload, headers=headers) as response:
                    if response.status_code != 200:
                        return "", f"API Error: {response.status_code}"
                    async for line in response.aiter_lines():
                        fragment = self._extract_delta(line)
                        if fragment is None:
                            break
                        fragments.append(fragment)
            return "".join(fragments), ""
        except httpx.RequestError as e:
            return "", f"Connection error: {str(e)}"
class WorkerManager:
    """Manages async workers for processing message queues per channel.

    Each channel gets its own queue, semaphore and set of worker tasks.
    Workers stop after ``idle_timeout`` seconds without work and are started
    again by the next ``enqueue`` call for that channel.
    """

    def __init__(self, process_callback, max_workers: Optional[int] = None,
                 idle_timeout: float = 60.0):
        """
        Args:
            process_callback: Coroutine function awaited with each queued item.
            max_workers: Worker tasks (and semaphore slots) per channel;
                defaults to ``Config.MAX_CONCURRENT_WORKERS``.
            idle_timeout: Seconds a worker waits on an empty queue before
                it exits.
        """
        self.queues: Dict[str, asyncio.Queue] = {}
        self.semaphores: Dict[str, asyncio.Semaphore] = {}
        self.active_channels: Set[str] = set()
        self.process_callback = process_callback
        self.max_workers = Config.MAX_CONCURRENT_WORKERS if max_workers is None else max_workers
        self.idle_timeout = idle_timeout
        # Strong references to the running workers of each channel. Without
        # them a task created by create_task() may be garbage-collected.
        self._tasks: Dict[str, Set[asyncio.Task]] = {}

    async def enqueue(self, channel_id: str, item):
        """Adds an item to the channel's processing queue."""
        if channel_id not in self.queues:
            self.queues[channel_id] = asyncio.Queue()
        if channel_id not in self.semaphores:
            self.semaphores[channel_id] = asyncio.Semaphore(self.max_workers)
        await self.queues[channel_id].put(item)
        await self._start_workers(channel_id)

    async def _start_workers(self, channel_id: str):
        """Starts worker tasks for a channel if not already running."""
        if channel_id in self.active_channels:
            return
        self.active_channels.add(channel_id)
        tasks = self._tasks.setdefault(channel_id, set())
        for _ in range(self.max_workers):
            tasks.add(asyncio.create_task(self._worker(channel_id)))

    async def _worker(self, channel_id: str):
        """Processes the channel's queue until it has been idle for ``idle_timeout``."""
        queue = self.queues[channel_id]
        semaphore = self.semaphores[channel_id]
        try:
            while True:
                try:
                    item = await asyncio.wait_for(queue.get(), timeout=self.idle_timeout)
                except asyncio.TimeoutError:
                    # An item may have been enqueued just as the timeout
                    # fired; only exit when there is really nothing to do.
                    if queue.empty():
                        break
                    continue
                try:
                    async with semaphore:
                        await self.process_callback(item)
                except Exception as e:
                    logger.error(f"Worker error in channel {channel_id}: {e}")
                finally:
                    # Always balance get() so queue.join() cannot hang after
                    # a failed callback.
                    queue.task_done()
        finally:
            # No await between the empty-queue check above and this cleanup,
            # so an enqueue cannot slip in and find a stale "active" flag.
            tasks = self._tasks.get(channel_id, set())
            tasks.discard(asyncio.current_task())
            if not tasks:
                self._tasks.pop(channel_id, None)
                # Once the last worker is gone the channel must become
                # inactive, otherwise later items would never be processed.
                self.active_channels.discard(channel_id)
class DiscordBot(discord.Client):
    """Main Bot Class.

    Answers ``!ai <prompt>`` commands and messages that mention the bot by
    sending the prompt, together with the per-user/per-channel history, to
    Ollama. ``!reset`` clears the caller's history for the current channel.
    """

    # Maximum number of characters sent in a single Discord message.
    DISCORD_MESSAGE_LIMIT = 2000

    def __init__(self):
        intents = discord.Intents.default()
        intents.messages = True
        intents.message_content = True
        super().__init__(intents=intents)
        self.db = DatabaseManager(Config.DB_NAME)
        self.ollama = OllamaClient(Config.OLLAMA_MODEL, Config.OLLAMA_URL)
        self.worker_manager = WorkerManager(self.process_message_task)
        # In-memory conversation history: { "channel_id:user_id": [messages] }
        self.conversations: Dict[str, List[Dict[str, str]]] = {}

    async def on_ready(self):
        logger.info(f"✅ Bot connected as {self.user}")

    async def process_message_task(self, task_data):
        """Callback function executed by workers.

        ``task_data`` is the ``(message, prompt)`` tuple queued by
        ``on_message``. The prompt is appended to the conversation history,
        the history is sent to Ollama and the answer is posted as a reply.
        """
        message, prompt = task_data
        channel_id = str(message.channel.id)
        author_id = str(message.author.id)
        conversation_key = f"{channel_id}:{author_id}"

        # Fetch (or create) the history and record the new prompt.
        history = self.conversations.setdefault(conversation_key, [])
        history.append({"role": "user", "content": prompt})
        # Trim in place to the most recent entries; the list stored in
        # self.conversations stays the same object.
        del history[:-Config.MAX_HISTORY_LENGTH]

        # Show typing indicator while the model is generating.
        async with message.channel.typing():
            response_text, error = await self.ollama.generate_response(history)

        if error:
            await message.channel.send(f"❌ Error: {error}")
            return

        # Discord rejects empty messages, so report an empty answer instead
        # of letting reply() fail.
        if not response_text.strip():
            await message.channel.send("❌ Error: the model returned an empty response.")
            return

        # Discord message limit handling: send the answer in consecutive
        # chunks from the start, instead of keeping only its last characters.
        limit = self.DISCORD_MESSAGE_LIMIT
        pieces = (response_text[i:i + limit] for i in range(0, len(response_text), limit))
        chunks = [piece for piece in pieces if piece.strip()]
        await message.reply(chunks[0], mention_author=True)
        for chunk in chunks[1:]:
            await message.channel.send(chunk)

        # Update history with the complete assistant response.
        history.append({"role": "assistant", "content": response_text})
        # Log to DB
        self.db.register_user(author_id, message.author.name, prompt)

    async def on_message(self, message: discord.Message):
        """Handle ``!reset``, ``!ai <prompt>`` and messages that mention the bot."""
        if message.author.bot:
            return
        content = message.content.strip()
        channel_id = str(message.channel.id)
        author_id = str(message.author.id)

        # Command: Reset Context
        if content == "!reset":
            key = f"{channel_id}:{author_id}"
            self.conversations.pop(key, None)
            await message.channel.send("🔄 Conversation context reset.")
            return

        # Check if bot is mentioned or invoked via command
        is_mentioned = self.user in message.mentions
        is_command = content.lower().startswith("!ai ")
        prompt = ""
        if is_command:
            prompt = content[4:].strip()
        elif is_mentioned:
            # Remove the mention from the prompt. A user mention can be
            # written as <@id> or, in the nickname form, <@!id>.
            prompt = content
            for mention in (f"<@{self.user.id}>", f"<@!{self.user.id}>"):
                prompt = prompt.replace(mention, "")
            prompt = prompt.strip()

        if prompt:
            await self.worker_manager.enqueue(channel_id, (message, prompt))
# Script entry point: build the bot and block on its event loop.
if __name__ == "__main__":
    DiscordBot().run(Config.DISCORD_TOKEN)
Configuration
- Installer les Dépendances : `pip install discord.py httpx aiofiles`
- Ollama : Assurez-vous qu’Ollama est en cours d’exécution (`ollama serve`) et que vous avez téléchargé le modèle (par exemple, `ollama pull llama3`).
- Token : Remplacez `DISCORD_TOKEN` dans la classe `Config` par le token réel de votre bot depuis le Portail des Développeurs Discord.
Ne commitez jamais votre token Discord réel dans un dépôt public. Utilisez des variables d’environnement (par exemple, `python-dotenv`) pour les déploiements en production.