Fixed race conditions with ActiveMQTextMessage #1851

Open
arnoudja wants to merge 1 commit into apache:main from arnoudja:main

Conversation

@arnoudja

We use a network of brokers setup. On all servers, OpenWire is used by subscribers to receive text messages from topics.

When a text message is posted on a topic on one of the servers using AMQP, there is a race condition. Roughly 1% of the messages are read as empty messages on another server.

I'm not an expert on the ActiveMQ architecture, its use cases, or Java, so please correct me where I'm drawing wrong conclusions. But with the help of codex, it looks to me like ActiveMQTextMessage is over-optimized. As far as I understand, it's not supposed to be used by multiple threads concurrently, but it is. As a result, when copy() and beforeMarshall() are called at roughly the same time, the copy ends up completely empty. I'll add the analysis below.

A different but related pull request is this: https://github.com/apache/activemq/pull/1659/changes

I've looked at 4 solutions, all with their own downside. In the end, this pull request looks like the best compromise to me.

  1. Add synchronisation to ActiveMQTextMessage
    This goes against the architecture, is not in line with other classes like ActiveMQMapMessage, and adds overhead. Besides that, it will not solve all problems: the beforeMarshall -> getText -> continue-marshalling scenario will still fail.

  2. Swap the order in copy() so text is copied first instead of last
    Though this will probably have the least impact, it feels like a nasty solution. It would need a lot of explanation around that code to avoid regression after future changes. Also, it wouldn't solve the problems completely.

  3. Avoid the concurrent use of the same instance by performing a copy before calling beforeMarshall
    This looks like the best long-term option to me, but it would have a lot of impact: a high risk of introducing other bugs and performance issues, even in use cases where text messages aren't used at all.

  4. Remove the over-optimization by not clearing text / content when the other is filled
    Not a perfect solution, and it will lead to higher memory usage, but it is in line with ActiveMQMapMessage and ActiveMQObjectMessage. Implemented in this pull request.
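
A minimal sketch of option 4 under simplified assumptions (the class and method below are stand-ins, not the real ActiveMQ code): serializing text into content without clearing text means a concurrent copy() can never observe both body fields empty at once.

```java
// Sketch of option 4: keep both body representations after marshalling.
public class KeepBothSketch {
    static class Msg {
        byte[] content; // marshalled body
        String text;    // unmarshalled body

        // Serialize text into content but intentionally do NOT null out text.
        void storeContent() {
            if (content == null && text != null) {
                content = text.getBytes();
                // text is kept: costs memory, but closes the race window
            }
        }
    }

    public static void main(String[] args) {
        Msg shared = new Msg();
        shared.text = "hello"; // e.g. an AMQP-converted message: text only

        // Simulate the racing interleaving, step by step on one thread.
        Msg copy = new Msg();
        copy.content = shared.content; // bridge thread: first half of copy()
        shared.storeContent();         // OpenWire thread: beforeMarshall()
        copy.text = shared.text;       // bridge thread: second half of copy()

        // The copy still carries the body via the text field.
        System.out.println(copy.text); // prints "hello"
    }
}
```

The trade-off is the memory cost: both fields stay populated after marshalling.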

The analysis of the bug was done on an older branch, but as far as I can see the issue is still present:

The race originated because the broker dispatched the same ActiveMQTextMessage instance to two different subscribers at roughly the same time:

  • the local OpenWire consumer
  • the local side of the network bridge

You can see that the broker puts the same message object into each MessageDispatch, not a copy, in activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java:644 and
activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java:782.

The two methods racing

  1. ActiveMQTextMessage.copy()
  2. ActiveMQTextMessage.beforeMarshall()

beforeMarshall() is the mutating path. In activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java:123, it calls storeContentAndClear(), which:

  • serializes text into content
  • then sets text = null

The copy() implementation read those two pieces of body state separately:

  • super.copy(copy) copied content
  • then copy.text = text

That gave a race window between those two reads.

Where the two calls came from

  • copy() came from the network bridge forwarding path:
    activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java:1189
    -> activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java:1172
    -> md.getMessage().copy()
  • beforeMarshall() came from the local OpenWire consumer delivery path:
    topic/prefetch subscription dispatch
    -> activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java:936
    -> activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java:971
    -> activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java:1482
    -> OpenWire marshal
    -> activemq-client/src/main/java/org/apache/activemq/openwire/v12/MessageMarshaller.java:118 or looseMarshal()
    -> info.beforeMarshall(wireFormat)

The network bridge gets its commands from the local transport listener in activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java:214, which calls
serviceLocalCommand(command).

The failing interleaving
The bad sequence was:

  1. Network bridge thread enters copy()
  2. super.copy(copy) sees content == null, so the copied message gets no content
  3. Local OpenWire transport thread enters beforeMarshall()
  4. storeContentAndClear() serializes body and then sets source text = null
  5. Network bridge thread resumes and does copy.text = text
  6. It now reads text == null

Result: the forwarded copy ended with both content == null and text == null, so the remote OpenWire consumer received a TextMessage with a null body.
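
The interleaving above can be reproduced deterministically in a small sketch (the class below is a simplified stand-in for ActiveMQTextMessage, not the real code; the two threads are simulated by calling the steps in the failing order):

```java
// Simplified stand-in: copy() reads the two body fields separately,
// leaving a window for beforeMarshall() to run in between.
public class RaceSketch {
    static class Msg {
        byte[] content; // marshalled body
        String text;    // unmarshalled body

        // beforeMarshall(): serialize text into content, then drop text,
        // mirroring storeContentAndClear() in the analysis above.
        void beforeMarshall() {
            if (content == null && text != null) {
                content = text.getBytes();
                text = null;
            }
        }
    }

    public static void main(String[] args) {
        Msg shared = new Msg();
        shared.text = "hello"; // AMQP-converted: text set, content null

        Msg copy = new Msg();
        copy.content = shared.content; // steps 1-2: bridge copies content (null)
        shared.beforeMarshall();       // steps 3-4: OpenWire thread clears text
        copy.text = shared.text;       // steps 5-6: bridge reads text == null

        // The forwarded copy has lost the body entirely.
        System.out.println(copy.content == null && copy.text == null); // prints "true"
    }
}
```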

During the analysis I also found another possible bug. I didn't try to reproduce it yet:

  • a topic has one normal OpenWire consumer
  • the same topic also has another consumer using an XPath selector
  • the normal consumer is using async dispatch, which is the default on ActiveMQConnectionFactory (activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java:126)

Why this is realistic:

  • async consumer dispatch is normal
  • XPath selectors are a built-in broker feature, not a test-only trick

How the overlap happens

During topic fanout, the broker iterates subscriptions in activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java:37:

  1. For subscription A, sub.add(node) is called.
  2. In activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java:96, if the subscription can take the message immediately, it calls dispatch(node) (activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java:113).
  3. That creates a MessageDispatch holding the shared Message instance and queues it to the connection with activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java:700.

At that point the transport/task-runner thread for consumer A can start marshalling the message, which eventually leads to beforeMarshall().

Meanwhile, the broker dispatch thread is still inside the same fanout loop and moves on to subscription B:

  1. sub.matches(node, msgContext) is called in activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java:43.
  2. That goes through activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java:101.
  3. If B uses an XPath selector, activemq-client/src/main/java/org/apache/activemq/filter/XPathExpression.java:148 calls the evaluator on message.getMessage().
  4. The broker-side XPath evaluator then does ((TextMessage) message).getText() in activemq-broker/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java:49.

So you can get this real overlap:

  • Thread 1: local OpenWire transport path calls beforeMarshall()
  • Thread 2: broker dispatch thread evaluates an XPath selector and calls getText()

And both operate on the same shared ActiveMQTextMessage instance.

So the scenario requires:

  • topic fanout to multiple subscriptions
  • async dispatch on one subscription
  • an XPath selector on another subscription

That is less common than ordinary selectors, but it is absolutely a supported runtime path, not just a theoretical one.

@jbonofre jbonofre self-requested a review March 27, 2026 17:58
@cshannon cshannon self-requested a review March 27, 2026 18:17
@cshannon
Contributor

@arnoudja - Thanks for the PR, I will take a look when I get a chance. As you saw from #1659, this issue has come up a few times in the past.

The primary issue is that the message classes are not designed to be thread safe, but as you noticed, sometimes they are accessed in a non-thread-safe way. There have been reports of it happening over the years, and while there are race conditions, we could not pinpoint exactly where in the broker, so it wasn't clear whether it was really a broker issue or some client-side problem. This was the problem with #1659 and why that thread died out: the race condition could be created artificially in a unit test, but there wasn't a good explanation of how to reproduce the issue with a real broker, so we couldn't see why it was broken in the first place.

It looks like your analysis might shed some light on that mystery, so that will be helpful. Ideally we'd fix it so that we didn't need to sync on the actual message itself and instead handle/sync in the broker where needed, but it depends, I guess, as we want to make sure it's correct. I did a quick scan of the PR and it doesn't look like you used sync, so I'll take a closer look and see what you found.

@cshannon
Contributor

I started looking at this briefly, and the optimization is necessary to avoid OOM: wasting twice as much memory is not great when there is a large number of messages in memory. The optimization was added because when there are a lot of messages in the broker's cache, you can easily get an OOM error. The problem is that the memory usage tracking isn't aware the data is stored twice, so you can blow past the configured memory limits. We could account for both copies in memory tracking, which would fix the OOM, but that still leaves the issue of wasting a ton of space.

As you pointed out, the general idea is to operate on independent copies: the broker is supposed to make copies so that two threads are not touching the same message, which is the third solution you mentioned and has been the preferred approach. So we may ultimately want to go with the copy option.

Other ideas include creating broker-specific versions of the message classes that are thread safe, but that might be a huge pain to do. Or maybe we could store the text only as bytes (and never as text), but then there is a conversion penalty each time getText() is called.

@arnoudja - can you better explain what you mean when you say synchronization wouldn't solve the issues? We have avoided it so far, of course, but it should be possible to add a lock internally to prevent two threads from interfering. I am still hesitant to do this, and if we did, we'd probably need to add sync to all the classes, but I am going to at least explore the option when looking at this more.

@cshannon
Contributor

cshannon commented Mar 28, 2026

Also, this may have something to do specifically with AMQP, because this issue doesn't appear to happen with just OpenWire (at least I've never seen it). I am not familiar with the conversion and I'd need to look it up, but maybe the AMQP protocol handler for incoming messages is not storing the contents as binary, so we end up with text being stored as a String. That would then lead to the race condition when the message has to be converted later on dispatch over the network bridge and to a consumer.

OpenWire messages should arrive on the broker already serialized into bytes, so they shouldn't need to be converted later, as the broker generally does not call getText() (unless a custom plugin, such as one doing logging, calls it).

So one part of this fix could involve something with the AMQP conversion.

@arnoudja
Author

Hello Christopher,

Thank you for looking into this. In our case, this change fixes the problem. We're currently running a custom-built version of ActiveMQ, and memory usage is not an issue for us. But I fully understand that this will cause problems for others, so let's look for a solution that serves both cases.

As an answer to your question: synchronisation would solve this particular problem for us, but it won't solve other problems. The problem is that getText basically does the reverse of beforeMarshall. So if thread 1 calls beforeMarshall, thread 2 calls getText right afterwards, and thread 1 then resumes, this will break all subsequent code in thread 1 that relies on beforeMarshall having been called. Basic synchronisation within the ActiveMQTextMessage class won't avoid that; you would need to block calls to getText between beforeMarshall and the last piece of code that depends on it.
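
A minimal sketch of this point, under simplified assumptions (stand-in class and methods, not the real code): even with every method synchronized, getText() can still run between beforeMarshall() and the marshaller's subsequent read of the content, undoing the preparation.

```java
// Each method is atomic, but the marshaller depends on content staying
// non-null *between* beforeMarshall() and the actual wire write.
public class SyncSketch {
    static class Msg {
        byte[] content;
        String text;

        synchronized void beforeMarshall() {
            if (content == null && text != null) {
                content = text.getBytes();
                text = null;
            }
        }

        // In the optimized design, getText() lazily restores text and drops
        // content: the reverse of beforeMarshall().
        synchronized String getText() {
            if (text == null && content != null) {
                text = new String(content);
                content = null;
            }
            return text;
        }

        synchronized byte[] marshalledBody() {
            return content;
        }
    }

    public static void main(String[] args) {
        Msg shared = new Msg();
        shared.text = "hello";

        shared.beforeMarshall();               // thread 1: prepare for the wire
        shared.getText();                      // thread 2: e.g. XPath selector evaluation
        byte[] wire = shared.marshalledBody(); // thread 1 resumes: content is gone

        System.out.println(wire == null); // prints "true" despite full sync
    }
}
```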

This doesn't seem to be just a theoretical scenario but appears possible in reality as well, though I haven't tried to reproduce it; see the XPath scenario described in my original message for details. That leads me to conclude that synchronisation is not only expensive but also solves only part of the problem.

As for the AMQP part: this definitely has something to do with the conversion between protocols. Most of our code uses OpenWire to communicate with the broker, and we don't see the empty-message problem there. However, we do see the same problem with messages posted using STOMP instead of AMQP. This fix solves most of the problems for the STOMP posts as well (down from ~1% problem cases to ~0.1%). The remaining ~0.1% is probably an unrelated client-side issue, but I haven't looked into that yet.

So either the STOMP conversion has the same problem or the problem occurs in a piece of code used by both the AMQP and STOMP conversion.

@arnoudja
Author

In response to your remark "Openwire messages should arrive on the broker and be serialized already into bytes":

As far as I can tell, the ActiveMQTextMessage instance in this path is created in this method in JMSMappingInboundTransformer.java:

```java
public static ActiveMQTextMessage createTextMessage(String text) {
    ActiveMQTextMessage message = new ActiveMQTextMessage();
    try {
        message.setText(text);
    } catch (MessageNotWriteableException ex) {}

    return message;
}
```

As you can see, setText is called, which fills the text field, not the content field. Adding a call to setContent here might solve the problem, but this is probably not the best place to do so.

Also, for the STOMP part, does 57264bf address this? That wouldn't, however, explain why we still had the problem before this ActiveMQTextMessage change.

@cshannon
Contributor

I confirmed that AMQP does not marshal the data before passing it to the broker, which is probably the issue. You already found where Stomp does it.

Can you try this patch? 6002d18

You mentioned that the issue still happens with Stomp, which I find surprising, because that method does clear the text and marshal everything before it's passed, as you linked. I need to check the Stomp dispatch/send code to see if it copies messages first; maybe it is unmarshalling back to text on dispatch without copying, causing a race, but I'm not sure yet.

For your case with AMQP and network bridges, when the VM transport dispatches it should copy before handing off to a consumer, and the AMQP protocol sender will also copy the message on dispatch, because it updates the state and transforms it.

So if the message is already marshaled when it comes in, this may fix the issue you are seeing, at least for AMQP, so it's worth trying the small one-line patch as a test to see if it makes your issue go away.

@arnoudja
Author

I've checked that patch and I can confirm that it solves the AMQP problem. Nice solution!

As could be expected, the STOMP problem returned, so I switched back to our own solution afterwards. Please be aware that we're using an old branch of ActiveMQ, so I'm not sure whether the problem would still occur on the master branch. I'll have a look to see if I can find a location where it is unmarshalled.

@cshannon
Contributor

@arnoudja - I'm glad to hear that fixed it for AMQP. So does this mean you are not running this patch 57264bf ?

That patch is basically the same solution for Stomp. I took a look at Stomp, and during send to consumers a copy is correctly made before touching the body, but it looks like the message headers are deserialized without a copy first, so that might be something we need to fix (by moving the copy earlier).

If you are not running 57264bf then that patch should fix it for Stomp as well.

@arnoudja
Author

Yes, that patch is already in, and it doesn't solve it in our case. So far we've focused on the AMQP part, though, so I'll look into the STOMP part to see whether I can find a good reproduction scenario. It could very well be a client-side issue.

@cshannon
Contributor

cshannon commented Mar 28, 2026

Ok, sounds good. At the very least, next week I will create a PR with the AMQP fix, but it would be good to identify if there's a Stomp area to fix too.

Ultimately something more substantial needs to be done to fix the issue for real. I've been thinking about ways to avoid the problem entirely. Synchronization on the message types is an option of course, but it has performance drawbacks. Something I want to experiment with is immutable versions of the messages. We already copy all over the place, so the broker could just use immutable copies internally once messages reach it, copies that only store bytes (we might even have just one shared message type), and then there's no issue.

The downside is that it would likely require a major version bump and probably breaking changes for things like plugins, as I'm sure lots of plugins and code do mutations like adding headers. There are impacts if you want to append a header, as now you need to copy first, etc. You would need easy ways to copy and convert between types, maybe shared interfaces, etc., all while not breaking existing clients. I would have to try it to really know the impact, but it's worth exploring.
