Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions apps/web/src/__tests__/lib/api/messages.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ describe('messagesApi', () => {

expect(mockedClient.post).toHaveBeenCalledWith(
'/namespaces/ns-1/queues/my-queue/messages',
expect.objectContaining({ body: 'hello' })
expect.objectContaining({ body: 'hello' }),
expect.objectContaining({
headers: expect.objectContaining({
'X-ServiceHub-Confirm': 'true',
'X-ServiceHub-Intent': 'messages:send',
}),
})
);
});

Expand All @@ -128,7 +134,13 @@ describe('messagesApi', () => {

expect(mockedClient.post).toHaveBeenCalledWith(
'/namespaces/ns-1/topics/my-topic/messages',
expect.objectContaining({ body: 'hello' })
expect.objectContaining({ body: 'hello' }),
expect.objectContaining({
headers: expect.objectContaining({
'X-ServiceHub-Confirm': 'true',
'X-ServiceHub-Intent': 'messages:send',
}),
})
);
});

Expand All @@ -141,8 +153,14 @@ describe('messagesApi', () => {
});

expect(mockedClient.post).toHaveBeenCalledWith(
expect.any(String),
expect.objectContaining({ applicationProperties: { key: 'value' } })
'/namespaces/ns-1/queues/my-queue/messages',
expect.objectContaining({ applicationProperties: { key: 'value' } }),
expect.objectContaining({
headers: expect.objectContaining({
'X-ServiceHub-Confirm': 'true',
'X-ServiceHub-Intent': 'messages:send',
}),
})
);
});
});
Expand Down
10 changes: 9 additions & 1 deletion apps/web/src/__tests__/lib/api/namespaces.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,15 @@ describe('namespacesApi', () => {

await namespacesApi.delete('ns-1');

expect(mockedClient.delete).toHaveBeenCalledWith('/namespaces/ns-1');
expect(mockedClient.delete).toHaveBeenCalledWith(
'/namespaces/ns-1',
expect.objectContaining({
headers: expect.objectContaining({
'X-ServiceHub-Confirm': 'true',
'X-ServiceHub-Intent': 'namespaces:delete',
}),
})
);
});
});

Expand Down
8 changes: 7 additions & 1 deletion apps/web/src/__tests__/lib/api/scheduled.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ describe('scheduledApi', () => {
await scheduledApi.cancelScheduled('ns-1', 'my-queue', 100);

expect(mockedClient.delete).toHaveBeenCalledWith(
'/namespaces/ns-1/queues/my-queue/scheduled/100'
'/namespaces/ns-1/queues/my-queue/scheduled/100',
expect.objectContaining({
headers: expect.objectContaining({
'X-ServiceHub-Confirm': 'true',
'X-ServiceHub-Intent': 'messages:cancel-scheduled',
}),
})
);
});

Expand Down
15 changes: 15 additions & 0 deletions apps/web/src/lib/api/intentHeaders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export const riskIntent = {
sendMessage: 'messages:send',
deadLetter: 'messages:deadletter',
replayMessage: 'messages:replay',
cancelScheduled: 'messages:cancel-scheduled',
deleteNamespace: 'namespaces:delete',
replayAllRules: 'rules:replay-all',
} as const;

export function withRiskIntent(intent: string): Record<string, string> {
return {
'X-ServiceHub-Intent': intent,
'X-ServiceHub-Confirm': 'true',
};
}
13 changes: 11 additions & 2 deletions apps/web/src/lib/api/messages.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { apiClient } from './client';
import { riskIntent, withRiskIntent } from './intentHeaders';
import { Message, PaginatedResponse, GetMessagesParams } from './types';

