Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 46 additions & 22 deletions src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ pub struct SenderContext {
#[serde(skip_serializing_if = "Option::is_none")]
pub thread_id: Option<String>,
pub is_bot: bool,
/// Platform message creation time (ISO 8601 UTC).
/// Discord/Slack: platform timestamp. Gateway: broker receive time (best-effort).
/// Additive field — schema stays openab.sender.v1.
pub timestamp: String,
}

// --- ChatAdapter trait ---
Expand Down Expand Up @@ -160,6 +164,32 @@ impl AdapterRouter {
&self.pool
}

/// Access the reactions config (used by dispatch.rs).
pub fn reactions_config(&self) -> &ReactionsConfig {
&self.reactions_config
}

/// Pack one arrival event into ContentBlocks using the uniform per-arrival template:
/// Text { "<sender_context>\n{json}\n</sender_context>\n\n{prompt}" }
/// [extra_blocks in arrival order]
///
/// This is the single packing code path for both per-message and batched dispatch
/// (ADR §3.5). For a batch of N messages, call this N times and concatenate.
pub fn pack_arrival_event(
sender_json: &str,
prompt: &str,
extra_blocks: Vec<ContentBlock>,
) -> Vec<ContentBlock> {
let header = format!(
"<sender_context>\n{}\n</sender_context>\n\n{}",
sender_json, prompt
);
let mut blocks = Vec::with_capacity(1 + extra_blocks.len());
blocks.push(ContentBlock::Text { text: header });
blocks.extend(extra_blocks);
blocks
}

