Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/adr/custom-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ Key fields in the base schema:
}
```

Key fields in the outbound reply:
- **`reply_to`**: the `event_id` of the inbound `GatewayEvent` that triggered this reply. The gateway can use this for reply correlation — e.g., looking up a cached LINE reply token to prefer the free Reply API over the quota-consuming Push API. Empty string if the reply is not associated with a specific inbound event (e.g., cron-triggered messages).

### Design Principles for the Schema

- **Platform field is metadata, not routing logic** — OAB uses it for session key construction and sender context, but does not branch behavior on it
Expand Down
89 changes: 88 additions & 1 deletion src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::reactions::StatusReactionController;
/// Compare with `SenderContext`, which is **metadata for the agent**: there
/// `channel_id` is the parent channel and `thread_id` is the thread,
/// matching Slack's model for cross-platform consistency.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
#[derive(Clone, Debug)]
pub struct ChannelRef {
pub platform: String,
pub channel_id: String,
Expand All @@ -31,6 +31,31 @@ pub struct ChannelRef {
pub thread_id: Option<String>,
/// Parent channel if this is a thread-as-channel (Discord).
pub parent_id: Option<String>,
/// Originating gateway event ID, propagated back in `GatewayReply.reply_to`
/// so the gateway can correlate replies with inbound events (e.g. LINE reply tokens).
/// Excluded from Hash/Eq — two ChannelRefs pointing to the same channel are
/// equal regardless of which event they originated from.
pub origin_event_id: Option<String>,
}

impl PartialEq for ChannelRef {
fn eq(&self, other: &Self) -> bool {
self.platform == other.platform
&& self.channel_id == other.channel_id
&& self.thread_id == other.thread_id
&& self.parent_id == other.parent_id
}
}

impl Eq for ChannelRef {}

impl std::hash::Hash for ChannelRef {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.platform.hash(state);
self.channel_id.hash(state);
self.thread_id.hash(state);
self.parent_id.hash(state);
}
}

/// Identifies a message across platforms.
Expand Down Expand Up @@ -540,4 +565,66 @@ mod tests {
// Verify the method is callable and returns the declared value
assert!(!adapter.use_streaming(false));
}

#[test]
fn origin_event_id_excluded_from_eq() {
let a = ChannelRef {
platform: "line".into(),
channel_id: "U123".into(),
thread_id: None,
parent_id: None,
origin_event_id: Some("evt_aaa".into()),
};
let b = ChannelRef {
platform: "line".into(),
channel_id: "U123".into(),
thread_id: None,
parent_id: None,
origin_event_id: Some("evt_bbb".into()),
};
assert_eq!(a, b, "same channel with different event IDs must be equal");
}

#[test]
fn origin_event_id_excluded_from_hash() {
use std::collections::HashMap;
let a = ChannelRef {
platform: "line".into(),
channel_id: "U123".into(),
thread_id: None,
parent_id: None,
origin_event_id: Some("evt_aaa".into()),
};
let b = ChannelRef {
platform: "line".into(),
channel_id: "U123".into(),
thread_id: None,
parent_id: None,
origin_event_id: Some("evt_bbb".into()),
};
let mut map = HashMap::new();
map.insert(a, "first");
// b should hit the same bucket and overwrite
map.insert(b, "second");
assert_eq!(map.len(), 1);
assert_eq!(map.values().next(), Some(&"second"));
}

#[test]
fn origin_event_id_survives_clone() {
let ch = ChannelRef {
platform: "line".into(),
channel_id: "U123".into(),
thread_id: None,
parent_id: None,
origin_event_id: Some("evt_abc".into()),
};
// Simulates create_thread propagation: clone preserves origin_event_id
let thread_ch = ChannelRef {
thread_id: Some("topic_1".into()),
origin_event_id: ch.origin_event_id.clone(),
..ch.clone()
};
assert_eq!(thread_ch.origin_event_id.as_deref(), Some("evt_abc"));
}
}
1 change: 1 addition & 0 deletions src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ async fn fire_cronjob(
channel_id: job.channel.clone(),
thread_id: job.thread_id.clone(),
parent_id: None,
origin_event_id: None,
};

// Send visible message first so users see what triggered
Expand Down
8 changes: 8 additions & 0 deletions src/discord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl ChatAdapter for DiscordAdapter {
channel_id: thread.id.to_string(),
thread_id: None,
parent_id: Some(channel.channel_id.clone()),
origin_event_id: None,
})
}

Expand Down Expand Up @@ -582,6 +583,7 @@ impl EventHandler for Handler {
channel_id: msg.channel_id.get().to_string(),
thread_id: None,
parent_id: thread_parent_id.clone(),
origin_event_id: None,
}
} else {
match get_or_create_thread(&ctx, &adapter, &msg, &prompt).await {
Expand Down Expand Up @@ -817,6 +819,7 @@ fn discord_msg_ref(msg: &Message) -> MessageRef {
channel_id: msg.channel_id.get().to_string(),
thread_id: None,
parent_id: None,
origin_event_id: None,
},
message_id: msg.id.to_string(),
}
Expand All @@ -837,6 +840,7 @@ async fn get_or_create_thread(
channel_id: msg.channel_id.get().to_string(),
thread_id: None,
parent_id: None,
origin_event_id: None,
});
}
}
Expand All @@ -847,6 +851,7 @@ async fn get_or_create_thread(
channel_id: msg.channel_id.get().to_string(),
thread_id: None,
parent_id: None,
origin_event_id: None,
};
let trigger_ref = discord_msg_ref(msg);
match adapter.create_thread(&parent, &trigger_ref, &thread_name).await {
Expand Down Expand Up @@ -877,6 +882,7 @@ async fn get_or_create_thread(
channel_id: existing.id.to_string(),
thread_id: None,
parent_id: Some(msg.channel_id.get().to_string()),
origin_event_id: None,
})
}
Err(e) => Err(e),
Expand Down Expand Up @@ -1446,6 +1452,7 @@ mod tests {
channel_id: "111".into(),
thread_id: None,
parent_id: None,
origin_event_id: None,
};
assert_eq!(DiscordAdapter::resolve_channel(&ch), "111");
}
Expand All @@ -1457,6 +1464,7 @@ mod tests {
channel_id: "111".into(),
thread_id: Some("222".into()),
parent_id: None,
origin_event_id: None,
};
assert_eq!(DiscordAdapter::resolve_channel(&ch), "222");
}
Expand Down
5 changes: 3 additions & 2 deletions src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use tracing::{error, info, warn};
struct GatewayEvent {
#[allow(dead_code)]
schema: String,
#[allow(dead_code)]
event_id: String,
#[allow(dead_code)]
timestamp: String,
Expand Down Expand Up @@ -139,7 +138,7 @@ impl ChatAdapter for GatewayAdapter {
async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result<MessageRef> {
let reply = GatewayReply {
schema: "openab.gateway.reply.v1".into(),
reply_to: String::new(),
reply_to: channel.origin_event_id.clone().unwrap_or_default(),
platform: channel.platform.clone(),
channel: ReplyChannel {
id: channel.channel_id.clone(),
Expand Down Expand Up @@ -196,6 +195,7 @@ impl ChatAdapter for GatewayAdapter {
channel_id: channel.channel_id.clone(),
thread_id: resp.thread_id,
parent_id: None,
origin_event_id: channel.origin_event_id.clone(),
}),
Ok(Ok(resp)) => {
warn!(err = ?resp.error, "create_topic failed, falling back to same channel");
Expand Down Expand Up @@ -364,6 +364,7 @@ pub async fn run_gateway_adapter(
channel_id: event.channel.id.clone(),
thread_id: event.channel.thread_id.clone(),
parent_id: None,
origin_event_id: Some(event.event_id.clone()),
};

let sender_ctx = SenderContext {
Expand Down
7 changes: 7 additions & 0 deletions src/slack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ impl ChatAdapter for SlackAdapter {
channel_id: channel.channel_id.clone(),
thread_id: channel.thread_id.clone(),
parent_id: None,
origin_event_id: None,
},
message_id: ts.to_string(),
})
Expand All @@ -368,6 +369,7 @@ impl ChatAdapter for SlackAdapter {
channel_id: channel.channel_id.clone(),
thread_id: Some(trigger_msg.message_id.clone()),
parent_id: None,
origin_event_id: None,
})
}

Expand Down Expand Up @@ -692,6 +694,7 @@ pub async fn run_slack_adapter(
channel_id: channel_id.to_string(),
thread_id: event["thread_ts"].as_str().map(|s| s.to_string()),
parent_id: None,
origin_event_id: None,
};
let _ = adapter.send_message(&warn_channel, &user_message).await;
}
Expand Down Expand Up @@ -956,6 +959,7 @@ async fn handle_message(
channel_id: channel_id.clone(),
thread_id: thread_ts.clone(),
parent_id: None,
origin_event_id: None,
},
message_id: ts.clone(),
};
Expand Down Expand Up @@ -1021,6 +1025,7 @@ async fn handle_message(
channel_id: channel_id.clone(),
thread_id: thread_ts.clone(),
parent_id: None,
origin_event_id: None,
},
message_id: ts.clone(),
};
Expand Down Expand Up @@ -1096,6 +1101,7 @@ async fn handle_message(
channel_id: channel_id.clone(),
thread_id: thread_ts.clone(),
parent_id: None,
origin_event_id: None,
},
message_id: ts.clone(),
};
Expand All @@ -1106,6 +1112,7 @@ async fn handle_message(
channel_id: channel_id.clone(),
thread_id: Some(thread_ts.unwrap_or(ts)),
parent_id: None,
origin_event_id: None,
};

// Serialize sender context with Slack-native key names so agents calling
Expand Down
Loading