/**
Expand Down Expand Up @@ -77,7 +78,10 @@ export const messagesApi = {

await apiClient.post(
`/namespaces/${namespaceId}/${entityPath}/${queueOrTopicName}/messages`,
payload
payload,
{
headers: withRiskIntent(riskIntent.sendMessage),
}
);
},

Expand All @@ -89,6 +93,7 @@ export const messagesApi = {
subscriptionName?: string
): Promise<void> => {
await apiClient.post('/messages/replay', null, {
headers: withRiskIntent(riskIntent.replayMessage),
params: {
namespaceId,
sequenceNumber,
Expand Down Expand Up @@ -147,7 +152,11 @@ export const messagesApi = {
}

const response = await apiClient.post<{ deadLetteredCount: number; reason: string }>(
`${url}?${params.toString()}`
`${url}?${params.toString()}`,
null,
{
headers: withRiskIntent(riskIntent.deadLetter),
}
);
return response.data;
},
Expand Down
5 changes: 4 additions & 1 deletion apps/web/src/lib/api/namespaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { apiClient } from './client';
import { riskIntent, withRiskIntent } from './intentHeaders';
import { Namespace, CreateNamespaceRequest } from './types';

export interface NamespaceStats {
Expand Down Expand Up @@ -31,7 +32,9 @@ export const namespacesApi = {

// DELETE /api/v1/namespaces/{id}
delete: async (id: string): Promise<void> => {
await apiClient.delete(`/namespaces/${id}`);
await apiClient.delete(`/namespaces/${id}`, {
headers: withRiskIntent(riskIntent.deleteNamespace),
});
},

// POST /api/v1/namespaces/{id}/test-connection
Expand Down
2 changes: 2 additions & 0 deletions apps/web/src/lib/api/rules.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { apiClient } from './client';
import { riskIntent, withRiskIntent } from './intentHeaders';

// ─── Types ─────────────────────────────────────────────────────────

Expand Down Expand Up @@ -146,6 +147,7 @@ export const rulesApi = {
replayAll: async (ruleId: number): Promise<ReplayAllResponse> => {
// Use extended timeout — bulk replay can take time for many messages
const { data } = await apiClient.post<ReplayAllResponse>(`${BASE}/${ruleId}/replay-all`, null, {
headers: withRiskIntent(riskIntent.replayAllRules),
timeout: 120_000, // 2 minutes (override default 30s)
});
return data;
Expand Down
6 changes: 5 additions & 1 deletion apps/web/src/lib/api/scheduled.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { apiClient } from './client';
import { riskIntent, withRiskIntent } from './intentHeaders';
import { Message, PaginatedResponse } from './types';

export const scheduledApi = {
Expand Down Expand Up @@ -29,7 +30,10 @@ export const scheduledApi = {
sequenceNumber: number
): Promise<void> => {
await apiClient.delete(
`/namespaces/${namespaceId}/queues/${queueName}/scheduled/${sequenceNumber}`
`/namespaces/${namespaceId}/queues/${queueName}/scheduled/${sequenceNumber}`,
{
headers: withRiskIntent(riskIntent.cancelScheduled),
}
);
},
};
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.AspNetCore.Mvc;
using ServiceHub.Api.Authorization;
using ServiceHub.Api.Security;
using ServiceHub.Infrastructure.Security;
using ServiceHub.Core.DTOs.Requests;
using ServiceHub.Core.DTOs.Responses;
Expand All @@ -22,6 +23,7 @@ public sealed class MessagesController : ApiControllerBase
private readonly IMessageSender _messageSender;
private readonly IMessageReceiver _messageReceiver;
private readonly INamespaceRepository _namespaceRepository;
private readonly IAuditLogger _auditLogger;
private readonly ILogger<MessagesController> _logger;

/// <summary>
Expand All @@ -31,15 +33,18 @@ public sealed class MessagesController : ApiControllerBase
/// <param name="messageReceiver">The message receiver service.</param>
/// <param name="namespaceRepository">The namespace repository.</param>
/// <param name="logger">The logger.</param>
/// <param name="auditLogger">The security audit logger.</param>
public MessagesController(
IMessageSender messageSender,
IMessageReceiver messageReceiver,
INamespaceRepository namespaceRepository,
ILogger<MessagesController> logger)
ILogger<MessagesController> logger,
IAuditLogger? auditLogger = null)
{
_messageSender = messageSender ?? throw new ArgumentNullException(nameof(messageSender));
_messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver));
_namespaceRepository = namespaceRepository ?? throw new ArgumentNullException(nameof(namespaceRepository));
_auditLogger = auditLogger ?? NoOpAuditLogger.Instance;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

Expand Down Expand Up @@ -361,6 +366,24 @@ public async Task<IActionResult> ReplayMessage(
[FromQuery] string? subscriptionName = null,
CancellationToken cancellationToken = default)
{
if (!IntentHeaders.HasExplicitIntent(HttpContext, IntentHeaders.IntentReplayMessage))
{
_auditLogger.LogCriticalAction(
HttpContext,
OwnerId,
action: IntentHeaders.IntentReplayMessage,
outcome: "Denied",
namespaceId: namespaceId,
resourceName: entityName,
sequenceNumber: sequenceNumber,
detail: "Missing explicit intent headers");

return Problem(
statusCode: StatusCodes.Status428PreconditionRequired,
title: "Explicit Intent Required",
detail: IntentHeaders.BuildIntentRequiredDetail("message replay"));
}

_logger.LogInformation(
"Replaying message {SequenceNumber} from {EntityName} in namespace {NamespaceId}",
sequenceNumber,
Expand All @@ -382,6 +405,17 @@ public async Task<IActionResult> ReplayMessage(
// Check if namespace has Send permission
if (!ns.HasSendPermission)
{
_auditLogger.LogCriticalAction(
HttpContext,
OwnerId,
action: IntentHeaders.IntentReplayMessage,
outcome: "Denied",
namespaceId: namespaceId,
environment: ns.Environment,
resourceName: entityName,
sequenceNumber: sequenceNumber,
detail: "Namespace lacks Send permission");

return Problem(
statusCode: StatusCodes.Status403Forbidden,
title: "Insufficient Permissions",
Expand All @@ -391,6 +425,26 @@ public async Task<IActionResult> ReplayMessage(
type: "https://docs.microsoft.com/azure/service-bus-messaging/service-bus-sas");
}

// Safety-by-default guard: destructive replay is blocked in production.
if (ns.Environment == EnvironmentType.Prod)
{
_auditLogger.LogCriticalAction(
HttpContext,
OwnerId,
action: IntentHeaders.IntentReplayMessage,
outcome: "Denied",
namespaceId: namespaceId,
environment: ns.Environment,
resourceName: entityName,
sequenceNumber: sequenceNumber,
detail: "Replay blocked in production environment");

return Problem(
statusCode: StatusCodes.Status403Forbidden,
title: "Production Restriction",
detail: "Replay is blocked for production namespaces. Validate in DEV and UAT first.");
}

var result = await _messageReceiver.ReplayMessageAsync(
namespaceId,
entityName,
Expand All @@ -400,9 +454,30 @@ public async Task<IActionResult> ReplayMessage(

if (result.IsFailure)
{
_auditLogger.LogCriticalAction(
HttpContext,
OwnerId,
action: IntentHeaders.IntentReplayMessage,
outcome: "Failed",
namespaceId: namespaceId,
environment: ns.Environment,
resourceName: entityName,
sequenceNumber: sequenceNumber,
detail: result.Error.Message);
return ToActionResult(result);
}

_auditLogger.LogCriticalAction(
HttpContext,
OwnerId,
action: IntentHeaders.IntentReplayMessage,
outcome: "Succeeded",
namespaceId: namespaceId,
environment: ns.Environment,
resourceName: entityName,
sequenceNumber: sequenceNumber,
detail: "Replay completed");

_logger.LogInformation("Message {SequenceNumber} replayed successfully", sequenceNumber);
return Accepted();
}
Expand Down
Loading
Loading