Here is the protoype.
We start off be initiating a JNDI look up for Queue Connection Factory
Access the resources by looking up the JNDI tree
Get the Connection Factory reference
Get the connection to JMS Server
Get the reference to the static destination that MDB is listening on
Create a Session using Connection Object
Create the Temporary Destinations using Session Object
Create the Message - in our example a text message
Set the temporary queue name as a message header - JMSReplyTo
Create Receiver - We are now ready to listen on the temporary Queue
Create worker thread pass in Session, Receiver and the Connection so that it makes the blocking call
Start the thread.
At this point a new thread is spawned and waiting for the reply
Create a sender to the static queue using the session
send the message to the static destination
Assuming MDB is active and accepting requests, depending upon the vendor, you could accept the message in a second or couple of seconds - depends upon the complexity of the workflow you are dealing with on the Facade side
Clean up
Worker thread will close the session and the receiver passes in, but not the connection
Close the Connection as soon as you are done with all the threads
IMPORTANT NOTE: YOU CAN'T PASS IN THE SAME SESSION TO MORE THAN ONE THREAD, EVEN THOUGH YOU CAN PASS IN THE SAME CONNECTION OBJECT, OTHERWISE, YOU GET javax.jms.IllegalStateException SINCE JMS SESSIONS ARE NOT MULTITHREADED AS DESCRIBED IN THE ARTICLE
/** We start off be initiating a JNDI look up for Queue Connection Factory*/
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, VendorSpecificContextFactory);
env.put(Context.PROVIDER_URL, JMS_URL);
env.put(Context.SECURITY_PRINCIPAL, USERNAME );
env.put(Context.SECURITY_CREDENTIALS, PASSWORD);
Context context = new InitialContext(env);
System.out.println("Go the Initial Context");
/** Access the resources by looking up the JNDI tree */
/** Get the Connection Factory reference*/
QueueConnectionFactory qcf = (QueueConnectionFactory)context.lookup(QCFNAME);
System.out.println("Go the QCF");
/** Get the connection to JMS Server */
javax.jms.QueueConnection qConnection = (javax.jms.QueueConnection)qcf.createQueueConnection(USERNAME, PASSWORD);
/** Get the reference to the static destination that MDB is listening on */
javax.jms.Queue staticQueue = (javax.jms.Queue)context.lookup(STATIC_QUEUE_NAME);
/** Create a Session using Connection Object */
javax.jms.Session qSession = (QueueSession)qConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
/** Create the Temporary Destinations using Session Object */
javax.jms.Queue replyTempQueue = qSession.createTemporaryQueue();
System.out.println("Temporary Queue Name:" + replyTempQueue);
/** Create a Message - in our example a text message */
TextMessage message= qSession.createTextMessage();
message.setText("sampletext"); // in most cases it is an xml
/**Optional - set message properties - if any for example user, department*/
message.setStringProperty(USER, USER);
message.setStringProperty(DEPT, DEPT);
/** Set the temporary queue name as a message header - JMSReplyTo*/
message.setJMSReplyTo(replyTempQueue); //mandatory in the context of temporary destinations
/** Optional - vendor specific - set the message expiration*/
message.setJMSExpiration(30000); //message will expire after 30 seconds
/**Assuming MDB is active and accepting requests, depending upon the vendor,
* you could accept the message in a second or couple of seconds - depends upon
* the complexity of the workflow you are dealing with on the Facade side*/
/** Create Receiver - We are now ready to listen on the temporary Queue*/
QueueReceiver receiver= ((QueueSession)qSession).createReceiver(replyTempQueue);
/** Create worker thread that makes the blocking call*/
RequestHelper worker = new RequestHelper(qSession, qConnection,replyTempQueue,receiver );
worker.start();
/** At this point a new thread is spawned and waiting for the reply*/
/** Create a sender to the static queue using the session*/
QueueSender sender= ((QueueSession)qSession).createSender(staticQueue);
/** send the message*/
sender.send(message);
System.out.println("Message Send to Static Queue:" + staticQueue.getQueueName());
public class RequestHelper extends Thread
{
private Connection conn;
private MessageConsumer receiver;
private Session qSession;
private Queue tempQueue; /** Used only for logging purposes*/
private String threadId; /** Used only for logging purposes*/
public RequestHelper(Session session, Connection conn, Queue tmpQue, QueueReceiver queueReceiver) throws JMSException
{
this.receiver = queueReceiver;
this.conn = conn;
this.qSession = session;
this.threadId = getName();
this.tempQueue = tmpQue;
System.out.println("Temporay QueueName:" + threadId + ":" + tmpQue.getQueueName());
}
public void run()
{
try
{
System.out.println(threadId + ":Calling Connection.start");
conn.start();
System.out.println(threadId + ":Connection.Start Called");
System.out.println(threadId + ":Delivery of Messages Started");
String receivedText = null;
System.out.println(threadId + ":Waiting for Messages on Receiver");
Message messageM = receiver.receive();
System.out.println(threadId + ":Processing of Response started, Unpacking TextMessage");
TextMessage textMessage = (TextMessage)messageM;
receivedText = textMessage.getText();
System.out.println (threadId + ":Successfully Completed Processing of Response" + threadId + ":" + receivedText );
receiver.close();
qSession.close();
}catch ( JMSException exp)
{
exp.printStackTrace();
}
}
}
|