lunes, 1 de agosto de 2011

MDB - Messaging (Message Driven Beans) - JMS API

Hasta ahora, en los los tutoriales vistos anteriormente, los ejemplos han sido de forma sincrónica. Es decir, un cliente invoca a un bean de sesión (session bean) que realiza alguna operación, mientras que el cliente espera. Cuando el bean de sesión completa su operación (u operaciones), el control se devuelve al cliente y el cliente continúa con la resto de las operaciones. 

Para algunas aplicaciones, trabajar de modo asincrónico es más apropiado, sobre todo en aplicaciones B2B (business to business) donde necesitamos desacoplar ambos sistemas. Es decir, un cliente puede enviar un mensaje solicitando una operación. El procesamiento de dicha operación puede tardar varios minutos. Una vez que el mensaje ha sido enviado, el cliente continúa con sus operaciones sin necesidad de esperar que el pedido haya sido procesado. Ademas, el cliente tendrá que ser notificado de alguna manera que el mensaje ha sido recibido y por otra parte que el pedido solicitado ha sido procesado.


■ Terminología

1.- Message Producer: Es el cliente que envía el mensaje (productor del mensaje).
2.- Message Consumer: El quien recibe el mensaje (consumidor del mensaje).
3.- Message Driven Beans: Es el 3er tipo de EJB (como también lo son los EJBs de tipo Entidad o los EJBs de tipo Session Beans). Estos EJBs son consumidores de mensajes asincrónicos. De aqui en adelante, a estos tipos de EJBs lo llamaremos MDB.
4.- JMS API: El API para el Servicio de Mensajera de Java. Típicamente un MDB esta basado en el API JMS, sin embargo, los MDBs pueden ser implementados basandose en otras APIs. En este tutorial solamente vamos a cubrir los MDBs basados en el API JMS (JMS-based MDBs).

JMS API (Java Message Service)

Es un conjunto de clases e interfaces que soportan dos tipos bien definidos de mensajería
    • Point-to-point: Este modelo permite que el productor del mensaje envíe un mensaje a una cola (queue). Dicho mensaje es leido una sola vez por el consumidor. Una vez que es leido, dicho mensaje es eliminado de la cola.
    • Publish/Subscribe: Este modelo permite que el productor del mensaje envíe un mensaje a un topic. Varios consumidores deberán suscribirse o registrarse al topic para poder leer el mensaje. Si el productor no se suscribió en el topic antes que el mensaje fuera enviado, dicho productor no podrá leer dicho mensaje.


■ MDBs (Message Driver Bean) basados en el API JMS

De la misma manera que los Session Beans, también necesitamos que los MDBs tengan las ventajas ofrecidas por el EJB-Container (seguridad, transaccionabilidad, concurrencia, escalabilidad, etc). Un SessionBean no puede ser un consumidor de mensajes ya que no es asincrónico (los métodos son invocados de forma sincrónica por el cliente), sin embargo, un SessionBean podrá ser un productor de mensaje como se vera mas adelante.

Los JMS-Based MDBs implementaran la interface javax.jms.MessageListener la cual obligara a implementar el metodo onMessage(). Este método sera ejecutado ni bien el MDB reciba el mensaje.
Los MDBs son Stateless como los SessionBeans y podrán llamar a otros SessionBeans e interactuar con Entities y el EntityManager.


Productor/Consumidor JMS y Clases Utiles

