IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)
Développons en Java v 2.20   Copyright (C) 1999-2021 Jean-Michel DOUDOUX.   
[ Précédent ] [ Sommaire ] [ Suivant ] [ Télécharger ]      [ Accueil ] [ Commentez ]


 

65. JMS (Java Message Service)

 

chapitre 6 5

 

Niveau : niveau 4 Supérieur 

 

JMS, acronyme de Java Message Service, est une API pour permettre un dialogue standard entre des applications ou des composants grâce àdes brokers de messages ou MOM (Middleware Oriented Messages). Elle permet donc d'utiliser des services de messaging dans des applications Java comme le fait l'API JDBC pour les bases de données.

La page officielle de JMS est à l'URL : http://www.oracle.com/technetwork/java/jms/.

Ce chapitre contient plusieurs sections :

 

65.1. La présentation de JMS

JMS a été intégré à la plate-forme J2EE à partir de la version 1.3. Il n'existe pas d'implémentation officielle de cette API avant cette version. JMS est utilisable avec les versions antérieures mais elle oblige à utiliser un outil externe qui implémente l'API.

Chaque fournisseur (provider) doit fournir une implémentation de ses spécifications. Il existe un certain nombre d'outils qui implémentent JMS dont la majorité sont des produits commerciaux.

Dans la version 1.3 du J2EE, JMS peut être utilisé dans un composant web ou un EJB, un type d'EJB particulier a été ajouté pour traiter les messages et des échanges JMS peuvent être intégrés dans une transaction gérée avec JTA (Java Transaction API).

JMS définit plusieurs entités :

  • Un provider JMS : outil qui implémente l'API JMS pour échanger les messages : ce sont les brokers de messages
  • Un client JMS : composant écrit en java qui utilise JMS pour émettre et/ou recevoir des messages.
  • Un message : données échangées entre les composants

Différents objets utilisés avec JMS sont généralement stockés dans l'annuaire JNDI du serveur d'applications ou du provider du MOM :

  • La fabrique de connexions (ConnectionFactory)
  • Les destinations à utiliser (Queue et Topic)

JMS définit deux modes pour la diffusion des messages :

  • Point à point (Point to point) : dans ce mode un message est envoyé par un producteur et est reçu par un unique consommateur. Le support utilisé pour la mise en oeuvre de ce mode est la file (queue). Le message émis est stocké dans la file jusqu'à ce que le consommateur le lise et envoie une notification de réception du message. A ce moment là le message est supprimé de la file. Le message a généralement une date d'expiration.
  • Publication / souscription (publish/subscribe) : dans ce mode un message est envoyé par un producteur et est reçu par un ou plusieurs consommateurs. Le support utilisé pour la mise en oeuvre de ce mode est le sujet (topic). Chaque consommateur doit s'abonner à un sujet (souscription). Seuls les messages émis à partir de cet abonnement sont accessibles par le consommateur.

Dans la version 1.0 de JMS, ces modes utilisent  des interfaces distinctes.
Dans la version 1.1 de JMS, ces interfaces sont toujours utilisables mais il est aussi possible d'utiliser des interfaces communes à ces modes ce qui les rend interchangeables.

Les messages sont asynchrones mais JMS définit deux modes pour consommer un message :

  • Mode synchrone : ce mode nécessite l'appel de la méthode receive() ou d'une de ses surcharges. Dans ce cas, l'application est arrêtée jusqu'à l'arrivée du message. Une version surchargée de cette méthode permet de rendre la main après un certain timeout.
  • Mode asynchrone : il faut définir un listener qui va lancer un thread attendant les messages et exécutant une méthode à leur arrivée.

JMS propose un support pour différents types de messages : texte brut, flux d'octets, objets Java sérialisés, ...

 

65.2. Les services de messages

