How to match MQ Server reply messages to the correct request

开发者 https://www.devze.com 2023-02-12 00:57 Source: Internet
I'm connecting to IBM WebSphere MQ and want to match each reply message with the request message that produced it. I've trawled through hundreds of pages looking for how to do this and have had no luck.

I have a class - MQHandler - which sends a message to one defined queue and reads the reply from another. This works fine; however, if multiple users use the application at the same time, the messages get mixed up.

I can't find a method on the receiver that lets me specify the correlation ID to match. Something like...

consumer.receive( selector );

Can you check the below methods to ensure I'm doing this correctly?

/**
 * When the class is called, this initialisation is done first.
 * 
 * @throws JMSException
 */
public void init() throws JMSException
{
    // Create a connection factory
    JmsFactoryFactory ff;
    try
    {
        ff = JmsFactoryFactory.getInstance( WMQConstants.WMQ_PROVIDER );
        cf = ff.createConnectionFactory();

        // Set the properties
        cf.setStringProperty( WMQConstants.WMQ_HOST_NAME, hostServer );
        cf.setIntProperty( WMQConstants.WMQ_PORT, 1414 );
        cf.setStringProperty( WMQConstants.WMQ_CHANNEL, channel );
        cf.setIntProperty( WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT );
        cf.setStringProperty( WMQConstants.WMQ_QUEUE_MANAGER, qManager );

        connection = cf.createConnection();

        session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );
    }
    catch( JMSException e )
    {
        throw e;
    }

} // end of init



/**
 * @param request
 * @return
 * @throws JMSException
 */
private String sendRequest( String request ) throws JMSException
{

    // Create JMS objects
    Destination destination = session.createQueue( "queue:///" + writeQueueName );

    // Enable write of MQMD fields. See documentation for further
    // details.
    ((JmsDestination) destination).setBooleanProperty( WMQConstants.WMQ_MQMD_WRITE_ENABLED, true );

    // Set message context, if needed. See comment at the top.

    // Create a producer
    MessageProducer producer = session.createProducer( destination );

    // Create a message
    TextMessage message = session.createTextMessage( request );

    // Generate a custom correlation ID
    message.setJMSCorrelationID( generateRandomID() );

    // Start the connection
    connection.start();

    // And, send the message
    producer.send( message );
    System.out.println(message);

    return message.getJMSCorrelationID();
}


/**
 * @param customMessageId
 * @return
 * @throws JMSException
 */
private String recvResponse( String customMessageId ) throws JMSException
{
    Destination destination = session.createQueue( "queue:///" + readQueueName );

    // Enable read of MQMD fields.
    ((JmsDestination) destination).setBooleanProperty( WMQConstants.WMQ_MQMD_READ_ENABLED, true );
    ((JmsDestination) destination).setObjectProperty( WMQConstants.JMS_IBM_MQMD_CORRELID, customMessageId );

    // Create a consumer
    MessageConsumer consumer = session.createConsumer( destination );

    // Start the connection
    connection.start();

    // And, receive a message from the queue
    TextMessage receivedMessage = (TextMessage)consumer.receive( 15000 );

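    // Close the session before the connection that owns it
    session.close();
    connection.close();

    // receive() returns null if no message arrived within the timeout
    return receivedMessage == null ? null : receivedMessage.getText();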
}

Here is a snippet of the main method...

    try
    {
        String customMessageId;
        init();
        customMessageId = sendRequest( request );
        return recvResponse( customMessageId );
    }
    catch( Exception ex )
    {
        System.out.println( "Error on MQ." );
        throw new Exception( "\n\n*** An error occurred ***\n\n" + ex.getLocalizedMessage()
                             + "\n\n**********************************" );
    }


QueueReceiver queueReceiver = 
    session.createReceiver(destination, "JMSCorrelationID='customMessageId'");

TextMessage receivedMessage = (TextMessage)queueReceiver.receive( 15000 );

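In my example, customMessageId inside the selector string is a placeholder: substitute the actual correlation ID value you previously set on the request. The selector compares against the quoted string literal, not against the Java variable of that name.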
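
As a hedged illustration of that point, the selector has to be built from the variable's value rather than containing the variable's name. Below is a minimal sketch in plain Java. The class name CorrelationSelector and the sample ID are made up for illustration. Doubling embedded single quotes follows the JMS selector syntax for string literals.

```java
/** Hypothetical helper; not part of JMS or the IBM MQ classes. */
final class CorrelationSelector {
    private CorrelationSelector() {}

    /** Builds a JMS message selector that matches exactly one correlation ID. */
    static String forCorrelationId(String correlationId) {
        // Selector string literals are single-quoted; a quote inside the
        // literal is escaped by doubling it.
        String escaped = correlationId.replace("'", "''");
        return "JMSCorrelationID = '" + escaped + "'";
    }

    public static void main(String[] args) {
        String customMessageId = "REQ-12345"; // sample value, an assumption
        // prints: JMSCorrelationID = 'REQ-12345'
        System.out.println(forCorrelationId(customMessageId));
    }
}
```

The resulting string could then be passed as the selector argument, for example to session.createConsumer( destination, selector ) or to the createReceiver call shown above.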

Also, I have seen many cases where people generate a correlation ID and set it on the outbound message, expecting to be able to select the response based on that value. The textbook way to do this is for the service provider app to copy the request's message ID into the correlation ID of its response. The requestor still uses JMSCorrelationID in the selector, but with the original JMSMessageID as the value. Since the JMSMessageID is guaranteed to be unique even across queue managers, you are much less likely to get collisions on this value. You will need to ensure that your client matches the behavior of the service provider with respect to which value gets copied into the correlation ID.
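
To make the pattern concrete, here is a small plain-Java simulation of it, with no JMS provider involved. SimMessage, CorrelationDemo, the sample IDs, and the in-memory list standing in for the shared reply queue are all hypothetical and only for illustration. A real application would rely on JMSMessageID, JMSCorrelationID, and a selector instead.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Plain-Java stand-in for a message; not a JMS class. */
final class SimMessage {
    final String messageId;     // plays the role of JMSMessageID
    final String correlationId; // null on requests, set on replies
    final String body;

    SimMessage(String messageId, String correlationId, String body) {
        this.messageId = messageId;
        this.correlationId = correlationId;
        this.body = body;
    }
}

final class CorrelationDemo {
    /** Service side: copy the request's message ID into the reply's correlation ID. */
    static SimMessage reply(SimMessage request) {
        return new SimMessage("ID:reply-of-" + request.messageId,
                              request.messageId,
                              "reply to " + request.body);
    }

    /** Requester side: remove and return the first reply with a matching correlation ID. */
    static SimMessage receive(List<SimMessage> queue, String wantedCorrelationId) {
        Iterator<SimMessage> it = queue.iterator();
        while (it.hasNext()) {
            SimMessage m = it.next();
            if (wantedCorrelationId.equals(m.correlationId)) {
                it.remove();
                return m;
            }
        }
        return null; // nothing matched, similar to a receive that times out
    }

    public static void main(String[] args) {
        List<SimMessage> replyQueue = new ArrayList<>(); // shared by all requesters
        SimMessage alice = new SimMessage("ID:1", null, "alice");
        SimMessage bob = new SimMessage("ID:2", null, "bob");
        replyQueue.add(reply(bob));   // replies arrive in a different order
        replyQueue.add(reply(alice));
        // prints: reply to alice
        System.out.println(receive(replyQueue, alice.messageId).body);
    }
}
```

Bob's reply stays on the queue untouched, which is the behavior a JMSCorrelationID selector is meant to give each concurrent user.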


This could be a use case for a temporary queue, which is only associated with the connection that creates it.

There is a detailed article at ONJava, "Designing Messaging Applications with Temporary Queues":

“Temporary destinations (temporary queues or temporary topics) are proposed as a lightweight alternative in a scalable system architecture that could be used as unique destinations for replies. Such destinations have a scope limited to the connection that created it, and are removed on the server side as soon as the connection is closed.”

and the Java documentation explains:

You can use temporary destinations to implement a simple request/reply mechanism. If you create a temporary destination and specify it as the value of the JMSReplyTo message header field when you send a message, the consumer of the message can use the value of the JMSReplyTo field as the destination to which it sends a reply and can also reference the original request by setting the JMSCorrelationID header field of the reply message to the value of the JMSMessageID header field of the request.
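
The reply-to idea can be sketched without a broker. In the plain-Java simulation below, each requester owns a private in-memory queue that plays the role of a temporary queue, and the request carries it the way JMSReplyTo would. TempQueueDemo and Request are hypothetical names. In real JMS code the corresponding calls would be Session.createTemporaryQueue(), Message.setJMSReplyTo(), and Message.getJMSReplyTo().

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class TempQueueDemo {
    /** Plain-Java stand-in for a request that names its reply destination. */
    static final class Request {
        final String body;
        final Queue<String> replyTo; // plays the role of JMSReplyTo

        Request(String body, Queue<String> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    /** Service side: send the reply to whichever queue the request names. */
    static void serve(Request request) {
        request.replyTo.add("reply to " + request.body);
    }

    public static void main(String[] args) {
        // One private reply queue per requester, like one temporary queue
        // per connection.
        Queue<String> aliceReplies = new ArrayDeque<>();
        Queue<String> bobReplies = new ArrayDeque<>();

        serve(new Request("bob", bobReplies));
        serve(new Request("alice", aliceReplies));

        System.out.println(aliceReplies.poll()); // prints: reply to alice
        System.out.println(bobReplies.poll());   // prints: reply to bob
    }
}
```

Because no other requester reads from the private queue, replies cannot be mixed up even without a selector. Setting the correlation ID as well, as the quoted documentation suggests, still helps when one requester has several requests in flight.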
