RabbitMQ worker as a bridge between two queues

RabbitMQ is a well-known message broker. It provides a common platform for sending and receiving messages, and it has good tutorials in five programming languages. This post shows a small example of a message receiver that acts as a bridge between two message queues. The idea is simple: the receiver consumes messages from the first queue, processes them, and then publishes them to the second queue.

The skeleton of our example is:
Producer#1 -> Queue#1 -> Worker (from another perspective, Producer#2) -> Queue#2 -> Receiver
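
To get a feel for this skeleton without a running broker, here is a minimal in-memory sketch. It is not RabbitMQ code: two java.util.concurrent.BlockingQueue instances stand in for Queue#1 and Queue#2, and the class name BridgeSketch, the STOP marker, and the " (via worker)" suffix are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BridgeSketch {
    // Hypothetical end-of-stream marker, used only so this demo can terminate.
    static final String STOP = "__STOP__";

    // Worker role: take from the first queue, "process", put on the second queue.
    static void bridge(BlockingQueue<String> queue1, BlockingQueue<String> queue2)
            throws InterruptedException {
        while (true) {
            String message = queue1.take();
            if (STOP.equals(message)) {
                queue2.put(STOP); // pass the marker on so the receiver stops too
                return;
            }
            queue2.put(message + " (via worker)");
        }
    }

    // Runs Producer#1, the worker, and the receiver once and returns what the receiver saw.
    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();

        Thread worker = new Thread(() -> {
            try {
                bridge(queue1, queue2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        List<String> received = new ArrayList<>();
        try {
            // Producer#1 role: publish everything to the first queue.
            for (String m : messages) {
                queue1.put(m);
            }
            queue1.put(STOP);

            // Receiver role: drain the second queue until the marker arrives.
            while (true) {
                String m = queue2.take();
                if (STOP.equals(m)) {
                    break;
                }
                received.add(m);
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return received;
    }

    public static void main(String[] args) {
        // prints [Hello from Queue#1 (via worker)]
        System.out.println(run(Arrays.asList("Hello from Queue#1")));
    }
}
```

The real example replaces the two in-memory queues with durable RabbitMQ queues, so each role can run in its own process or on its own machine.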

Producer#1 -> Queue#1
The first part sets up the first queue and a producer that sends messages to our message receiver (let's call it the “worker”). This part is almost the same as the official tutorial.


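import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Producer {
    private static final String TASK_QUEUE_NAME = "first_task_queue";

    public static void main(String[] argv) throws Exception {
        // Connect to a broker running on the local machine.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare a durable queue so that it survives a broker restart.
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = "Hello from Queue#1";

        // Publish through the default exchange, routed by queue name,
        // and mark the message as persistent.
        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
        System.out.println(" Sent '" + message + "' to Queue#1");

        channel.close();
        connection.close();
    }
}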

Queue#1 -> Worker (from another perspective, Producer#2) -> Queue#2
The second part is the heart of our example: it receives messages from the first queue and sends them to the second queue. The worker code therefore has two different queue definitions.


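import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {
    private static final String FIRST_TASK_QUEUE_NAME = "first_task_queue";
    private static final String SECOND_TASK_QUEUE_NAME = "second_task_queue";

    public static void main(String[] argv) throws Exception {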

The first queue definition.


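        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Same durable queue that the producer declares.
        channel.queueDeclare(FIRST_TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" Waiting for messages from Queue#1.");

        // Deliver at most one unacknowledged message to this worker at a time.
        channel.basicQos(1);
        // QueueingConsumer belongs to older Java client versions (it was removed in 5.0).
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // autoAck is false, so every message must be acknowledged explicitly.
        channel.basicConsume(FIRST_TASK_QUEUE_NAME, false, consumer);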

The second queue definition.


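        // A separate connection is used for publishing; since both queues live on
        // the same broker, a second channel on the first connection would also work.
        ConnectionFactory factory2 = new ConnectionFactory();
        factory2.setHost("localhost");
        Connection connection2 = factory2.newConnection();
        Channel channel2 = connection2.createChannel();
        channel2.queueDeclare(SECOND_TASK_QUEUE_NAME, true, false, false, null);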

Receiving from Queue#1 and sending to Queue#2


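        while (true) {
            // Block until the next message arrives from Queue#1.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" Received '" + message + "' from Queue#1");

            // Forward the message to Queue#2 first...
            channel2.basicPublish("", SECOND_TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes());
            System.out.println(" Sent '" + message + "' to Queue#2");

            // ...and acknowledge it on Queue#1 only afterwards, so that a failed
            // publish leaves the message unacknowledged instead of losing it.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}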
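
The worker above forwards each message unchanged. The processing mentioned at the start of the post would sit between receiving the delivery and publishing it to the second queue. Here is a minimal sketch of such a step, assuming text messages encoded as UTF-8. The class name ProcessingStep, the method process, and the trim-and-uppercase transformation are hypothetical and are not part of the RabbitMQ API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class ProcessingStep {
    // Hypothetical processing step: decode the body, transform it, encode it again.
    // The charset is passed explicitly because new String(bytes) and getBytes()
    // without arguments fall back to the platform default charset.
    static byte[] process(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        String processed = text.trim().toUpperCase(Locale.ROOT);
        return processed.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "Hello from Queue#1".getBytes(StandardCharsets.UTF_8);
        // prints HELLO FROM QUEUE#1
        System.out.println(new String(process(body), StandardCharsets.UTF_8));
    }
}
```

In the worker loop, the result of process(delivery.getBody()) would be passed to basicPublish in place of message.getBytes().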

The final part is an ordinary receiver that consumes from the second queue. The official tutorials are helpful and simple, so in my opinion RabbitMQ is a simple tool for sending and receiving messages. I run this setup across one producer machine, five worker machines, and one saver machine.
