forked from yoomoney/db-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathExampleTracingConfiguration.java
More file actions
131 lines (119 loc) · 6.85 KB
/
ExampleTracingConfiguration.java
File metadata and controls
131 lines (119 loc) · 6.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package ru.yoomoney.tech.dbqueue.test;
import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.context.log4j2.ThreadContextScopeDecorator;
import brave.propagation.ThreadLocalCurrentTraceContext;
import org.junit.Assert;
import org.junit.Test;
import ru.yoomoney.tech.dbqueue.api.EnqueueParams;
import ru.yoomoney.tech.dbqueue.api.QueueProducer;
import ru.yoomoney.tech.dbqueue.api.impl.MonitoringQueueProducer;
import ru.yoomoney.tech.dbqueue.api.impl.NoopPayloadTransformer;
import ru.yoomoney.tech.dbqueue.api.impl.ShardingQueueProducer;
import ru.yoomoney.tech.dbqueue.api.impl.SingleQueueShardRouter;
import ru.yoomoney.tech.dbqueue.brave.TracingQueueProducer;
import ru.yoomoney.tech.dbqueue.brave.TracingTaskLifecycleListener;
import ru.yoomoney.tech.dbqueue.config.DatabaseDialect;
import ru.yoomoney.tech.dbqueue.config.QueueService;
import ru.yoomoney.tech.dbqueue.config.QueueShard;
import ru.yoomoney.tech.dbqueue.config.QueueShardId;
import ru.yoomoney.tech.dbqueue.config.QueueTableSchema;
import ru.yoomoney.tech.dbqueue.config.impl.CompositeTaskLifecycleListener;
import ru.yoomoney.tech.dbqueue.config.impl.CompositeThreadLifecycleListener;
import ru.yoomoney.tech.dbqueue.config.impl.LoggingTaskLifecycleListener;
import ru.yoomoney.tech.dbqueue.config.impl.LoggingThreadLifecycleListener;
import ru.yoomoney.tech.dbqueue.settings.ExtSettings;
import ru.yoomoney.tech.dbqueue.settings.FailRetryType;
import ru.yoomoney.tech.dbqueue.settings.FailureSettings;
import ru.yoomoney.tech.dbqueue.settings.PollSettings;
import ru.yoomoney.tech.dbqueue.settings.ProcessingMode;
import ru.yoomoney.tech.dbqueue.settings.ProcessingSettings;
import ru.yoomoney.tech.dbqueue.settings.QueueConfig;
import ru.yoomoney.tech.dbqueue.settings.QueueId;
import ru.yoomoney.tech.dbqueue.settings.QueueLocation;
import ru.yoomoney.tech.dbqueue.settings.QueueSettings;
import ru.yoomoney.tech.dbqueue.settings.ReenqueueRetryType;
import ru.yoomoney.tech.dbqueue.settings.ReenqueueSettings;
import ru.yoomoney.tech.dbqueue.spring.dao.SpringDatabaseAccessLayer;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Thread.sleep;
import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.equalTo;
/**
 * Example of wiring Brave tracing into a queue producer and a queue consumer.
 * <p>
 * The test enqueues one task inside a traced scope through a {@link TracingQueueProducer}
 * and processes it with a {@link QueueService} that has a {@link TracingTaskLifecycleListener}
 * registered. Both sides are configured with the same {@code trace_info} extension field.
 *
 * @author Oleg Kandaurov
 * @since 11.06.2021
 */
public class ExampleTracingConfiguration {

    /**
     * DDL template for a task table that carries an additional {@code trace_info} column.
     * It contains three {@code %s} placeholders; presumably all of them receive the table name
     * when passed to {@code DefaultDatabaseInitializer.createTable} (verify against that helper).
     */
    public static final String PG_TRACING_TABLE_DDL = "CREATE TABLE %s (\n" +
            " id BIGSERIAL PRIMARY KEY,\n" +
            " queue_name TEXT NOT NULL,\n" +
            " payload TEXT,\n" +
            " created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),\n" +
            " next_process_at TIMESTAMP WITH TIME ZONE DEFAULT now(),\n" +
            " attempt INTEGER DEFAULT 0,\n" +
            " reenqueue_attempt INTEGER DEFAULT 0,\n" +
            " total_attempt INTEGER DEFAULT 0,\n" +
            " trace_info TEXT\n" +
            ");" +
            "CREATE INDEX %s_name_time_desc_idx\n" +
            " ON %s (queue_name, next_process_at, id DESC);\n" +
            "\n";

    /** Name of the table created for this example. */
    private static final String TABLE_NAME = "tracing_task_table";

    /** Name of the extension column used by both the tracing producer and the tracing listener. */
    private static final String TRACE_INFO_FIELD = "trace_info";