Les brokers de messages ou MOM (Middleware Oriented Message) permettent d'assurer l'échange de messages entre deux composants nommés clients. Ces échanges peuvent se faire dans un contexte interne (pour l'EAI) ou un contexte externe (pour le B2B).

Les deux clients n'échangent pas directement des messages : un client envoie un message et le client destinataire doit demander la réception du message. Le transfert du message et sa persistance sont assurés par le broker.

Les échanges de message sont :

  • asynchrones :
  • fiables : les messages ne sont délivrés qu'une et une seule fois

Les MOM représentent le seul moyen d'effectuer un échange de messages asynchrones. Ils peuvent aussi être très pratiques pour l'échange synchrone de messages plutôt que d'utiliser d'autres mécanismes plus compliqués à mettre en oeuvre (sockets, RMI, CORBA ... ).

Les brokers de messages peuvent fonctionner selon deux modes :

  • le mode point à point (point to point)
  • le mode publication/abonnement (publish/subscribe)

Le mode point à point (point to point) repose sur le concept de files d'attente (queues). Le message est stocké dans une file d'attente puis il est lu dans cette file ou dans une autre. Le transfert du message d'une file à l'autre est réalisé par le broker de message.

Chaque message est envoyé dans une seule file d'attente. Il y reste jusqu'à ce qu'il soit consommé par un client et un seul. Le client peut le consommer ultérieurement : la persistance est assurée par le broker de message.

Le mode publication/abonnement repose sur le concept de sujets (Topics). Plusieurs clients peuvent envoyer des messages dans ce topic. Le broker de message assure l'acheminement de ce message à chaque client qui se sera préalablement abonné à ce topic. Le message possède donc potentiellement plusieurs destinataires. L'émetteur du message ne connaît pas les destinataires qui se sont abonnés.

 

65.3. Le package javax.jms

Ce package et ses sous-packages contiennent plusieurs interfaces qui définissent l'API.

  • Connection
  • Session
  • Message
  • MessageProducer
  • MessageListener

 

65.3.1. La fabrique de connexions

Un objet de type Factory produit une connexion permettant l'accès au broker de messages. Il faut fournir plusieurs paramètres à l'objet de type Factory.

Il existe deux types de fabriques : QueueConnectionFactory et TopicConnectionFactory selon le type d'échange que l'on fait. Ce sont des interfaces que le broker de messages doit implémenter pour fournir des objets.

Pour obtenir un objet de ce type, il faut soit instancier directement un tel objet soit faire appel à JNDI pour l'obtenir. Cette dernière solution est préférable car elle est plus portable.

La fabrique de type ConnectionFactory permet d'obtenir une instance de l'interface Connection. Cette instance est du type de l'implémentation fournie par le provider, ce qui permet de proposer une manière unique d'obtenir une instance de chaque implémentation.

Chaque provider fournit sa propre solution pour gérer les objets contenus dans l'annuaire JNDI.

 

65.3.2. L'interface Connection

Cette interface définit des méthodes pour la connexion au broker de messages.

Cette connexion doit être établie en fonction du mode utilisé :

  • l'interface QueueConnection pour le mode point à point
  • l'interface TopicConnection pour le mode publication/abonnement

Pour obtenir l'un ou l'autre, il faut utiliser un objet de type Factory correspondant au type QueueConnectionFactory ou TopicConnectionFactory avec la méthode correspondante : createQueueConnection() ou createTopicConnection().

La classe qui implémente cette interface se charge du dialogue avec le broker de messages.

La méthode start() permet de démarrer la connexion.

Exemple :
            connection.start();

La méthode stop() permet de suspendre temporairement la connexion.

La méthode close() permet de fermer la connexion.

Remarque : il est important de fermer explicitement la connexion lorsqu'elle devient inutile en utilisant la méthode close().

 

65.3.3. L'interface Session

Elle représente un contexte transactionnel de réception et d'émission pour une connexion donnée.

C'est d'ailleurs à partir d'un objet de type Connection que l'on crée une ou plusieurs sessions.

La session est monothread : si l'application utilise plusieurs threads qui échangent des messages, il faut définir une session pour chaque thread.

C'est à partir d'un objet session que l'on crée des messages et des objets à envoyer et à recevoir.

Comme pour la connexion, la création d'un objet de type Session dépend du mode de fonctionnement. L'interface Session possède deux interfaces filles :

  • l'interface QueueSession pour le mode point à point
  • l'interface TopicSession pour le mode publication/abonnement

Pour obtenir l'un ou l'autre, il faut utiliser un objet Connection correspondant de type QueueConnection ou TopicConnection avec sa méthode associée : createQueueSession() ou createTopicSession().

Ces deux méthodes demandent deux paramètres : un booléen qui indique si la session gère une transaction et une constante qui précise le mode d'accusé de réception des messages.

Les messages sont considérés comme traités par le MOM à la réception d'un accusé de réception. Celui-ci est fourni au MOM selon le mode utilisé. Il existe trois modes d'accusés de réception (trois constantes sont définies dans l'interface Session) :

  • AUTO_ACKNOWLEDGE : l'accusé de réception est automatique, le MOM reçoit l'accusé de réception à la réception du message que ce dernier soit traité ou non par l'application
  • CLIENT_ACKNOWLEDGE : le MOM reçoit explicitement l'acquittement de la part de l'application, c'est le client qui envoie l'accusé grâce à l'appel de la méthode acknowledge() du message
  • DUPS_OK_ACKNOWLEDGE : ce mode permet d'indiquer au MOM qu'il peut envoyer plusieurs fois le message à une même destination. Ce mode peut améliorer les performances de certains MOM notamment avec un nombre de messages très important.

L'interface Session définit plusieurs méthodes dont les principales sont :

Méthode Rôle
void close() Fermer la session
void commit() Valider la transaction
XXX createXXX() Créer un Message dont le type est XXX
void rollback() Invalider la transaction

 

65.3.4. Les messages

Les messages sont encapsulés dans un objet de type javax.jms.Message : ils doivent obligatoirement implémenter l'interface Message ou l'une de ses sous-classes.

Un message est constitué de trois parties :

  • L'en-tête (header) : contient des données techniques
  • Les propriétés (properties)  : contient des données fonctionnelles
  • Le corps du message (body) : contient les données du message

L'interface Session propose plusieurs méthodes createXXXMessage() pour créer des messages contenant des données au format XXX.

Il existe aussi pour chaque format des interfaces filles de l'interface Message :

  • BytesMessage : message composé d'octets
  • MapMessage : message composé de paires clé/valeur
  • ObjectMessage : message contenant un objet sérialisé
  • StreamMessage : message issu d'un flux
  • TextMessage : message contenant du texte

 

65.3.4.1. L'en-tête

Cette partie du message est constituée d'un certain nombre de champs prédéfinis qui contiennent des données pour identifier et acheminer le message.

La plupart de ces données sont renseignées lors de l'appel à la méthode send() ou publish().

L'en-tête contient des données standardisées dont le nom commence par JMS (JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority, JMSMessageID, JMSTimestamp, JMSRedelivered,  JMSCorrelationID, JMSReplyTo et JMSType)

Les champs les plus importants sont :

Nom Rôle
JMSMessageID Identifiant unique du message
JMSDestination File d'attente ou topic destinataire du message
JMSCorrelationID Utilisé pour synchroniser de façon applicative deux messages de la forme requête/réponse. Dans ce cas, dans le message réponse, ce champ contient le messageID du message requête

Les propriétés contiennent des données fonctionnelles sous la forme de paires clé/valeur. Certaines propriétés peuvent aussi être positionnées par l'implémentation du provider. Leurs noms commencent par JMS_ suivis du nom du provider.

 

65.3.4.2. Les propriétés

Ce sont des champs supplémentaires : certains sont définis par JMS mais il est possible d'ajouter ses propres champs.

Cette partie du message est optionnelle. Les propriétés permettent de définir des champs qui seront utilisées pour fournir des données supplémentaires ou pour filtrer le message.

 

65.3.4.3. Le corps du message

Il contient les données du message et est formaté selon son type.

Cette partie du message est optionnelle. Les messages peuvent être de plusieurs types, définis dans les interfaces suivantes :

type Interface Rôle
bytes BytesMessage échange d'octets
texte TextMessage échange de données texte (XML par exemple)
object ObjectMessage échange d'objets Java qui doivent être sérialisables
Map MapMessage échange de données sous la forme clé/valeur. La clé doit être une chaîne de caractères et la valeur de type primitive
Stream StreamMessage échange de données en provenance d'un flux

Il est possible de définir son propre type qui doit obligatoirement implémenter l'interface Message.

C'est un objet de type Session qui contient les méthodes nécessaires à la création d'un message selon son type.

Lors de la réception d'un message, celui-ci est toujours de type Message : il faut effectuer un transtypage en fonction de son type en utilisant l'opérateur instanceof. A ce moment, il faut utiliser le getter correspondant pour obtenir les données.

Exemple :
   Message message = ...

   if (message instanceof TextMessage) {
      TextMessage textMessage = (TextMessage) message;
      System.out.println("message: " + textMessage.getText());
   }

 

65.3.5. L'envoi de messages

L'interface MessageProducer est la super-interface des interfaces qui définissent des méthodes pour l'envoi de messages.

Il existe deux interfaces filles selon le mode de fonctionnement pour envoyer un message : QueueSender et TopicPublisher.

Ces objets sont créés à partir d'un objet représentant la session :

  • la méthode createSender() pour obtenir un objet de type QueueSender
  • la méthode createPublisher() pour obtenir un objet de type TopicPublisher

Ces objets peuvent être liés à une entité physique par exemple une file d'attente particulière pour un objet de type QueueSender. Si ce n'est pas le cas, cette entité devra être précisée lors de l'envoi du message en utilisant une version surchargée de la méthode chargée de l'émission du message.

 

65.3.6. La réception de messages

L'interface MessageConsumer est la super-interface des interfaces qui définissent des méthodes pour la réception de messages.

Il existe des interfaces selon le mode de fonctionnement pour recevoir un message QueueReceiver et TopicSubscriber.

La réception d'un message peut se faire avec deux modes :

  • synchrone : dans ce cas, l'attente d'un message bloque l'exécution du reste du code
  • asynchrone : dans ce cas, un thread est lancé qui attend le message et appelle une méthode (callback) à son arrivée. L'exécution de l'application n'est pas bloquée.

L'interface MessageConsumer définit plusieurs méthodes dont les principales sont :

Méthode Rôle
close() Fermer l'objet qui reçoit les messages pour le rendre inactif
Message receive() Attendre et retourner le message à son arrivée
Message receive(long) Attendre durant le nombre de millisecondes précisé en paramètre et renvoyer le message s'il arrive durant ce laps de temps
Message receiveNoWait() Retourner le prochain message s'il y en a un d'immédiatement disponible
setMessageListener(MessageListener) Associer un Listener pour traiter les messages de façon asynchrone

Pour obtenir un objet qui implémente l'interface QueueReceiver, il faut utiliser la méthode createReceiver() d'un objet de type QueueSession.

Pour obtenir un objet qui implémente l'interface TopicSubscriber, il faut utiliser la méthode createSubscriber() d'un objet de type TopicSession.

 

65.4. L'utilisation du mode point à point (queue)

 

65.4.1. La création d'une fabrique de connexions : QueueConnectionFactory

L'objet QueueConnectionFactory permet d'obtenir une QueueConnection pour se connecter au broker de messages.

Pour obtenir un objet de ce type, il faut soit instancier directement un tel objet soit faire appel à JNDI pour l'obtenir.

Exemple : avec MQSeries 
String qManager = ...
String hostName = ...
String channel = ...

MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setQueueManager(qManager);
factory.setHostName(hostName);
factory.setChannel(channel);
factory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);

Il est cependant préférable de faire appel à JNDI pour obtenir un objet de type QueueConnectionFactory. Une instance de cet objet est stockée dans un annuaire par le broker et il suffit de se connecter à cet annuaire avec JNDI pour obtenir l'instance de la fabrique.

 

65.4.2. L'interface QueueConnection

Cette interface hérite de l'interface Connection.

Pour obtenir un objet qui implémente cette interface, il faut utiliser un objet de type QueueConnectionFactory avec la méthode correspondante : createQueueConnection().

Exemple :
            QueueConnection connection = factory.createQueueConnection();
            connection.start();

L'interface QueueConnection définit plusieurs méthodes dont la principale est :

Méthode Rôle
QueueSession createQueueSession(boolean, int) Renvoyer un objet qui définit la session. Le booléen précise si la session gère une transaction. L'entier précise le mode d'accusé de réception.

 

65.4.3. La session : l'interface QueueSession

Elle hérite de l'interface Session.

Pour obtenir un objet qui implémente cette interface, il faut utiliser la méthode createQueueSession() d'un objet de type QueueConnection.

Exemple :
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

L'interface QueueSession définit plusieurs méthodes dont les principales sont :

Méthode Rôle
QueueReceiver createQueueReceiver(Queue) Renvoyer un objet qui définit une file d'attente de réception
QueueSender createQueueSender(Queue) Renvoyer un objet qui définit une file d'attente d'émission

 

65.4.4. L'interface Queue

Un objet qui implémente cette interface encapsule une file d'attente particulière.

Pour obtenir un objet qui implémente cette interface, il faut utiliser la méthode createQueue() d'un objet de type QueueSession.

Exemple avec MQseries :
   Queue fileEnvoi = session.createQueue(
	  "queue:///file.out?expiry=0&persistence=1&targetClient=1");

 

65.4.5. La création d'un message

Pour créer un message, il faut utiliser une méthode createXXXMessage() d'un objet QueueSession où XXX représente le type du message.

Exemple :
            String message = "bonjour";
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText(message);

 

65.4.6. L'envoi de messages : l'interface QueueSender

Cette interface hérite de l'interface MessageProducer.

Pour obtenir un objet qui implémente cette interface, il faut utiliser la méthode createQueueSender() d'un objet de type QueueSession.

Exemple :
           QueueSender queueSender = session.createSender(fileEnvoi);

Il est possible de fournir un objet de type Queue qui représente la file d'attente : dans ce cas, l'objet QueueSender est lié à cette file d'attente. Si l'on ne précise pas de file d'attente (null fourni en paramètre), il faudra obligatoirement utiliser une version surchargée de la méthode send() lors de l'envoi pour indiquer celle à utiliser.

Avec un objet de type QueueSender, la méthode send() permet l'envoi d'un message dans la file d'attente. Cette méthode possède plusieurs surcharges :

Méthode Rôle
void send(Message) Envoyer le message dans la file d'attente définit dans l'objet de type QueueSender
void send(Queue, Message) Envoyer le message dans la file d'attente fournie en paramètre

Exemple :
           queueSender.send(textMessage);

 

65.4.7. La réception de messages : l'interface QueueReceiver

Cette interface hérite de l'interface MessageConsumer.

Pour obtenir un objet qui implémente cette interface, il faut utiliser la méthode createQueueReceiver() à partir d'un objet de type QueueSession.

Exemple :
           QueueReceiver queueReceiver = session.createReceiver(fileReception);

Il est possible de fournir un objet de type Queue qui représente la file d'attente : dans ce cas, l'objet QueueReceiver est lié à cette file d'attente. Si l'on ne précise pas de file d'attente (null fourni en paramètre), dans ce cas, il faudra obligatoirement utiliser une version surchargée de la méthode receive() pour l'indiquer.

Cette interface ne définit qu'une seule méthode supplémentaire :

Méthode Rôle
Queue getQueue() Renvoyer la file d'attente associée à l'objet

La réception de messages peut se faire dans le mode synchrone ou asynchrone.

 

65.4.7.1. La réception dans le mode synchrone

Dans ce mode, le programme est interrompu jusqu'à l'arrivée d'un nouveau message. Il faut utiliser la méthode receive() héritée de l'interface MessageConsumer. Il existe plusieurs méthodes et surcharges de ces méthodes qui permettent de répondre à plusieurs utilisations :

  • receiveNoWait() : renvoie un message présent sans attendre
  • receive(long) : renvoie un message arrivé durant le délais fourni en paramètre
  • receive() : renvoie le message dès qu'il arrive
Exemple :
           Message message = null;
           message = queueReceiver.receive(10000);

 

65.4.7.2. La réception dans le mode asynchrone

Dans ce mode, le programme n'est pas interrompu mais un objet écouteur va être enregistré auprès de l'objet de type QueueReceiver. Cet objet qui implémente l'interface MessageListener va être utilisé comme gestionnaire d'événements lors de l'arrivée d'un nouveau message.

L'interface MessageListener ne définit qu'une seule méthode qui reçoit en paramètre le message : onMessage(). C'est cette méthode qui sera appelée lors de la réception d'un message.

 

65.4.7.3. La sélection de messages

Une version surchargée de la méthode createReceiver() d'un objet de type QueueSession permet de préciser dans ses paramètres une chaîne de caractères qui va servir de filtre sur les messages à recevoir.

Dans ce cas, le filtre est appliqué par le broker de message plutôt que par le programme.

Cette chaîne de caractères contient une expression qui doit avoir une syntaxe proche d'une condition SQL. Les critères de la sélection doivent porter sur des champs inclus dans l'en-tête ou dans les propriétés du message. Il n'est pas possible d'utiliser des données du corps du message pour effectuer le filtre.

Exemple : envoi d'un message requête et attente de sa réponse. Dans ce cas, le champ JMSCorrelationID du message réponse contient le JMSMessageID de la requête
            String messageEnvoi = "bonjour";
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText(messageEnvoi);
            queueSender.send(textMessage);
            
            int correlId = textMessage.getJMSMessageID();
            QueueReceiver queueReceiver = session.createReceiver(
              fileEnvoi, "JMSCorrelationID = '" + correlId +"'");
            Message message = null;
            message = queueReceiver.receive(10000);

 

65.5. L'utilisation du mode publication/abonnement (publish/subscribe)

 

65.5.1. La création d'une fabrique de connexions : TopicConnectionFactory

Un objet TopicConnectionFactory permet d'obtenir une TopicConnection pour se connecter au broker de messages.

Pour obtenir un objet de ce type, il faut soit instancier directement un tel objet soit faire appel à JNDI pour l'obtenir.

 

65.5.2. L'interface TopicConnection

Cette interface hérite de l'interface Connection.

Pour obtenir un objet qui implémente cette interface, il faut utiliser un objet factory correspondant de type TopicConnectionFactory avec la méthode associée : createTopicConnection().

Exemple :
            TopicConnection connection = factory.createTopicConnection();
            connection.start();

L'interface TopicConnection définit plusieurs méthodes dont la principale est :

Méthode Rôle
TopicSession createTopicSession(boolean, int) Renvoyer un objet qui définit la session. Le booléen précise si la session gère une transaction. L'entier précise le mode d'accusé de réception.

 

65.5.3. La session : l'interface TopicSession

Elle hérite de l'interface Session.

Pour obtenir un objet qui implémente cette interface, il faut utiliser la méthode createTopicSession() d'un objet connexion de type TopicConnection.

Exemple :
      TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);

