Last value queues in Apache Qpid
Apache Qpid is an open source implementation of the Advanced Message Queuing Protocol (AMQP), an emerging open standard for interoperable Message Oriented Middleware (MOM) communication in financial market data delivery. I have recently been looking at Red Hat MRG Messaging, which is based on Apache Qpid.
The Qpid feature I want to write about today is called Last Value Queues. Here is the definition from the MRG User Guide:
“The last value queue type causes logically updated versions of previous messages to appear to overwrite the older messages. To use this feature, add a qpid.last_value_queue key to the arguments supplied. The value of the key is irrelevant. Messages published to the queue then need to specify a value for the qpid.LVQ_key in the headers of messages they publish. The last value queue uses the value of the qpid.LVQ_key to determine whether a newly published message is an update to an existing message on the queue. If this is the case, the new message will appear to overwrite the older message. A subscriber that requests messages after this has occurred will see only the newer message.”
A common example is a queue of price updates where we are interested only in the latest value per instrument. The MRG Messaging documentation mentions only last value queues defined by supplying the qpid.last_value_queue parameter when creating a queue. There is also a slightly different implementation of this feature, selected with the qpid.last_value_queue_no_browse parameter. The difference is described on the Qpid website.
Let's start by creating a last value queue using the qpid-config tool:
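To make the overwrite behaviour concrete, here is a minimal in-memory sketch of the semantics described in the quote. It is not Qpid code and does not talk to a broker: the class name LastValueQueueModel, its methods, and the sample prices are all made up for illustration, and replacing a message in place (rather than moving it to the back of the queue) is an assumption based on the "overwrite" wording in the guide.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of last-value-queue semantics; hypothetical, not Qpid code. */
class LastValueQueueModel {
    // One slot per key. LinkedHashMap keeps the position of the first insert,
    // so a later put() with the same key overwrites the body in place
    // (assumption: the broker also replaces in place).
    private final Map<String, String> slots = new LinkedHashMap<>();

    /** Publish a message body under the given key (stands in for qpid.LVQ_key). */
    public void publish(String lvqKey, String body) {
        slots.put(lvqKey, body);
    }

    /** Remove and return everything currently on the queue. */
    public List<String> drain() {
        List<String> out = new ArrayList<>(slots.values());
        slots.clear();
        return out;
    }

    public static void main(String[] args) {
        LastValueQueueModel q = new LastValueQueueModel();
        q.publish("Gold", "Gold 900.10");   // illustrative prices only
        q.publish("Silver", "Silver 11.20");
        q.publish("Gold", "Gold 901.45");   // replaces the first Gold message
        System.out.println(q.drain());      // prints [Gold 901.45, Silver 11.20]
    }
}
```

A consumer that drains the model after all three publishes sees two messages, not three, because the second Gold update took over the slot of the first.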
qpid-config add queue lvq --last-value-queue
Check if the queue has been created:
qpid-config queues
Queue Name                      Attributes
======================================================================
lvq                             --last-value-queue
Messages with the same value of the qpid.LVQ_key property replace one another. A simple price publisher that sends 12 messages across four symbols, SYMBOLS = new String[] {"Crude", "Gold", "Silver", "Copper"}, to the last value queue would look like this:
Destination destinationQueue = new AMQQueue("amq.direct", "lvq");
MessageProducer producerQueue = session.createProducer(destinationQueue);
TextMessage txtQueueMsg = session.createTextMessage();
// Cycle through the four symbols three times: ticks 1..12
for (int i = 1; i <= 12; i++) {
    String symbol = SYMBOLS[(i - 1) % 4];
    // The LVQ key decides which earlier message this one replaces
    txtQueueMsg.setStringProperty("qpid.LVQ_key", symbol);
    txtQueueMsg.setText(String.format("Tick %d for symbol %s", i, symbol));
    producerQueue.send(txtQueueMsg);
    log.info(String.format("Producer: sent message => %s", txtQueueMsg.getText()));
}
Let’s create a synchronous client to consume the messages:
Destination destinationQueue = new AMQQueue("amq.direct", "lvq");
MessageConsumer consumerQueue = session.createConsumer(destinationQueue);
boolean finished = false;
while (!finished) {
    TextMessage message = (TextMessage) consumerQueue.receive(5000);
    if (message != null) {
        log.info(String.format("Consumer: received message %s key=%s", message.getText(), message.getStringProperty("qpid.LVQ_key")));
        message.acknowledge();
    } else {
        log.info("Timeout");
        finished = true;
    }
}
Assume that the consumer subscribes to the queue after the publisher has finished publishing the 12 messages. In this case the consumer receives only 4 messages, one per symbol:
Consumer: received message Tick 9 for symbol Crude key=Crude
Consumer: received message Tick 10 for symbol Gold key=Gold
Consumer: received message Tick 11 for symbol Silver key=Silver
Consumer: received message Tick 12 for symbol Copper key=Copper
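To see why ticks 9 to 12 are the ones that survive, the publisher loop can be replayed without a broker. The sketch below is only a model: TickReplay and survivors are hypothetical names, and a LinkedHashMap stands in for the queue on the assumption that an update replaces the earlier message for the same key in place.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Replays the publisher loop against a map that models the LVQ; hypothetical helper. */
class TickReplay {
    static final String[] SYMBOLS = {"Crude", "Gold", "Silver", "Copper"};

    /** Returns the message bodies left on the modelled queue after totalTicks sends. */
    static List<String> survivors(int totalTicks) {
        Map<String, String> lastBySymbol = new LinkedHashMap<>();
        for (int i = 1; i <= totalTicks; i++) {
            // Same symbol selection as the publisher: tick i goes to symbol (i-1) mod 4
            String symbol = SYMBOLS[(i - 1) % SYMBOLS.length];
            // A later tick for the same symbol overwrites the earlier one
            lastBySymbol.put(symbol, String.format("Tick %d for symbol %s", i, symbol));
        }
        return new ArrayList<>(lastBySymbol.values());
    }

    public static void main(String[] args) {
        for (String body : survivors(12)) {
            System.out.println(body);
        }
    }
}
```

Each symbol receives ticks i, i+4 and i+8, so the last tick for Crude is 9, for Gold 10, for Silver 11 and for Copper 12, which matches the consumer log above.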

Robbie said,
January 20, 2009 @ 2:45 pm
What’s the difference between this and Apache ActiveMQ?
znachor said,
January 20, 2009 @ 6:30 pm
ActiveMQ is a JMS broker, but Qpid implements a completely different standard: AMQP. However, it's possible to use the Qpid broker with a standard JMS client.