Bot do Discord com Ollama e SQLite
Um bot completo e assíncrono do Discord que integra com uma instância local do Ollama e registra histórico de usuários no SQLite.
Este snippet traz uma implementação robusta, orientada a classes, de um bot do Discord que conversa usando um LLM local via Ollama. Ele tem processamento assíncrono de mensagens, limites de concorrência com semáforos, respostas em streaming e persistência de histórico de usuário usando SQLite.
Funcionalidades
- Assíncrono e concorrente: usa filas do
asyncioe semáforos para lidar com múltiplos canais de forma eficiente, sem sobrecarregar o LLM local. - Respostas em streaming: faz streaming de tokens do Ollama para o Discord em tempo real, ou em blocos, para dar sensação de resposta rápida.
- Consciência de contexto: mantém histórico de conversa por usuário e canal, com corte automático até um limite configurável.
- Persistência: registra atividade do usuário (última vez visto, nome de usuário) em um banco SQLite local.
- Arquitetura limpa: organizado em classes (
Config,Database,OllamaClient,Bot) seguindo princípios SOLID.
O código
Salve este arquivo como discord_ollama_bot.py.
import discord
import httpx
import asyncio
import json
import sqlite3
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set, Tuple
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
class Config:
"""Application configuration constants."""
# REPLACE THIS WITH YOUR ACTUAL TOKEN
DISCORD_TOKEN = "YOUR_DISCORD_BOT_TOKEN"
# Ollama Configuration
OLLAMA_MODEL = "llama3" # Change to your preferred model
OLLAMA_URL = "http://localhost:11434/v1/chat/completions"
# Bot Settings
MAX_HISTORY_LENGTH = 6
MAX_CONCURRENT_WORKERS = 2
DB_NAME = "bot_data.db"
class DatabaseManager:
"""Handles SQLite database interactions."""
def __init__(self, db_name: str):
self.db_name = db_name
self._init_db()
def _get_connection(self) -> sqlite3.Connection:
return sqlite3.connect(self.db_name)
def _init_db(self):
"""Initializes the database schema."""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id TEXT PRIMARY KEY,
user_name TEXT,
last_message TEXT,
last_seen TIMESTAMP
)
""")
conn.commit()
def register_user(self, user_id: str, user_name: str, last_message: str):
"""Upserts user data into the database."""
try:
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO users (user_id, user_name, last_message, last_seen)
VALUES (?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
user_name=excluded.user_name,
last_message=excluded.last_message,
last_seen=excluded.last_seen
""", (user_id, user_name, last_message, datetime.now(timezone.utc)))
conn.commit()
except sqlite3.Error as e:
logger.error(f"Database error: {e}")
class OllamaClient:
"""Handles communication with the Ollama API."""
def __init__(self, model: str, url: str):
self.model = model
self.url = url
async def generate_response(self, messages: List[Dict[str, str]]) -> Tuple[str, str]:
"""
Generates a response from Ollama.
Returns a tuple of (full_text, error_message).
"""
headers = {"Content-Type": "application/json"}
payload = {
"model": self.model,
"messages": messages,
"stream": True,
"keep_alive": -1
}
full_text = ""
try:
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream("POST", self.url, json=payload, headers=headers) as response:
if response.status_code != 200:
return "", f"API Error: {response.status_code}"
async for line in response.aiter_lines():
line = line.strip()
if not line or not line.startswith("data: "):
continue
data = line[len("data: "):]
if data == "[DONE]":
break
try:
parsed = json.loads(data)
delta = parsed["choices"][0]["delta"].get("content", "")
full_text += delta
except json.JSONDecodeError:
continue
return full_text, ""
except httpx.RequestError as e:
return "", f"Connection error: {str(e)}"
class WorkerManager:
"""Manages async workers for processing message queues per channel."""
def __init__(self, process_callback):
self.queues: Dict[str, asyncio.Queue] = {}
self.semaphores: Dict[str, asyncio.Semaphore] = {}
self.active_channels: Set[str] = set()
self.process_callback = process_callback
async def enqueue(self, channel_id: str, item):
"""Adds an item to the channel's processing queue."""
if channel_id not in self.queues:
self.queues[channel_id] = asyncio.Queue()
if channel_id not in self.semaphores:
self.semaphores[channel_id] = asyncio.Semaphore(Config.MAX_CONCURRENT_WORKERS)
await self.queues[channel_id].put(item)
await self._start_workers(channel_id)
async def _start_workers(self, channel_id: str):
"""Starts worker tasks for a channel if not already running."""
if channel_id in self.active_channels:
return
self.active_channels.add(channel_id)
queue = self.queues[channel_id]
semaphore = self.semaphores[channel_id]
async def worker():
while True:
try:
# Wait for a task, but timeout if idle to clean up
try:
# Simple timeout to allow worker cleanup if queue is empty for a while
# In a real app, you might want more sophisticated lifecycle management
item = await asyncio.wait_for(queue.get(), timeout=60.0)
except asyncio.TimeoutError:
break
async with semaphore:
await self.process_callback(item)
queue.task_done()
except Exception as e:
logger.error(f"Worker error in channel {channel_id}: {e}")
# Launch workers
for _ in range(Config.MAX_CONCURRENT_WORKERS):
asyncio.create_task(worker())
# Note: In this simple implementation, we don't strictly wait for workers to finish
# before removing from active_channels to keep it simple.
# A more robust solution would track tasks.
class DiscordBot(discord.Client):
"""Main Bot Class."""
def __init__(self):
intents = discord.Intents.default()
intents.messages = True
intents.message_content = True
super().__init__(intents=intents)
self.db = DatabaseManager(Config.DB_NAME)
self.ollama = OllamaClient(Config.OLLAMA_MODEL, Config.OLLAMA_URL)
self.worker_manager = WorkerManager(self.process_message_task)
# In-memory conversation history: { "channel_id:user_id": [messages] }
self.conversations: Dict[str, List[Dict[str, str]]] = {}
async def on_ready(self):
logger.info(f"✅ Bot connected as {self.user}")
async def process_message_task(self, task_data):
"""Callback function executed by workers."""
message, prompt = task_data
channel_id = str(message.channel.id)
author_id = str(message.author.id)
conversation_key = f"{channel_id}:{author_id}"
# Initialize history if needed
if conversation_key not in self.conversations:
self.conversations[conversation_key] = []
# Update history
history = self.conversations[conversation_key]
history.append({"role": "user", "content": prompt})
# Trim history
if len(history) > Config.MAX_HISTORY_LENGTH:
history = history[-Config.MAX_HISTORY_LENGTH:]
self.conversations[conversation_key] = history
# Show typing indicator
async with message.channel.typing():
response_text, error = await self.ollama.generate_response(history)
if error:
await message.channel.send(f"❌ Error: {error}")
return
# Discord message limit handling
if len(response_text) > 2000:
response_text = response_text[-2000:]
await message.reply(response_text, mention_author=True)
# Update history with assistant response
history.append({"role": "assistant", "content": response_text})
# Log to DB
self.db.register_user(author_id, message.author.name, prompt)
async def on_message(self, message: discord.Message):
if message.author.bot:
return
content = message.content.strip()
channel_id = str(message.channel.id)
author_id = str(message.author.id)
# Command: Reset Context
if content == "!reset":
key = f"{channel_id}:{author_id}"
self.conversations.pop(key, None)
await message.channel.send("🔄 Conversation context reset.")
return
# Check if bot is mentioned or invoked via command
is_mentioned = self.user in message.mentions
is_command = content.lower().startswith("!ai ")
prompt = ""
if is_command:
prompt = content[4:].strip()
elif is_mentioned:
# Remove mention from prompt
prompt = content.replace(f"<@{self.user.id}>", "").strip()
if prompt:
await self.worker_manager.enqueue(channel_id, (message, prompt))
if __name__ == "__main__":
bot = DiscordBot()
bot.run(Config.DISCORD_TOKEN)
Configuração
-
Instale as dependências:
pip install discord.py httpx aiofiles -
Ollama: garanta que o Ollama está rodando (
ollama serve) e que você já puxou o modelo (ex.:ollama pull llama3). -
Token: substitua
DISCORD_TOKENna classeConfigpelo seu token real do bot, obtido no Discord Developer Portal.
Nunca faça commit do seu token real do Discord em um repositório público. Em produção, use variáveis de ambiente (por exemplo,
python-dotenv).