L'interface TopicSession définit plusieurs méthodes dont les principales sont :

Méthode Rôle
TopicSubscriber createSubscriber(Topic) Renvoyer un objet qui permet la réception de messages dans un topic
TopicPublisher createPublisher(Topic) Renvoyer un objet qui permet l'envoi de messages dans un topic
Topic createTopic(String) Créer un topic correspondant à la désignation fournie en paramètre

 

65.5.4. L'interface Topic

Un objet qui implémente cette interface encapsule un sujet.

Pour obtenir un objet qui implémente cette interface, il faut utiliser la méthode createTopic() d'un objet de type TopicSession.

 

65.5.5. La création d'un message

Pour créer un message, il faut utiliser une méthode createXXXMessage() d'un objet TopicSession où XXX représente le type du message.

Exemple :
            String message = "bonjour";
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText(message);

 

65.5.6. L'émission de messages : l'interface TopicPublisher

Cette interface hérite de l'interface MessageProducer.

Avec un objet de type TopicPublisher, la méthode publish() permet l'envoi du message. Cette méthode possède plusieurs surcharges :

Méthode Rôle
void publish(Message) Envoyer le message dans le topic défini dans l'objet de type TopicPublisher
void publish(Topic, Message) Envoyer le message dans le topic fourni en paramètre

 

