JMS using threads
Ques:- How can multiple producers and multiple consumers be run using threads? Answer:- In the example below, we create multiple producers and multiple consumers by implementing the Runnable interface.
System Requirements:-
- Eclipse Editor or any other.
- JDK 1.7.0_03
- Required jars(activemq-all-5.4.3.jar).
- Apache-activemq-5.4.3
Note: - Apache Active MQ Setup is required for the execution of this example. For doing the Active MQ Setup please follow the below link:-
Steps for creating an Eclipse Java project that implements core JMS using Apache ActiveMQ with multithreading:-
- Create a java project named JMSUsingActiveMQUsingThreads.
- Create a package named com.gaurav.jms.activemq.threads in the src directory.
- Create ActiveMQMessageProducerThread.java in the above-specified package.
package com.gaurav.jms.activemq.threads;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQMessageProducerThread implementsRunnable {
@Override
public voidrun() {
try {
// Creating a connection factory for ActiveMQ
ActiveMQConnectionFactory activeMQConFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
// Creating a connection
Connection con = activeMQConFactory.createConnection();
con.start();
// Creating a session;
Session session = con
.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Creating a destination using Topic or Queue
Destination dest = session.createQueue("TestWelcomeActiveMQQueue");
// creating a MessageProducer using the session to the topic or
// queue.
MessageProducer msgProducer = session.createProducer(dest);
msgProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Creating a message for sending in the queue
String strMessage = "WELCOME GAURAV BY "
+ Thread.currentThread().getName();
TextMessage txtMessage = session.createTextMessage(strMessage);
System.out.println("Sent message : " + strMessage.hashCode()
+ " : " + Thread.currentThread().getName());
msgProducer.send(txtMessage);
// closing the resources
msgProducer.close();
session.close();
con.close();
} catch(Exception e) {
System.out.println("Exception thrown : " + e);
e.printStackTrace();
}
}
}
/* NON_PERSISTENT means the broker does not write the message to stable storage: delivery is faster, but the message can be lost if the broker stops or restarts. */
- Create ActiveMQMessageConsumerThread.java in the above-specified package.
package com.gaurav.jms.activemq.threads;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQMessageConsumerThread implementsRunnable,
ExceptionListener {
@Override
public voidrun() {
try {
// Creating a connection factory for ActiveMQ
ActiveMQConnectionFactory activeMQConFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
// Creating a connection
Connection con = activeMQConFactory.createConnection();
con.start();
// Creating a session;
Session session = con
.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Creating a destination using Topic or Queue
Destination dest = session.createQueue("TestWelcomeActiveMQQueue");
// creating a MessageProducer using the session to the topic or
// queue.
MessageConsumer msgConsumer = session.createConsumer(dest);
Message message = msgConsumer.receive(5000);
if(message instanceof TextMessage) {
TextMessage txtMessage = (TextMessage) message;
String strMessage = txtMessage.getText();
System.out.println("Received Message from queue is : "
+ strMessage);
} else{
System.out.println("Received : " + message);
}
// closing the resources
msgConsumer.close();
session.close();
con.close();
} catch(Exception e) {
System.out.println("Exception thrown : " + e);
e.printStackTrace();
}
}
@Override
public synchronizedvoid onException(JMSException jmsEx) {
System.out.println("JMS Exception Occured, So closing the client ");
}
}
- Create ActiveMQProducerConsumerCaller.java in the above-specified package.
package com.gaurav.jms.activemq.threads;
/**
 * Entry point of the example: starts several producer threads, waits for all
 * of them to finish, then starts the same number of consumer threads and
 * waits for those as well.
 */
public class ActiveMQProducerConsumerCaller {

    /** Number of producer threads and of consumer threads to start. */
    private static final int WORKER_COUNT = 5;

    /**
     * Runs the producers to completion and then the consumers.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String args[]) throws Exception {
        Thread[] producers = new Thread[WORKER_COUNT];
        for (int i = 0; i < WORKER_COUNT; i++) {
            producers[i] = startThread(new ActiveMQMessageProducerThread(), false);
        }
        // Join instead of sleeping for a fixed time: consumers start only
        // once every producer has actually finished.
        joinAll(producers);

        Thread[] consumers = new Thread[WORKER_COUNT];
        for (int i = 0; i < WORKER_COUNT; i++) {
            consumers[i] = startThread(new ActiveMQMessageConsumerThread(), false);
        }
        joinAll(consumers);
    }

    /**
     * Starts the given task on a new thread.
     *
     * @param runnable         the task to run
     * @param daemonThreadFlag {@code true} to mark the thread as a daemon
     */
    public static void executeThread(Runnable runnable, boolean daemonThreadFlag) {
        startThread(runnable, daemonThreadFlag);
    }

    /** Creates, configures and starts a thread, returning it so callers can join it. */
    private static Thread startThread(Runnable runnable, boolean daemonThreadFlag) {
        Thread thread = new Thread(runnable);
        thread.setDaemon(daemonThreadFlag);
        thread.start();
        return thread;
    }

    /** Blocks until every thread in the array has terminated. */
    private static void joinAll(Thread[] threads) throws InterruptedException {
        for (Thread thread : threads) {
            thread.join();
        }
    }
}
- Execute ActiveMQProducerConsumerCaller.java by selecting the option Run As > Java Application.
Result of a successful execution of ActiveMQProducerConsumerCaller.java:
Sent message : 1075196385 : Thread-1
Sent message : 1075196388 : Thread-4
Sent message : 1075196387 : Thread-3
Sent message : 1075196386 : Thread-2
Sent message : 1075196384 : Thread-0
Received Message from queue is : WELCOME GAURAV BY Thread-1
Received Message from queue is : WELCOME GAURAV BY Thread-2
Received Message from queue is : WELCOME GAURAV BY Thread-4
Received Message from queue is : WELCOME GAURAV BY Thread-3
Received Message from queue is : WELCOME GAURAV BY Thread-0
No comments:
Post a Comment