    /**
     * Enqueues a single task within a trace and checks that it is consumed exactly once.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the consumer
     */
    @Test
    public void tracing_config() throws InterruptedException {
        AtomicInteger taskConsumedCount = new AtomicInteger(0);
        DefaultDatabaseInitializer.createTable(PG_TRACING_TABLE_DDL, TABLE_NAME);

        SpringDatabaseAccessLayer databaseAccessLayer = new SpringDatabaseAccessLayer(
                DatabaseDialect.POSTGRESQL,
                QueueTableSchema.builder()
                        .withExtFields(singletonList(TRACE_INFO_FIELD))
                        .build(),
                DefaultDatabaseInitializer.getJdbcTemplate(),
                DefaultDatabaseInitializer.getTransactionTemplate());
        QueueShard<SpringDatabaseAccessLayer> shard =
                new QueueShard<>(new QueueShardId("main"), databaseAccessLayer);

        QueueId queueId = new QueueId("tracing_queue");
        QueueConfig config = createQueueConfig(queueId);

        Tracing tracing = Tracing.newBuilder()
                .currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder()
                        .addScopeDecorator(ThreadContextScopeDecorator.create())
                        .build())
                .build();
        try {
            // Producer chain: tracing -> monitoring -> sharding.
            ShardingQueueProducer<String, SpringDatabaseAccessLayer> shardingQueueProducer =
                    new ShardingQueueProducer<>(config, NoopPayloadTransformer.getInstance(),
                            new SingleQueueShardRouter<>(shard));
            QueueProducer<String> monitoringQueueProducer =
                    new MonitoringQueueProducer<>(shardingQueueProducer, queueId);
            TracingQueueProducer<String> tracingQueueProducer =
                    new TracingQueueProducer<>(monitoringQueueProducer, queueId, tracing, TRACE_INFO_FIELD);

            StringQueueConsumer consumer = new StringQueueConsumer(config, taskConsumedCount);
            QueueService queueService = new QueueService(singletonList(shard),
                    new CompositeThreadLifecycleListener(singletonList(
                            new LoggingThreadLifecycleListener())),
                    new CompositeTaskLifecycleListener(Arrays.asList(
                            new TracingTaskLifecycleListener(tracing, TRACE_INFO_FIELD),
                            new LoggingTaskLifecycleListener())));
            queueService.registerQueue(consumer);
            queueService.start();
            try {
                Tracer tracer = tracing.tracer();
                Span span = tracer.newTrace().start();
                try (Tracer.SpanInScope ignored = tracer.withSpanInScope(span)) {
                    tracingQueueProducer.enqueue(EnqueueParams.create("tracing task"));
                } finally {
                    // A started span must be finished, otherwise it is never completed.
                    span.finish();
                }
                // Give the single worker thread time to pick up and process the task.
                sleep(1000);
            } finally {
                // Stop the workers even when enqueueing or waiting fails.
                queueService.shutdown();
                queueService.awaitTermination(Duration.ofSeconds(10));
            }
            Assert.assertThat(taskConsumedCount.get(), equalTo(1));
        } finally {
            // Release the tracing component once the queue service no longer uses it.
            tracing.close();
        }
    }

    /**
     * Builds the queue configuration used by the example: one processing thread,
     * short polling timeouts and an empty set of extension settings.
     *
     * @param queueId identifier of the queue stored in the example table
     * @return configuration of the queue located in the example table
     */
    private static QueueConfig createQueueConfig(QueueId queueId) {
        QueueLocation location = QueueLocation.builder()
                .withTableName(TABLE_NAME)
                .withQueueId(queueId)
                .build();
        QueueSettings settings = QueueSettings.builder()
                .withProcessingSettings(ProcessingSettings.builder()
                        .withProcessingMode(ProcessingMode.SEPARATE_TRANSACTIONS)
                        .withThreadCount(1)
                        .build())
                .withPollSettings(PollSettings.builder()
                        .withBetweenTaskTimeout(Duration.ofMillis(100))
                        .withNoTaskTimeout(Duration.ofMillis(100))
                        .withFatalCrashTimeout(Duration.ofSeconds(1))
                        .build())
                .withFailureSettings(FailureSettings.builder()
                        .withRetryType(FailRetryType.GEOMETRIC_BACKOFF)
                        .withRetryInterval(Duration.ofMinutes(1))
                        .build())
                .withReenqueueSettings(ReenqueueSettings.builder()
                        .withRetryType(ReenqueueRetryType.MANUAL)
                        .build())
                .withExtSettings(ExtSettings.builder()
                        .withSettings(new LinkedHashMap<>())
                        .build())
                .build();
        return new QueueConfig(location, settings);
    }
}