65.5.7. La réception de messages : l'interface TopicSubscriber

Cette interface hérite de l'interface MessageProducer.

Pour obtenir un objet qui implémente cette interface, il faut utiliser la méthode createSubscriber() à partir d'un objet de type TopicSession.

Exemple :
      TopicSubscriber topicSubscriber = session.createSubscriber(topic);

Il est possible de fournir un objet de type Topic qui représente le topic : dans ce cas, l'objet TopicSubscriber est lié à ce topic. Si l'on ne précise pas de topic (null fourni en paramètre), il faudra obligatoirement utiliser une version surchargée de la méthode receive() lors de l'envoi pour l'indiquer.

Cette interface ne définit qu'une seule méthode supplémentaire :

Méthode Rôle
Topic getTopic() Renvoyer le topic associé à l'objet

 

65.6. La gestion des erreurs

Les erreurs d'exécution liées à l'utilisation de JMS sont rapportées sous la forme d'exceptions. La plupart des méthodes des objets JMS peuvent lever une exception de type JMSException.

Lors de l'utilisation d'un message asynchrone, il est possible d'enregistrer un listener de type ExceptionListener. Une instance de ce listener redéfinit la méthode onException() qui attend en paramètre une instance de type JMSException

 

65.6.1. Les exceptions de JMS

