diff --git a/docs/adr/line-adapter.md b/docs/adr/line-adapter.md
index f899a747..88460530 100644
--- a/docs/adr/line-adapter.md
+++ b/docs/adr/line-adapter.md
@@ -1,8 +1,9 @@
 # ADR: LINE Messaging API Adapter
 
-- **Status:** Proposed
+- **Status:** Accepted
 - **Date:** 2026-04-22
-- **Author:** @chaodu-agent
+- **Last Updated:** 2026-04-28
+- **Author:** @chaodu-agent, @iamninihuang
 
 ---
 
@@ -101,17 +102,57 @@ LINE is not ideal when:
    - Group → line:{groupId}
    - Room → line:{roomId}
 6. Message is routed to AdapterRouter → ACP Session Pool → kiro-cli process
-7. Agent response is sent back via LINE Push Message API
+7. Agent response is sent back via LINE Reply API (free) or Push Message API (fallback)
 ```
 
-### Reply Strategy: Push Messages
+### Hybrid Reply/Push Dispatch Flow
 
-LINE offers two reply mechanisms:
-- **Reply message**: uses a reply token, but the token expires in 1 minute
-- **Push message**: no time limit, can send to any user/group at any time
+```
+LINE User                     Gateway                          OAB Core
+    │                            │                                │
+    │  message + replyToken      │                                │
+    │ ─────────────────────────▶ │                                │
+    │                            │  1. Verify HMAC signature      │
+    │                            │  2. Generate event_id (UUID)   │
+    │                            │  3. Cache:                     │
+    │                            │     event_id → replyToken      │
+    │                            │     (TTL 50s, max 10k)         │
+    │                            │                                │
+    │                            │  GatewayEvent { event_id }     │
+    │                            │  ─────────────────────────────▶│
+    │                            │                                │ Store event_id in
+    │                            │                                │ ChannelRef.origin_event_id
+    │                            │                                │
+    │                            │                                │ Agent processes...
+    │                            │                                │
+    │                            │  GatewayReply {                │
+    │                            │    reply_to: event_id          │
+    │                            │  }                             │
+    │                            │  ◀─────────────────────────────│
+    │                            │                                │
+    │                            │  4. Lookup cache(event_id)     │
+    │                            │     ├─ HIT + fresh             │
+    │  Reply API (FREE) ✅       │     │    → Reply API           │
+    │ ◀──────────────────────────│     │                          │
+    │                            │     ├─ HIT + expired           │
+    │  Push API (quota) 💰       │     │    → Push API fallback   │
+    │ ◀──────────────────────────│     │                          │
+    │                            │     └─ MISS                    │
+    │  Push API (quota) 💰       │          → Push API fallback   │
+    │ ◀──────────────────────────│                                │
+```
 
-OpenAB uses **push messages** because agent processing typically exceeds the 1-minute reply token window. 
The trade-off is that push messages count against the monthly messaging quota on free-tier LINE accounts.
+### Reply Strategy: Hybrid Reply/Push Messages
+LINE offers two reply mechanisms:
+- **Reply message**: uses a reply token, but the token expires in 1 minute (free).
+- **Push message**: no time limit, can send to any user/group at any time (consumes quota).
+
+Historically, OpenAB relied solely on **push messages** because agent processing can exceed the 1-minute reply token window. To optimize costs for free-tier accounts, OpenAB now uses a **Hybrid Strategy** implemented at the gateway level:
+1. The gateway caches incoming `replyToken`s keyed by `event_id` with a 50-second TTL.
+2. When OAB replies with a non-empty `reply_to` that matches a cached entry, the gateway routes the message via the free **Reply API**.
+3. If the token is expired, missing, or `reply_to` is empty, the gateway falls back to the **Push API**.
+4. A background task sweeps expired cache entries to prevent memory growth.
 
 ---
 
 ## 3. Architectural Differences from Discord/Slack
@@ -381,7 +422,7 @@ For v1:
 - LINE users can interact with OpenAB agents without switching to Discord or Slack
 - The inbound webhook pattern opens the door for future webhook-based platforms (Telegram, WhatsApp, etc.)
 - Using `axum` for the HTTP server provides a solid foundation for a general-purpose webhook gateway
-- Push message strategy avoids the 1-minute reply token limitation, enabling long-running agent tasks
+- Hybrid reply/push strategy optimizes cost: the gateway opportunistically uses the free Reply API when the agent responds within the token TTL, falling back to Push API for longer-running tasks
 
 ### Negative
 
@@ -414,8 +455,9 @@ To ensure this ADR is followed in implementation and future changes:
 
 ## Notes
 
-- **Version:** 0.1
+- **Version:** 0.2
 - **Changelog:**
+  - 0.2 (2026-04-28): Hybrid Reply/Push strategy implemented (#608). Updated status to Accepted. Added dispatch flow diagram. 
Reply strategy section rewritten from Push-only to hybrid. Core propagates `event_id` via `ChannelRef.origin_event_id` (#619).
   - 0.1 (2026-04-22): Initial proposed version
 
 ---
diff --git a/gateway/README.md b/gateway/README.md
index 6e2f65dd..2a06e49d 100644
--- a/gateway/README.md
+++ b/gateway/README.md
@@ -47,12 +47,15 @@ url = "ws://gateway:8080/ws"
 | `TELEGRAM_BOT_TOKEN` | (required) | Telegram Bot API token |
 | `GATEWAY_LISTEN` | `0.0.0.0:8080` | Listen address |
 | `TELEGRAM_WEBHOOK_PATH` | `/webhook/telegram` | Webhook endpoint path |
+| `LINE_CHANNEL_SECRET` | (optional) | LINE channel secret for webhook HMAC signature verification |
+| `LINE_CHANNEL_ACCESS_TOKEN` | (optional) | LINE channel access token for Reply/Push API |
 
 ### Endpoints
 
 | Path | Description |
 |---|---|
 | `POST /webhook/telegram` | Telegram webhook receiver |
+| `POST /webhook/line` | LINE webhook receiver |
 | `GET /ws` | WebSocket server (OAB connects here) |
 | `GET /health` | Health check |
 
diff --git a/gateway/src/main.rs b/gateway/src/main.rs
index 0c9b948b..67fb2b34 100644
--- a/gateway/src/main.rs
+++ b/gateway/src/main.rs
@@ -8,6 +8,7 @@ use axum::{
 use futures_util::{SinkExt, StreamExt};
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
+use std::time::Instant;
 use tokio::sync::{broadcast, Mutex};
 use tracing::{error, info, warn};
 
@@ -126,6 +127,19 @@ struct TelegramUser {
 
 // --- App state ---
 
+/// Cache entry for LINE reply tokens: (replyToken, insertion_time).
+/// Uses std::sync::Mutex — critical sections are short (insert/remove/retain)
+/// and never held across .await, so async Mutex overhead is unnecessary.
+type ReplyTokenCache = Arc<std::sync::Mutex<std::collections::HashMap<String, (String, Instant)>>>;
+
+/// Maximum age (in seconds) before a cached reply token is considered expired.
+/// LINE tokens are valid for ~1 minute; we use 50s as a conservative margin.
+const REPLY_TOKEN_TTL_SECS: u64 = 50;
+
+/// Maximum number of cached reply tokens. Prevents unbounded memory growth
+/// if webhooks arrive faster than OAB can reply (e.g. 
OAB offline, spam burst).
+const REPLY_TOKEN_CACHE_MAX: usize = 10_000;
+
 struct AppState {
     bot_token: String,
     secret_token: Option,
@@ -134,6 +148,11 @@ struct AppState {
     line_access_token: Option,
     /// Broadcast channel: gateway → OAB (events)
     event_tx: broadcast::Sender,
+    /// Cache: event_id → (LINE replyToken, timestamp).
+    /// Global across all OAB WebSocket clients. LINE reply tokens are single-use:
+    /// the first client to `remove()` a token wins the free Reply API call;
+    /// other clients for the same event naturally fall back to Push API.
+    reply_token_cache: ReplyTokenCache,
 }
 
 // --- Telegram webhook handler ---
@@ -329,9 +348,22 @@ async fn line_webhook(
             .and_then(|s| s.user_id.as_deref())
             .unwrap_or("unknown");
 
+        let event_id = format!("evt_{}", uuid::Uuid::new_v4());
+
+        // Cache the reply token for hybrid Reply/Push dispatch
+        if let Some(ref reply_token) = event.reply_token {
+            let mut cache = state.reply_token_cache.lock().unwrap_or_else(|e| e.into_inner());
+            if cache.len() >= REPLY_TOKEN_CACHE_MAX {
+                warn!(size = cache.len(), "reply token cache full, skipping insert");
+            } else {
+                cache.insert(event_id.clone(), (reply_token.clone(), Instant::now()));
+                info!(event_id = %event_id, "cached LINE replyToken");
+            }
+        }
+
         let gateway_event = GatewayEvent {
             schema: "openab.gateway.event.v1".into(),
-            event_id: format!("evt_{}", uuid::Uuid::new_v4()),
+            event_id,
             timestamp: chrono::Utc::now().to_rfc3339(),
             platform: "line".into(),
             event_type: "message".into(),
@@ -400,7 +432,6 @@ async fn handle_oab_connection(state: Arc<AppState>, socket: axum::extract::ws::
                         break;
                     }
                 }
-                // No reply forwarding needed on this path — replies go to Telegram directly
             }
         }
     });
@@ -408,6 +439,7 @@ async fn handle_oab_connection(state: Arc<AppState>, socket: axum::extract::ws::
     // Receive OAB replies → Telegram
     let bot_token = state.bot_token.clone();
     let line_access_token = state.line_access_token.clone();
+    let reply_cache = state.reply_token_cache.clone();
     let event_tx_for_recv = 
state.event_tx.clone();
     // Track per-message reaction state (Telegram replaces all reactions atomically)
     let reaction_state: Arc>>> =
@@ -535,19 +567,83 @@ async fn handle_oab_connection(state: Arc<AppState>, socket: axum::extract::ws::
 
                 // Normal send_message — route by platform
                 if reply.platform == "line" {
-                    // LINE Push Message API
-                    if let Some(ref token) = line_access_token {
-                        info!(to = %reply.channel.id, "gateway → line");
-                        let _ = client
-                            .post("https://api.line.me/v2/bot/message/push")
-                            .bearer_auth(token)
-                            .json(&serde_json::json!({
-                                "to": reply.channel.id,
-                                "messages": [{"type": "text", "text": reply.content.text}]
-                            }))
-                            .send()
-                            .await
-                            .map_err(|e| error!("line send error: {e}"));
+                    if let Some(ref access_token) = line_access_token {
+                        // Extract token from cache (drop lock before HTTP call)
+                        let cached_token = {
+                            let mut cache = reply_cache.lock().unwrap_or_else(|e| e.into_inner());
+                            cache
+                                .remove(&reply.reply_to)
+                                .and_then(|(token, cached_at)| {
+                                    if cached_at.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS
+                                    {
+                                        Some(token)
+                                    } else {
+                                        info!("LINE replyToken expired, using Push API");
+                                        None
+                                    }
+                                })
+                        };
+
+                        // Try Reply API first (free, no quota consumed)
+                        let mut used_reply = false;
+                        if let Some(reply_token) = cached_token {
+                            info!(to = %reply.channel.id, "gateway → line (reply API)");
+                            let resp = client
+                                .post("https://api.line.me/v2/bot/message/reply")
+                                .bearer_auth(access_token)
+                                .json(&serde_json::json!({
+                                    "replyToken": reply_token,
+                                    "messages": [{"type": "text", "text": reply.content.text}]
+                                }))
+                                .send()
+                                .await;
+                            match resp {
+                                Ok(r) if r.status().is_success() => {
+                                    used_reply = true;
+                                }
+                                Ok(r) => {
+                                    let status = r.status();
+                                    let body = r.text().await.unwrap_or_default();
+                                    // Only fallback to Push when LINE explicitly says
+                                    // the reply token is unusable (invalid/expired).
+                                    // LINE returns "Invalid reply token" or "expired"
+                                    // in the error body for token-specific failures. 
+                                    let body_lower = body.to_lowercase();
+                                    let token_unusable = status.as_u16() == 400
+                                        && ((body_lower.contains("invalid")
+                                            && body_lower.contains("reply token"))
+                                            || body_lower.contains("expired"));
+                                    if token_unusable {
+                                        warn!(status = %status, body = %body, "LINE reply token unusable, falling back to Push");
+                                    } else {
+                                        // Ambiguous: 5xx, other 4xx, or unrecognized 400.
+                                        // Message may have been delivered — do NOT fallback.
+                                        error!(status = %status, body = %body, "LINE Reply API error, NOT falling back to Push (possible duplicate risk)");
+                                        used_reply = true;
+                                    }
+                                }
+                                Err(e) => {
+                                    // Network/timeout error: delivery ambiguous, do NOT fallback
+                                    error!(err = %e, "LINE Reply API network error, NOT falling back to Push (possible duplicate risk)");
+                                    used_reply = true;
+                                }
+                            }
+                        }
+
+                        // Fallback to Push API
+                        if !used_reply {
+                            info!(to = %reply.channel.id, "gateway → line (push API)");
+                            let _ = client
+                                .post("https://api.line.me/v2/bot/message/push")
+                                .bearer_auth(access_token)
+                                .json(&serde_json::json!({
+                                    "to": reply.channel.id,
+                                    "messages": [{"type": "text", "text": reply.content.text}]
+                                }))
+                                .send()
+                                .await
+                                .map_err(|e| error!("line push error: {e}"));
+                        }
                     }
                 } else {
                     // Telegram sendMessage
@@ -611,6 +707,8 @@ async fn main() -> Result<()> {
     }
 
     let (event_tx, _) = broadcast::channel::(256);
+    let reply_token_cache: ReplyTokenCache =
+        Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
 
     let state = Arc::new(AppState {
         bot_token,
@@ -619,8 +717,30 @@ async fn main() -> Result<()> {
         line_channel_secret,
         line_access_token,
         event_tx,
+        reply_token_cache,
     });
 
+    // Background task: sweep expired reply tokens every REPLY_TOKEN_TTL_SECS
+    {
+        let cache_state = state.clone();
+        tokio::spawn(async move {
+            loop {
+                tokio::time::sleep(std::time::Duration::from_secs(REPLY_TOKEN_TTL_SECS)).await;
+                let mut cache = cache_state.reply_token_cache.lock().unwrap_or_else(|e| e.into_inner());
+                let before = cache.len();
+                cache.retain(|_, (_, t)| 
t.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS);
+                let after = cache.len();
+                if before != after {
+                    info!(
+                        removed = before - after,
+                        remaining = after,
+                        "reply token cache sweep"
+                    );
+                }
+            }
+        });
+    }
+
     let app = Router::new()
         .route(&webhook_path, post(telegram_webhook))
         .route("/webhook/line", post(line_webhook))
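
The patch interleaves reply-token bookkeeping with HTTP calls, so the core rules (bounded insert, single-use take with a TTL check, periodic sweep, and the narrow condition for falling back to Push) can be hard to read off the diff. The block below is a minimal, self-contained sketch of those rules, not the gateway's actual code. It assumes a wrapper struct also called `ReplyTokenCache` (the patch uses a type alias instead); the method names `insert`, `take_fresh`, `sweep` and the free function `token_unusable` are hypothetical, and the TTL and capacity are constructor parameters rather than the `REPLY_TOKEN_TTL_SECS` and `REPLY_TOKEN_CACHE_MAX` constants.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical stand-alone version of the event_id -> (replyToken, cached_at) cache.
pub struct ReplyTokenCache {
    entries: Mutex<HashMap<String, (String, Instant)>>,
    ttl: Duration,
    max_entries: usize,
}

impl ReplyTokenCache {
    pub fn new(ttl: Duration, max_entries: usize) -> Self {
        Self { entries: Mutex::new(HashMap::new()), ttl, max_entries }
    }

    /// Stores a token unless the cache is already at capacity.
    /// Returns false when the insert was skipped, mirroring the "cache full" branch.
    pub fn insert(&self, event_id: &str, reply_token: &str) -> bool {
        let mut map = self.entries.lock().unwrap_or_else(|e| e.into_inner());
        if map.len() >= self.max_entries {
            return false;
        }
        map.insert(event_id.to_owned(), (reply_token.to_owned(), Instant::now()));
        true
    }

    /// Removes the entry (tokens are single-use) and returns the token only if
    /// it is still younger than the TTL. A miss or an expired hit yields None,
    /// which the caller treats as "use the Push API".
    pub fn take_fresh(&self, event_id: &str) -> Option<String> {
        let mut map = self.entries.lock().unwrap_or_else(|e| e.into_inner());
        let (token, cached_at) = map.remove(event_id)?;
        if cached_at.elapsed() < self.ttl { Some(token) } else { None }
    }

    /// Drops every expired entry and returns how many were removed.
    pub fn sweep(&self) -> usize {
        let ttl = self.ttl;
        let mut map = self.entries.lock().unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, (_, cached_at)| cached_at.elapsed() < ttl);
        before - map.len()
    }
}

/// Same predicate as the patch: fall back to Push only for an HTTP 400 whose
/// body names an invalid or expired reply token. Any other failure is treated
/// as ambiguous delivery and does not trigger a second send.
pub fn token_unusable(status: u16, body: &str) -> bool {
    let body = body.to_lowercase();
    status == 400
        && ((body.contains("invalid") && body.contains("reply token"))
            || body.contains("expired"))
}
```

In this sketch a caller would try `take_fresh(&reply.reply_to)` first, send via the Reply API on `Some`, and switch to Push only when the token is absent or `token_unusable` returns true for the error response. Because `take_fresh` removes the entry before checking its age, a second OAB client replying to the same event sees a miss and uses Push, which matches the single-use behaviour described in the `AppState` doc comment.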