diff --git a/docs/adr/custom-gateway.md b/docs/adr/custom-gateway.md index 8b894baa..99d5bc0c 100644 --- a/docs/adr/custom-gateway.md +++ b/docs/adr/custom-gateway.md @@ -148,6 +148,9 @@ Key fields in the base schema: } ``` +Key fields in the outbound reply: +- **`reply_to`**: the `event_id` of the inbound `GatewayEvent` that triggered this reply. The gateway can use this for reply correlation — e.g., looking up a cached LINE reply token to prefer the free Reply API over the quota-consuming Push API. Empty string if the reply is not associated with a specific inbound event (e.g., cron-triggered messages). + ### Design Principles for the Schema - **Platform field is metadata, not routing logic** — OAB uses it for session key construction and sender context, but does not branch behavior on it diff --git a/src/adapter.rs b/src/adapter.rs index d64f1db1..a66b5aa9 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -22,7 +22,7 @@ use crate::reactions::StatusReactionController; /// Compare with `SenderContext`, which is **metadata for the agent**: there /// `channel_id` is the parent channel and `thread_id` is the thread, /// matching Slack's model for cross-platform consistency. -#[derive(Clone, Debug, Hash, Eq, PartialEq)] +#[derive(Clone, Debug)] pub struct ChannelRef { pub platform: String, pub channel_id: String, @@ -31,6 +31,31 @@ pub struct ChannelRef { pub thread_id: Option, /// Parent channel if this is a thread-as-channel (Discord). pub parent_id: Option, + /// Originating gateway event ID, propagated back in `GatewayReply.reply_to` + /// so the gateway can correlate replies with inbound events (e.g. LINE reply tokens). + /// Excluded from Hash/Eq — two ChannelRefs pointing to the same channel are + /// equal regardless of which event they originated from. + pub origin_event_id: Option, +} + +impl PartialEq for ChannelRef { + fn eq(&self, other: &Self) -> bool { + self.platform == other.platform + && self.channel_id == other.channel_id + && self.thread_id == other.thread_id + && self.parent_id == other.parent_id + } +} + +impl Eq for ChannelRef {} + +impl std::hash::Hash for ChannelRef { + fn hash(&self, state: &mut H) { + self.platform.hash(state); + self.channel_id.hash(state); + self.thread_id.hash(state); + self.parent_id.hash(state); + } } /// Identifies a message across platforms. @@ -540,4 +565,66 @@ mod tests { // Verify the method is callable and returns the declared value assert!(!adapter.use_streaming(false)); } + + #[test] + fn origin_event_id_excluded_from_eq() { + let a = ChannelRef { + platform: "line".into(), + channel_id: "U123".into(), + thread_id: None, + parent_id: None, + origin_event_id: Some("evt_aaa".into()), + }; + let b = ChannelRef { + platform: "line".into(), + channel_id: "U123".into(), + thread_id: None, + parent_id: None, + origin_event_id: Some("evt_bbb".into()), + }; + assert_eq!(a, b, "same channel with different event IDs must be equal"); + } + + #[test] + fn origin_event_id_excluded_from_hash() { + use std::collections::HashMap; + let a = ChannelRef { + platform: "line".into(), + channel_id: "U123".into(), + thread_id: None, + parent_id: None, + origin_event_id: Some("evt_aaa".into()), + }; + let b = ChannelRef { + platform: "line".into(), + channel_id: "U123".into(), + thread_id: None, + parent_id: None, + origin_event_id: Some("evt_bbb".into()), + }; + let mut map = HashMap::new(); + map.insert(a, "first"); + // b should hit the same bucket and overwrite + map.insert(b, "second"); + assert_eq!(map.len(), 1); + assert_eq!(map.values().next(), Some(&"second")); + } + + #[test] + fn origin_event_id_survives_clone() { + let ch = ChannelRef { + platform: "line".into(), + channel_id: "U123".into(), + thread_id: None, + parent_id: None, + origin_event_id: Some("evt_abc".into()), + }; + // Simulates create_thread propagation: clone preserves origin_event_id + let thread_ch = ChannelRef { + thread_id: Some("topic_1".into()), + origin_event_id: ch.origin_event_id.clone(), + ..ch.clone() + }; + assert_eq!(thread_ch.origin_event_id.as_deref(), Some("evt_abc")); + } } diff --git a/src/cron.rs b/src/cron.rs index d0866ee9..cf922192 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -229,6 +229,7 @@ async fn fire_cronjob( channel_id: job.channel.clone(), thread_id: job.thread_id.clone(), parent_id: None, + origin_event_id: None, }; // Send visible message first so users see what triggered diff --git a/src/discord.rs b/src/discord.rs index 3584786d..86e12990 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -99,6 +99,7 @@ impl ChatAdapter for DiscordAdapter { channel_id: thread.id.to_string(), thread_id: None, parent_id: Some(channel.channel_id.clone()), + origin_event_id: None, }) } @@ -582,6 +583,7 @@ impl EventHandler for Handler { channel_id: msg.channel_id.get().to_string(), thread_id: None, parent_id: thread_parent_id.clone(), + origin_event_id: None, } } else { match get_or_create_thread(&ctx, &adapter, &msg, &prompt).await { @@ -817,6 +819,7 @@ fn discord_msg_ref(msg: &Message) -> MessageRef { channel_id: msg.channel_id.get().to_string(), thread_id: None, parent_id: None, + origin_event_id: None, }, message_id: msg.id.to_string(), } @@ -837,6 +840,7 @@ async fn get_or_create_thread( channel_id: msg.channel_id.get().to_string(), thread_id: None, parent_id: None, + origin_event_id: None, }); } } @@ -847,6 +851,7 @@ async fn get_or_create_thread( channel_id: msg.channel_id.get().to_string(), thread_id: None, parent_id: None, + origin_event_id: None, }; let trigger_ref = discord_msg_ref(msg); match adapter.create_thread(&parent, &trigger_ref, &thread_name).await { @@ -877,6 +882,7 @@ async fn get_or_create_thread( channel_id: existing.id.to_string(), thread_id: None, parent_id: Some(msg.channel_id.get().to_string()), + origin_event_id: None, }) } Err(e) => Err(e), @@ -1446,6 +1452,7 @@ mod tests { channel_id: "111".into(), thread_id: None, parent_id: None, + origin_event_id: None, }; assert_eq!(DiscordAdapter::resolve_channel(&ch), "111"); } @@ -1457,6 +1464,7 @@ mod tests { channel_id: "111".into(), thread_id: Some("222".into()), parent_id: None, + origin_event_id: None, }; assert_eq!(DiscordAdapter::resolve_channel(&ch), "222"); } diff --git a/src/gateway.rs b/src/gateway.rs index a1721c4c..da51e4e1 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -15,7 +15,6 @@ use tracing::{error, info, warn}; struct GatewayEvent { #[allow(dead_code)] schema: String, - #[allow(dead_code)] event_id: String, #[allow(dead_code)] timestamp: String, @@ -139,7 +138,7 @@ impl ChatAdapter for GatewayAdapter { async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result { let reply = GatewayReply { schema: "openab.gateway.reply.v1".into(), - reply_to: String::new(), + reply_to: channel.origin_event_id.clone().unwrap_or_default(), platform: channel.platform.clone(), channel: ReplyChannel { id: channel.channel_id.clone(), @@ -196,6 +195,7 @@ impl ChatAdapter for GatewayAdapter { channel_id: channel.channel_id.clone(), thread_id: resp.thread_id, parent_id: None, + origin_event_id: channel.origin_event_id.clone(), }), Ok(Ok(resp)) => { warn!(err = ?resp.error, "create_topic failed, falling back to same channel"); @@ -364,6 +364,7 @@ pub async fn run_gateway_adapter( channel_id: event.channel.id.clone(), thread_id: event.channel.thread_id.clone(), parent_id: None, + origin_event_id: Some(event.event_id.clone()), }; let sender_ctx = SenderContext { diff --git a/src/slack.rs b/src/slack.rs index c0b43b93..7619255f 100644 --- a/src/slack.rs +++ b/src/slack.rs @@ -350,6 +350,7 @@ impl ChatAdapter for SlackAdapter { channel_id: channel.channel_id.clone(), thread_id: channel.thread_id.clone(), parent_id: None, + origin_event_id: None, }, message_id: ts.to_string(), }) @@ -368,6 +369,7 @@ impl ChatAdapter for SlackAdapter { channel_id: channel.channel_id.clone(), thread_id: Some(trigger_msg.message_id.clone()), parent_id: None, + origin_event_id: None, }) } @@ -692,6 +694,7 @@ pub async fn run_slack_adapter( channel_id: channel_id.to_string(), thread_id: event["thread_ts"].as_str().map(|s| s.to_string()), parent_id: None, + origin_event_id: None, }; let _ = adapter.send_message(&warn_channel, &user_message).await; } @@ -956,6 +959,7 @@ async fn handle_message( channel_id: channel_id.clone(), thread_id: thread_ts.clone(), parent_id: None, + origin_event_id: None, }, message_id: ts.clone(), }; @@ -1021,6 +1025,7 @@ async fn handle_message( channel_id: channel_id.clone(), thread_id: thread_ts.clone(), parent_id: None, + origin_event_id: None, }, message_id: ts.clone(), }; @@ -1096,6 +1101,7 @@ async fn handle_message( channel_id: channel_id.clone(), thread_id: thread_ts.clone(), parent_id: None, + origin_event_id: None, }, message_id: ts.clone(), }; @@ -1106,6 +1112,7 @@ async fn handle_message( channel_id: channel_id.clone(), thread_id: Some(thread_ts.unwrap_or(ts)), parent_id: None, + origin_event_id: None, }; // Serialize sender context with Slack-native key names so agents calling