Plusieurs exceptions sont définies par l'API JMS. La classe mère de toutes ces exceptions est la classe JMSException.

Les exceptions définies sont : IllegalStateException, InvalidClientIDException, InvalidDestinationException, InvalidSelectorException, JMSSecurityException, MessageEOFException, MessageFormatException, MessageNotReadableException, MessageNotWriteableException, ResourceAllocationException, TransactionInProgressException,TransactionRolledBackException

La méthode getErrorCode() permet d'obtenir le code erreur spécifique du produit sous la forme d'une chaîne de caractères.

 

65.6.2. L'interface ExceptionListener

Ce listener permet d'être informé des exceptions levées par le provider JMS (exemple : arrêt du serveur, problème réseau, ..).

Cette interface définit la méthode OnException() qui doit être implémentée pour contenir les traitements en cas d'erreur.

Exemple :
import javax.jms.ExceptionListener;
import javax.jms.JMSException;

public class MonExceptionListener implements ExceptionListener {

    public void onException(JMSException jmse) {
        jmse.printStackTrace(System.err);
    }
}

 

65.7. JMS 1.1

La version 1.1 de JMS propose une utilisation de l'API indépendamment du domaine utilisé et ainsi d'unifier l'API pour les modes d'utilisation point à point et publication/souscription. Avec cette version, l'utilisation d'un mode ou l'autre ne nécessite plus d'interfaces spécifiques au mode utilisé, ce qui rend l'API plus simple à utiliser.

JMS 1.1 contient toujours toutes les interfaces dépendantes du domaine utilisé mais propose aussi l'enrichissement des interfaces communes pour permettent leur utilisation indépendamment du domaine utilisé.

La version 1.0.2 de l'API définit trois familles d'interfaces : commune, Queue et Topic. Pour chaque mode, une interface spécifique est définie pour la fabrique, la connexion, la session, la production et la consommation de messages.

Interfaces communes
Interfaces point à point
Interfaces publication/souscription
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Session
QueueSession
TopicSession
Destination
Queue
Topic
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver
QueueBrowser
TopicSubscriber

JMS propose aussi 9 interfaces supplémentaires pour le support des transactions distribuées avec XA.