/// Handle an incoming user message. The adapter is responsible for
/// filtering, resolving the thread, and building the SenderContext.
/// This method handles sender context injection, session management, and streaming.
Expand All @@ -176,28 +206,7 @@ impl AdapterRouter {
) -> Result<()> {
tracing::debug!(platform = adapter.platform(), "processing message");

// Build content blocks: sender context + prompt text, then extra (images, transcripts)
let prompt_with_sender = format!(
"<sender_context>\n{}\n</sender_context>\n\n{}",
sender_json, prompt
);

let mut content_blocks = Vec::with_capacity(1 + extra_blocks.len());
// Prepend any transcript blocks (they go before the text block)
for block in &extra_blocks {
if matches!(block, ContentBlock::Text { .. }) {
content_blocks.push(block.clone());
}
}
content_blocks.push(ContentBlock::Text {
text: prompt_with_sender,
});
// Append non-text blocks (images)
for block in extra_blocks {
if !matches!(block, ContentBlock::Text { .. }) {
content_blocks.push(block);
}
}
let content_blocks = Self::pack_arrival_event(sender_json, prompt, extra_blocks);

let thread_key = format!(
"{}:{}",
Expand Down Expand Up @@ -272,6 +281,21 @@ impl AdapterRouter {
thread_channel: &ChannelRef,
reactions: Arc<StatusReactionController>,
other_bot_present: bool,
) -> Result<()> {
self.stream_prompt_blocks(adapter, thread_key, content_blocks, thread_channel, reactions, other_bot_present).await
}

/// Drive one ACP turn with the given pre-packed ContentBlocks.
/// Called by both `handle_message` (per-message mode) and `dispatch::dispatch_batch`
/// (batched mode).
pub async fn stream_prompt_blocks(
&self,
adapter: &Arc<dyn ChatAdapter>,
thread_key: &str,
content_blocks: Vec<ContentBlock>,
thread_channel: &ChannelRef,
reactions: Arc<StatusReactionController>,
other_bot_present: bool,
) -> Result<()> {
let adapter = adapter.clone();
let thread_channel = thread_channel.clone();
Expand Down
54 changes: 53 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,29 @@ use serde::Deserialize;
use std::collections::HashMap;
use std::path::Path;

/// Controls how incoming messages are dispatched to ACP turns.
///
/// - `PerMessage` (default): each message becomes its own ACP turn (v0.8.2-beta.1 behaviour).
/// - `Batched`: messages that arrive while a turn is in flight are buffered and merged
/// into one ACP turn at the next turn boundary (see ADR: turn-boundary-batching-adr.md).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum MessageProcessingMode {
#[default]
PerMessage,
Batched,
}

impl<'de> Deserialize<'de> for MessageProcessingMode {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let s = String::deserialize(deserializer)?;
match s.to_lowercase().replace('-', "_").as_str() {
"per_message" | "per-message" => Ok(Self::PerMessage),
"batched" => Ok(Self::Batched),
other => Err(serde::de::Error::unknown_variant(other, &["per-message", "batched"])),
}
}
}

/// Controls whether the bot processes messages from other Discord bots.
///
/// Inspired by Hermes Agent's `DISCORD_ALLOW_BOTS` 3-value design:
Expand Down Expand Up @@ -120,9 +143,20 @@ pub struct DiscordConfig {
/// Default: false (opt-in). `allowed_users` still applies in DMs.
#[serde(default)]
pub allow_dm: bool,
/// Message dispatch mode. Default: per-message (v0.8.2-beta.1 behaviour).
#[serde(default)]
pub message_processing_mode: MessageProcessingMode,
/// Batched mode only: per-thread channel capacity. Default: 10.
#[serde(default = "default_max_buffered_messages")]
pub max_buffered_messages: usize,
/// Batched mode only: soft token cap for greedy drain. Default: 24000.
#[serde(default = "default_max_batch_tokens")]
pub max_batch_tokens: usize,
}

fn default_max_bot_turns() -> u32 { 20 }
fn default_max_buffered_messages() -> usize { 10 }
fn default_max_batch_tokens() -> usize { 24_000 }

/// Controls whether the bot responds to user messages in threads without @mention.
///
Expand Down Expand Up @@ -179,6 +213,15 @@ pub struct SlackConfig {
/// Human message resets the counter. Default: 20.
#[serde(default = "default_max_bot_turns")]
pub max_bot_turns: u32,
/// Message dispatch mode. Default: per-message.
#[serde(default)]
pub message_processing_mode: MessageProcessingMode,
/// Batched mode only: per-thread channel capacity. Default: 10.
#[serde(default = "default_max_buffered_messages")]
pub max_buffered_messages: usize,
/// Batched mode only: soft token cap for greedy drain. Default: 24000.
#[serde(default = "default_max_batch_tokens")]
pub max_batch_tokens: usize,
}

#[derive(Debug, Deserialize)]
Expand All @@ -202,6 +245,15 @@ pub struct GatewayConfig {
pub allowed_channels: Vec<String>,
#[serde(default)]
pub allowed_users: Vec<String>,
/// Message dispatch mode. Default: per-message.
#[serde(default)]
pub message_processing_mode: MessageProcessingMode,
/// Batched mode only: per-thread channel capacity. Default: 10.
#[serde(default = "default_max_buffered_messages")]
pub max_buffered_messages: usize,
/// Batched mode only: soft token cap for greedy drain. Default: 24000.
#[serde(default = "default_max_batch_tokens")]
pub max_batch_tokens: usize,
}

fn default_gateway_platform() -> String {
Expand Down Expand Up @@ -280,7 +332,7 @@ impl<'de> Deserialize<'de> for ToolDisplay {
}
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct ReactionsConfig {
#[serde(default = "default_true")]
pub enabled: bool,
Expand Down
1 change: 1 addition & 0 deletions src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ async fn fire_cronjob(
channel_id: reply_channel.parent_id.as_deref().unwrap_or(&reply_channel.channel_id).to_string(),
thread_id: reply_channel.thread_id.clone().or(Some(reply_channel.channel_id.clone())),
is_bot: true,
timestamp: Utc::now().to_rfc3339(),
};
let sender_json = match serde_json::to_string(&sender) {
Ok(j) => j,
Expand Down
78 changes: 69 additions & 9 deletions src/discord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ pub struct Handler {
pub bot_turns: tokio::sync::Mutex<BotTurnTracker>,
/// Allow the bot to respond to Discord DMs.
pub allow_dm: bool,
/// Batched-mode dispatcher (None in per-message mode).
pub dispatcher: Option<Arc<crate::dispatch::Dispatcher>>,
pub message_processing_mode: crate::config::MessageProcessingMode,
}

impl Handler {
Expand Down Expand Up @@ -527,6 +530,7 @@ impl EventHandler for Handler {
&msg.channel_id.to_string(),
thread_parent_id.as_deref(),
msg.author.bot,
&msg.timestamp.to_rfc3339().unwrap_or_default(),
);

// Build extra content blocks from attachments (audio → STT, text → inline, image → encode)
Expand Down Expand Up @@ -622,19 +626,58 @@ impl EventHandler for Handler {
let trigger_msg = discord_msg_ref(&msg);

// Per-thread streaming: check if another bot is present in this thread
let other_bot_present = {
let other_bot_present_flag = {
let cache = self.multibot_threads.lock().await;
cache.contains_key(&msg.channel_id.to_string())
};

let router = self.router.clone();
let mode = self.message_processing_mode;
let dispatcher = self.dispatcher.clone();

tokio::spawn(async move {
let sender_json = serde_json::to_string(&sender).unwrap();
if let Err(e) = router
.handle_message(&adapter, &thread_channel, &sender_json, &prompt, extra_blocks, &trigger_msg, other_bot_present)
.await
{
error!("handle_message error: {e}");
match mode {
crate::config::MessageProcessingMode::PerMessage => {
if let Err(e) = router
.handle_message(
&adapter,
&thread_channel,
&sender_json,
&prompt,
extra_blocks,
&trigger_msg,
other_bot_present_flag,
)
.await
{
error!("handle_message error: {e}");
}
}
crate::config::MessageProcessingMode::Batched => {
if let Some(dispatcher) = dispatcher {
let thread_key = format!("discord:{}", thread_channel.channel_id);
let estimated_tokens =
crate::dispatch::estimate_tokens(&prompt, &extra_blocks);
let buf_msg = crate::dispatch::BufferedMessage {
sender_json,
prompt,
extra_blocks,
trigger_msg,
arrived_at: std::time::Instant::now(),
estimated_tokens,
other_bot_present: other_bot_present_flag,
};
if let Err(e) = dispatcher
.submit(thread_key, thread_channel, adapter, buf_msg)
.await
{
error!("dispatcher submit error: {e}");
}
} else {
error!("batched mode enabled but no dispatcher configured");
}
}
}
});
}
Expand Down Expand Up @@ -854,10 +897,25 @@ impl Handler {
cmd: &serenity::model::application::CommandInteraction,
) {
let thread_key = format!("discord:{}", cmd.channel_id.get());

// Drop any messages buffered for this thread before resetting the session.
// /reset semantics include /cancel-all: discard buffered work, then reset.
let dropped = self
.dispatcher
.as_ref()
.map(|d| d.cancel_buffered(&thread_key))
.unwrap_or(0);

let result = self.router.pool().reset_session(&thread_key).await;

let msg = match result {
Ok(()) if dropped > 0 => {
format!("🔄 Session reset. Dropped {dropped} buffered message(s). Start a new conversation!")
}
Ok(()) => "🔄 Session reset. Start a new conversation!".to_string(),
Err(_) if dropped > 0 => {
format!("🔄 Dropped {dropped} buffered message(s). No active session to reset.")
}
Err(_) => "⚠️ No active session to reset. Start a conversation first by @mentioning the bot.".to_string(),
};

Expand Down Expand Up @@ -1096,6 +1154,7 @@ fn build_sender_context(
msg_channel_id: &str,
thread_parent_id: Option<&str>,
is_bot: bool,
timestamp: &str,
) -> SenderContext {
SenderContext {
schema: "openab.sender.v1".into(),
Expand All @@ -1106,6 +1165,7 @@ fn build_sender_context(
channel_id: thread_parent_id.unwrap_or(msg_channel_id).to_string(),
thread_id: thread_parent_id.map(|_| msg_channel_id.to_string()),
is_bot,
timestamp: timestamp.to_string(),
}
}

Expand Down Expand Up @@ -1430,7 +1490,7 @@ mod tests {
/// In-thread message: channel_id = parent, thread_id = thread channel ID.
#[test]
fn build_sender_context_in_thread() {
let ctx = build_sender_context("user1", "alice", "Alice", "thread_ch", Some("parent_ch"), false);
let ctx = build_sender_context("user1", "alice", "Alice", "thread_ch", Some("parent_ch"), false, "2026-05-01T00:00:00Z");
assert_eq!(ctx.channel_id, "parent_ch");
assert_eq!(ctx.thread_id, Some("thread_ch".to_string()));
assert_eq!(ctx.channel, "discord");
Expand All @@ -1441,15 +1501,15 @@ mod tests {
/// Non-thread message: channel_id = message channel, thread_id = None.
#[test]
fn build_sender_context_not_in_thread() {
let ctx = build_sender_context("user1", "alice", "Alice", "main_ch", None, false);
let ctx = build_sender_context("user1", "alice", "Alice", "main_ch", None, false, "2026-05-01T00:00:00Z");
assert_eq!(ctx.channel_id, "main_ch");
assert_eq!(ctx.thread_id, None);
}

/// Bot sender: is_bot flag propagated correctly.
#[test]
fn build_sender_context_bot_sender() {
let ctx = build_sender_context("bot1", "mybot", "MyBot", "ch", Some("parent"), true);
let ctx = build_sender_context("bot1", "mybot", "MyBot", "ch", Some("parent"), true, "2026-05-01T00:00:00Z");
assert!(ctx.is_bot);
assert_eq!(ctx.channel_id, "parent");
assert_eq!(ctx.thread_id, Some("ch".to_string()));
Expand Down
Loading
Loading