Los pasos son:
  • Conseguir un objeto ConnectionFactory a través de JNDI. Obtain a javax.jms.ConnectionFactory using a JNDI lookup.  (The default ConnectionFactory names are different depending on the JMS provider.)
  • Conseguir un destino, mediante el objeto Destination a través de JNDI. Los destinos podrán ser o bien un Queue o bien un Topic.
  • Usar ConnectionFactory para conseguir un objeto Connection. Obtain a javax.jms.Connection from the ConnectionFactory.
  • Usar Destination para crear un objeto Session. Obtain a javax.jms.Session from the Connection.
  • Crear el Sender o el Producer (son sinonimos) o bien el Consumer o Receiver (son sinonimos) mediante el objeto Session. Create a javax.jms.MessageProducer or javax.jms.MessageConsumer from the Session.
  • Crear el Message (en este caso crearemos el objeto ObjectMessage) mediante el objeto Session
  • Utilizar el Sender o Producer (o bien el Consumer o Receiver) para enviar el send (o bien para recibir el mensaje). For a MessageProducer, send the message, or for a MessageConsumer, receive the message (synchronous) or set the message listener (asynchronous).
  • Start the Connection to start message delivery
  • Ultimately close the Connection.

    La siguiente clase permite via JNDI conseguir el objeto ConnectionFactory
      • javax.naming.InitialContext
    Context jndiContext = new InitialContext();

    QueueConnectionFactory factory = (QueueConnectionFactory) jndiContext .lookup("java:/JmsXA");

    La siguientes clases se usan para crear una conexión al proveedor del sistema de mensajes. Estas clases son similares a un datasource en JDBC. Proveen una conexión JMS para rutear el mensaje a enviar:
      • javax.jms.ConnectionFactory 
      • javax.jms.QueueConnectionFactory (extiende de la clase anterior)
    QueueConnection connect = factory.createQueueConnection();
      • javax.jms.Connection
      • javax.jms.QueueConnection (extiende de la clase anterior)
    QueueSession session = connect.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

    Las siguientes clases son utilizadas para crear el sender/producer que terminara enviando el mensaje a la cola (para esto se procede a encontrar via JNDI la cola:
      • javax.jms.Destination 
      • javax.jms.Queue (extiende de la interface anterior)
      • javax.jms.TemporaryQueue (extiende de la clase anterior)
    Queue queue = (Queue) jndiContext.lookup("queue/AxesoCashMBQueue");
      • javax.jms.Session
      • javax.jms.QueueSession (extiende de la clase anterior)
      • javax.jms.MessageProducer
      • javax.jms.QueueSender  (extiende de la clase anterior)
    QueueSender sender = session.createSender(queue);

    Por ultimo, se procede a enviar el mensaje. Algo que también es interesante en este ejemplo es que al método send() se le ha dado un parámetro del tipo Object y en el mensaje se setea este objeto, una utilidad podría ser el envío de adjuntos en un mensaje de texto o el envío de cualquier clase de objeto para que luego sea procesado por la lógica de negocio. Los tipos de mensajes que se pueden enviar son:
      • javax.jms.ObjectMessage (cualquier pojo que implementa la interface Serializable)
      • javax.jms.TextMessage
      • javax.jms.BytesMessage
      • javax.jms.StreamMessage
      • javax.jms.MapMessage
      • javax.jms.Message (todos los anteriores extienden de esta interface)
    Parametro p = new Parametro();
    ObjectMessage message = session.createObjectMessage();
    message.setObject(p);

    try {
        sender.send(message);
    } catch (Exception e) {
        e.printStackTrace();
    }
    connect.start();

    Las siguientes clases se utlizaran para recibir un mensaje de form asincrónica.
    • javax.jms.MessageConsumer
    • javax.jms.QueueReceiver (extiende de la clase anterior)


    Session

    A Session is a single-threaded context for sending and receiving messages. Messages are produced and/or consumed in serial; to increase salability, use an additional Session per thread. If a program wants to both produce and consume messages, one Session must be used for each task.


    Transactions

    A Session can be optional transacted, where messages consumed and/or produced within a transaction form a single unit of work. Upon a commit, all messages received are acknowledged and all produced messages are sent. Alternately, if a transaction is rolledback, all sent messages are destroyed and all received messages are recovered.


    Acknowledgment

    There are three modes for non-transacted Sessions:
     
    Acknowledgment Mode Description
    DUPS_OK_ACKNOWLEDGE Session lazily acknowledges message delivery; minimizes Session overhead.
    AUTO_ACKNOWLEDGE Session automatically acknowledges receipt of message after consumer finishes processing.
    CLIENT_ACKNOWLEDGE Client calls Session.acknowledge() to acknowledge receipt of all messages received during the session.

    Ver: http://www.thirdeyeconsulting.com/indyjug/jms/jms.html


    ■ Ejemplo de envio de un mensaje a una cola de forma asincrónica y esperando la respuesta de forma sincrónica (podría ser Session Bean o un simple POJO)

                  //Voy a configurar las colas y los tiempos de espera para enviar al MB
    QueueConnection connect = null;
    QueueConnection responseConnect = null;
    try {
    Context jndiContext = new InitialContext();

    QueueConnectionFactory factory = (QueueConnectionFactory)
                            jndiContext.lookup("java:/JmsXA");

    // Creamos la cola donde enviaremos el mensaje de forma asincrónica
    connect = factory.createQueueConnection();
    QueueSession session = connect.createQueueSession(false,
                                                   Session.CLIENT_ACKNOWLEDGE);
    Queue consGenQueue = (Queue) jndiContext.lookup("queue/AxesoCashMBQueue");
    QueueSender sender = session.createSender(consGenQueue);
    connect.start();

    // Creamos una cola temporal donde esperamos la respuesta de forma sincrónica del MDB
    responseConnect = factory.createQueueConnection();
    QueueSession responseSession = responseConnect.createQueueSession(false, 
                                                           Session.AUTO_ACKNOWLEDGE);
    TemporaryQueue responseQueue = responseSession.createTemporaryQueue();
    QueueReceiver queueReceiver = responseSession.createReceiver(responseQueue);
    responseConnect.start();

    // creo el mensaje
    ObjectMessage message = session.createObjectMessage();
    message.setObject(parametros);
    message.setJMSReplyTo(responseQueue);
    try {
    sender.send(message);
    } catch (Exception e) {
    e.printStackTrace();
    }
    timeOut = timeOut - 2000;


    log.info("Voy a esperar la respuesta del MB durante" + timeOut / 1000
    + " segundos");


    ObjectMessage objectMessage = (ObjectMessage) queueReceiver.receive(timeOut);


                            // Si la respuesta tardo menos que el timeout
    if (objectMessage != null) {
    log.info("El MB respondio dentro del tiempo estimado");
    Parametros param = (Parametros) objectMessage.getObject();
                            }
                      } catch (NamingException e) {
    e.printStackTrace();
    throw new TxFailException(RegistroErroresFE.TXERR_CONFIG_NO_VALIDA);
     } catch (JMSException e) {
    e.printStackTrace();
    throw new TxFailException(RegistroErroresFE.TXERR_CONFIG_NO_VALIDA);
     } finally {
    try {
    if (sender != null) {
    sender.close();
    }
    if (connect != null) {
    connect.close();
    }
    if (responseConnect != null) {
    responseConnect.close();
    }
        } catch (JMSException jmse) {
                  jmse.printStackTrace();
           }
    }   

    ■ Ejemplo de recepción de un mensaje de forma asincronica (es un MDB)


    @MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/AxesoCashMBQueue"),
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge") })
    @TransactionManagement(TransactionManagementType.BEAN)
    public class AxesoCashMB implements MessageListener {


    @Resource(mappedName = "java:/JmsXA")
    private QueueConnectionFactory qcf;
    private Logger logger = Logger.getLogger(AxesoCashMB.class);
    private MensajeAxesoCash message = null;
    private Parametros parametros;

    public static String IDMAYORISTA = "IDMAYORISTA";
    public static String IDPUNTOVENTA = "IDPUNTOVENTA";


    private AxesoCashDAO dao = null;
    /**
    * metodo onMessage

    * Aca llega el mensaje del sessionBean o del simple pojo
    * @param m Message
    */
    public void onMessage(Message m) {


    QueueConnection conn = null;
    QueueSession session = null;
    QueueSender sender = null;
    try {
    m.acknowledge();
    ObjectMessage tm = (ObjectMessage) m;


    logger.info("LLego el mensaje al MB");
                     ....
                     .....  
    }

    ■ Bibliografia


    1 comentario:

    1. Estimado,
      se que esta publicación es antigua, pero es lo que necesito y de manera urgente, en lo posible me podrias enviar las clases a mi correo electronico (ipalm001@gmail.com)
      gracias por el blog es muy bueno

      ResponderEliminar