Avec JMS 1.1 il est possible de généraliser l'utilisation des interfaces communes que ce soit pour une utilisation dans le mode point à point ou publication/souscription. Ces interfaces communes ont été enrichies pour rendre les interfaces filles polymorphes. Par exemple, l'interface MessageProducer possède une méthode send() pour permettre à un client d'envoyer un message dans un mode ou un autre. Ainsi l'interface MessageProducer permet de réaliser les actions des interfaces QueueSender et TopicPublisher.

Le code devient donc plus simple, plus générique et plus réutilisable.

Ceci  permet de rendre le code indépendant de la solution utilisée : l'utilisation d'une instance de type Destination se fait pour une Queue ou pour un Topic.

Ceci permet aussi dans une même session d'utiliser une queue et un topic simultanément alors que dans les versions précédentes de JMS, il était nécessaire de définir deux sessions.

JMS 1.1 permet l'utilisation de destinations qui n'ont pas besoin de savoir si celles-ci concernent une Queue ou un Topic : le code écrit peut utiliser indifféremment l'un ou l'autre.

La version 1.1 a été diffusée en avril 2002. Cette version est intégrée à J2EE 1.4 : c'est un pré-requis pour la version 2.1 des EJB.

 

65.7.1. L'utilisation de l'API JMS 1.0 et 1.1

Point à point Publication/souscription Point à point ou Publication/souscription
JMS 1.0 ou 1.1 JMS 1.0 ou 1.1 JMS 1.1 uniquement
Obtenir une instance de la fabrique de type connectionFactory
Obtenir une instance de QueueConnectionFactory à partir de JNDI Obtenir une instance de TopicConnectionFactory à partir de JNDI Obtenir une instance de ConnectionFactory à partir de JNDI
Créer une instance de Connection
Appel de la méthode createQueueConnection() de la fabrique Appel de la méthode createTopicConnection() de la fabrique Appel de la méthode createConnection() de la fabrique
Créer une instance de Session
Appel de la méthode createQueueSession() de la connexion Appel de la méthode createTopicSession() de la connexion Appel de la méthode createSession de la connexion
Créer une instance de MessageProducer
Créer une instance de QueueSender en utilisant la méthode createSender() de la session Créer une instance de TopicPublisher en utilisant la méthode createPublisher() de la session Créer une instance de TopicPublisher en la méthode createProducer() de la session
Envoyer un message
Utiliser la méthode send() de la classe QueueSender Utiliser la méthode publish() de la classe TopicPublisher Utiliser la méthode send() de la classe MessageProducer
Créer une instance de MessageConsumer
Créer une instance de la classe QueueReceiver en utilisant la méthode createReceiver() de la session Créer une instance de la classe QueueReceiver en utilisant la méthode createReceiver() de la session Créer une instance de la classe MessageConsumer en utilisant la méthode createConsumer() de la session
Recevoir un message de façon synchrone
Appel de la méthode receive() de l'instance de QueueReceiver Appel de la méthode receive() de l'instance de TopicSubscriber Appel de la méthode receive() de l'instance de MessageConsumer
Recevoir un message de façon asynchrone
Implémenter l'interface MessageListerner et l'enregistrer avec la méthode setMessageListener() de la classe QueueReceiver Implémenter l'interface MessageListerner et l'enregistrer avec la méthode setMessageListener() de la classe TopicSubscriber Implémenter l'interface MessageListerner et l'enregistrer avec la méthode setMessageListener() de la classe MessageConsumer

 

65.7.2. L'interface ConnectionFactory

Un objet de type ConnectionFactory est une fabrique qui permet d'obtenir une instance de l'interface Connection. Il faut interroger un annuaire JNDI pour obtenir une instance de cette fabrique.

Avec JMS 1.1, il est maintenant possible d'utiliser une instance de ConnectionFactory directement : il n'est plus nécessaire comme dans les versions précédentes d'utiliser une fabrique dédiée à l'utilisation de Queue ou de Topic.

Remarque : Avec certaines implémentations, il est possible de créer manuellement une instance de ConnectionFactory mais cela nécessite de faire appel à des objets spécifiques à l'implémentation ce qui rend le code dépendant et donc moins portable.

 

65.7.3. L'interface Connection

L'interface Connection permet de se connecter au serveur JMS. Avec JMS1.1, pour obtenir une instance du type Connection, il faut utiliser une des surcharges de la méthode createConnection() de l'interface ConnectionFactory.

La méthode start() de l'interface Connection permet de démarrer la connexion.

 

65.7.4. L'interface Session

Une session est une fabrique de messages et elle encapsule un contexte dans lequel les messages sont produits et consommés.

Une session JMS permet de créer les objets de type MessageProducer, MessageConsumer  et Message.

Pour obtenir une instance de l'interface Session, il faut utiliser la méthode createSession(). Depuis JMS 1.1, cette méthode est disponible dans l'interface Connection.

Depuis JMS 1.1, de nouvelles méthodes ont été ajoutées à l'interface Session :

Méthode Rôle
MessageProducer createProducer()  
MessageConsumer createConsumer()  
Queue createQueue()  
Topic createTopic()  
TopicSubscriber createDurableSubscriber()  
QueueBrowser createBrowser()  
TemporaryTopic createTemporaryTopic()  
TemporaryQueue createTemporaryQueue()  
void unsubscribe()  

Pour obtenir une session, il faut utiliser la méthode createSession() de l'interface Connection.

Cette méthode attend deux paramètres :

  • Un booléen qui précise si la session est transactionnelle (true) ou non (false)
  • Un entier qui précise le mode d'acquittement de la réception d'un message (Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, ou Session.DUPS_OK_ACKNOWLEDGE  )

Une connexion JMS est thread-safe par contre la session JMS ne l'est pas : il faut donc utiliser une session par thread.

Une session JMS peut être transactionnelle en passant la valeur true au paramètre transacted des methodes  createSession(), createQueueSession() ou createTopicSession().
Pour valider la transaction, il faut utiliser la méthode commit() de l'interface Session.

 

65.7.5. L'interface Destination

L'interface Destination est la super-interface des interfaces Queue et Topic.

Avec JMS 1.1, il est préférable d'utiliser cette interface plutôt que d'utiliser une interface dédiée au domaine utilisé.

 

65.7.6. L'interface MessageProducer

L'interface MessageProducer permet d'envoyer un message vers une destination indépendamment du domaine utilisé (queue ou topic)

Avec JMS 1.1, une instance est obtenue en utilisant la méthode createProducer() de l'interface Session avec en paramètre la destination. Il est aussi possible de créer un MessageProducer sans préciser la destination. Dans ce cas, cette dernière devra être indiquée lors de l'envoi du message.

Depuis JMS 1.1, il est possible d'utiliser une instance de cette interface pour produire des messages. Avec JMS 1.0, il était nécessaire d'utiliser TopicPublisher ou QueueSender.

Depuis JMS 1.1, de nouvelles méthodes ont été ajoutées à l'interface MessageProducer notamment la méthode getDestination() et plusieurs surcharges de la méthode send().

Les surcharges de la méthode send() de l'interface MessageProducer permettent d'envoyer un message fourni en paramètre.

 

65.7.7. L'interface MessageConsumer

L'interface MessageConsumer permet la réception de messages d'une destination. Une instance est obtenue en utilisant la méthode createConsumer() de l'interface Session. Cette méthode attend en paramètre une destination.

 

65.7.7.1. La réception synchrone de messages

La méthode receive() de l'interface MessageConsumer permet d'attendre l'arrivée d'un nouveau message en bloquant le reste de l'application. Une version surchargée attend en paramètre un nombre de millisecondes indiquant la durée d'attente maximale.

La méthode receiveNoWait() permet de recevoir un éventuel nouveau message sans attendre.

Un message reçu est retourné par ces méthodes sous la forme d'un objet de type Message. Pour traiter le message, il faut caster ce résultat en fonction du type réel de l'objet.

 

65.7.7.2. La réception asynchrone de messages

La méthode receive() de la classe MessageConsumer permet de recevoir un message de façon synchrone. Lors de l'appel à cette méthode un message est obtenu ou non.

L'arrivée d'un message est cependant rarement prévisible et surtout ne doit pas bloquer l'exécution de l'application. Il est alors préférable de définir un listener et de l'enregistrer pour qu'il soit automatiquement exécuté à l'arrivée d'un message.

L'interface MessageListener permet de définir un listener pour la réception asynchrone de messages. Elle ne définit que la méthode onMessage() qui sera appelée lors de chaque réception d'un nouveau message de la destination.

La méthode onMessage() possède un paramètre de type Message encapsulant le message reçu. Il faut redéfinir cette méthode pour qu'elle exécute les traitements à réaliser sur les messages.

Le listener s'enregistre en utilisant la méthode setMessageListener() de la classe MessageConsumer().

Exemple :
  messageConsumer.setMessageListener(listener);

Remarque : il est important d'enregistrer le listener après que la connexion au serveur est réalisée (appel de la méthode start() de la Connection).

 

65.7.8. Le filtrage des messages

Il est possible de filtrer les messages reçus d'une destination au moyen d'un sélecteur (selector). Les fonctionnalités utilisables correspondent à un petit sous-ensemble de l'ensemble des fonctionnalités de SQL.

Le filtre ne peut s'appliquer que sur certaines données de l'en-tête : JMSDeliveryMode, JMSPriority, JMSMessageID, JMSCorrelationID, JMSType et JMSTimestamp

Le filtre peut aussi utiliser toutes les propriétés personnelles du message.

Exemple :
JMSPriority < 10

Lors de l'instanciation d'un objet de type MessageConsumer, il est possible de préciser le filtre des messages à recevoir sous la forme d'une chaîne de caractères. Cette chaîne est une expression qui précise le filtre à appliquer et est nommée selector..

Exemple :
	messageConsumer consumer = session.createConsumer(destination, "maPropriete = '1234' ") ;

Il est possible de définir ses propres propriétés et de les utiliser dans le filtre. Le nom de ces propriétés doit impérativement respecter les spécifications de JMS (par exemple, le nom ne peut pas commencer par JMSX ou JMS_).

La valeur d'une propriété peut être de type boolean, byte, short, int, long, float, double ou String.

Les valeurs des propriétés sont précisées avant l'envoi du message et ne peuvent plus être modifiées après l'envoi du message.

Les spécifications JMS ne précisent pas de règle pour l'utilisation d'une donnée sous la forme d'une propriété ou dans le corps du message. Il est cependant conseillé de réserver l'utilisation des propriétés pour des besoins spécifiques (filtre de messages par exemple).

Les filtres permettent à un client de ne recevoir que les messages dont les données de l'en-tête respectent le filtre précisé. Il n'est pas possible d'utiliser dans le filtre les données du corps du message.

Les messages retenus sont ceux dont l'évaluation de l'expression avec les valeurs de l'en-tête du message vaut true.

Le filtre ne peut pas être changé en cours d'exécution.

 

65.7.8.1. La définition du filtre

Le filtre, nommé selector est une chaîne de caractères définissant une expression dont la syntaxe est un sous-ensemble des expressions conditionnelles de la norme SQL 92.

Par défaut, le filtre est évalué de gauche à droite mais l'usage de parenthèses peut être mis en oeuvre pour modifier cet ordre.

Un selector peut contenir :

des séparateurs espaces, tabulations, retour chariot, ...
des littéraux des chaînes de caractères encodées en Unicode et entourées par de simples quotes
des numériques entiers correspondant au type Java long
des numériques flottants correspondant au type Java double
des booléens qui peuvent avoir les valeurs true ou false
des identifiants leur nom doit respecter ceux des identifiants Java et ne doivent pas correspondre à des mots clés (true, false, null, not, and, or, ...)
ils ne doivent pas commencer par JMSX ou JMS_
ils sont sensibles à la casse
ils ne peuvent pas  correspondre aux propriétés d'en-tête prédéfinis : JMSDeliveryMode, JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID ou JMSType
des parenthèses pour modifier l'ordre d'évaluation de l'expression
des expressions arithmétiques
conditionnelles
des opérateurs logiques NOT, AND, OR
des opérateurs de comparaisons =, >, >=, <, <=, <> (seuls = et <> sont utilisables avec des booléens et des chaînes de caractères)
des opérateurs arithmétiques +, -, *, /
l'opérateur between exemple : valeur between 5 and 9 est équivalent à valeur >= 5 et valeur <= 9, valeur not between 5 and 9 est équivalent à valeur < 5 ou valeur > 9
l'opérateur in permet la comparaison parmi plusieurs chaînes de caractères (exemple : valeur in ("aa", "bb", "cc") est équivalant à (valeur = "aa") ou (valeur = "bb") ou (valeur = "cc"))
l'opérateur like permet la comparaison par rapport à un motif : dans ce motif le caractère _ désigne un caractère quelconque, le caractère % désigne zéro ou plusieurs caractères, le caractères \ permet de déspécialiser les deux précédents caractères
L'opérateur is null permet de tester la valeur null d'une propriété ou l'existence de sa définition

 

65.7.9. Des exemples de mise en oeuvre

Exemple : envoie d'un message dans une queue
package com.jmdoudoux.test.openjms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TestOpenJMS1 {

  public static void main(final String[] args) {
    Context context = null;
    ConnectionFactory factory = null;
    Connection connection = null;
    Destination destination = null;
    Session session = null;
    MessageProducer sender = null;
    try {
      context = new InitialContext();
      factory = (ConnectionFactory) context.lookup("ConnectionFactory");
      destination = (Destination) context.lookup("queue1");
      connection = factory.createConnection();
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      sender = session.createProducer(destination);
      connection.start();

      final TextMessage message = session.createTextMessage();
      message.setText("Mon message");
      sender.send(message);
      System.out.println("Message envoye= " + message.getText());
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (context != null) {
        try {
          context.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }

      if (connection != null) {
        try {
          connection.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
  }
}

Pour exécuter correctement l'application il faut qu'un broker de messages JMS soit installé et configuré. Il suffit alors de fournir les paramètres de connexion à ce serveur.

Exemple : le fichier jndi.properties avec OpenJMS
java.naming.provider.url=tcp://localhost:3035
java.naming.factory.initial=org.exolab.jms.jndi.InitialContextFactory
java.naming.security.principal=admin
java.naming.security.credentials=openjms

Résultat :
Message envoye= Mon message

Grâce à la version 1.1 de JMS, pour envoyer un message dans le topic1, il suffit simplement le remplacer le nom JNDI de la destination

Exemple :
... 
      destination = (Destination) context.lookup("topic1");
...

Exemple : lecture d'un message dans une file d'attente
package com.jmdoudoux.test.openjms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TestOpenJMS2 {

  public static void main(String[] args) {
    Context context = null;
    ConnectionFactory factory = null;
    Connection connection = null;
    Destination destination = null;
    Session session = null;
    MessageConsumer receiver = null;

    try {
      context = new InitialContext();
      factory = (ConnectionFactory) context.lookup("ConnectionFactory");
      destination = (Destination) context.lookup("queue1");
      connection = factory.createConnection();
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      receiver = session.createConsumer(destination);
      connection.start();

        Message message = receiver.receive();
        if (message instanceof TextMessage) {
          TextMessage text = (TextMessage) message;
          System.out.println("message recu= " + text.getText());
        } else if (message != null) {
          System.out.println("Aucun message dans la file");
        }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (context != null) {
        try {
          context.close();
        } catch (NamingException e) {
          e.printStackTrace();
        }
      }

      if (connection != null) {
        try {
          connection.close();
        } catch (JMSException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

 

65.8. Les ressources relatives à JMS

Le site de JMS.

La Documentation de l'API JMS en version 1.0.2b et 1.1.

Pour mettre en oeuvre JMS, il faut une implémentation de l'API fournie soit par un serveur d'applications soit par une implémentation autonome.

Les principaux brokers de messages commerciaux sont :

Produit Société URL
Sonic MQ Progress software http://www.progress.com/fr/sonic/sonicmq.phpl
Swift MQ   http://www.swiftmq.com
Websphere MQ (MQ Series) IBM http://www-01.ibm.com/software/integration/wmq/#
Rendez vous Tibco http://www.tibco.com/products/soa/messaging/rendezvous/default.jsp

Il existe quelques brokers de messages open source :

Outils Description / URL
Apache ActiveMQ MOM Open Source de la fondation Apache
http://activemq.apache.org
OW2 Joram implémentation open source des spécifications JMS par le consortium OW2
http://joram.ow2.org/
OpenJMS implémentation Open Source des spécifications JMS
http://openjms.sourceforge.net/

 


Développons en Java v 2.20   Copyright (C) 1999-2021 Jean-Michel DOUDOUX.   
[ Précédent ] [ Sommaire ] [ Suivant ] [ Télécharger ]      [ Accueil ] [ Commentez ]