Niveau : | Supérieur |
L'API JMS propose fondamentalement deux grandes fonctionnalités : produire et consommer des messages.
Dans une application Java EE, la consommation de messages est assurée par des EJB de type MDB (Message Driven Bean). Dans les autres types d'applications, la consommation de messages se fait en utilisant des MessageListener de l'API JMS.
Spring offre une abstraction pour faciliter la mise en oeuvre de l'API JMS. Il propose une API qui fournit une abstraction dans la mise en oeuvre de JMS version 1.0.2 et 1.1.
Ce chapitre contient plusieurs sections :
Les classes de Spring dédiées à la mise en oeuvre de JMS sont dans le package org.springframework.jms du fichier spring-jms.jar.
Le package org.springframework.jms.core contient les classes et interfaces de base pour utiliser JMS avec Spring. Il contient notamment un template qui est un helper prennant en charge la création et la libération des ressources et délèguant les traitements spécifiques à un callback.
Le package org.springframework.jms.connection contient des classes pour la connexion et la gestion des transactions avec JMS. Il fournit une implémentation de ConnectionFactory qui peut être utilisée dans une application standalone et une implémentation de PlatformTransactionManager pour permettre une utilisation des ressources JMS dans un contexte transactionnel.
Le package org.springframework.jms.support propose des fonctionnalités pour transformer les exceptions de type checked de JMS en une hiérarchie plus compacte d'exceptions de type unchecked. Une sous-classe de JMSException dédiée à un fournisseur est transformée en UncategorizedJmsException.
Le package org.springframework.jms.support.converter contient des utilitaires pour convertir des messages en objets. Il fournit notamment la classe MessageConverter pour convertir un message JMS en objet Java.
Le package org.springframework.jms.support.destination fournit plusieurs fonctionnalités pour gérer les destinations JMS comme par exemple un service locator pour obtenir une destination stockée dans un annuaire JNDI.
La classe JmsTemplate est la classe de base pour faciliter l'envoi et la réception de messages JMS de façon synchrone. Cette classe se charge de gérer la création et la libération des ressources utiles pour JMS.
Le principe des templates est de fournir une classe de type helper qui facilite la mise en oeuvre de tâches communes à une fonctionnalité tout en délégant les tâches particulières à un callback qui implémente une interface particulière.
La classe JmsTemplate propose ainsi des méthodes permettant d'envoyer un message, de consommer un message de manière synchrone et de permettre un accès à la session JMS et au message producer.
Spring propose la classe JmsTemplate qui est un helper pour faciliter l'utilisation de JMS version 1.1. Sa classe fille JmsTemplate102 propose les mêmes fonctionnalités pour la version 1.0.2 de JMS.
L'envoi d'un message se fait en utilisant la méthode send() de la classe JmsTemplate qui attend en paramètre dans ses surcharges un objet de type MessageCreator.
Une surcharge de la méthode send() attend en paramètre le nom de la destination du message et un objet de type MessageCreator() qui est une interface de callback pour créer un message.
Exemple : |
package com.jmdoudoux.test.spring.jms;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class JmsProducer {
private JmsTemplate jmsTemplate;
public void envoyerMessage() {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(final Session session) throws JMSException {
return session.createTextMessage("Message " + new Date());
}
});
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(final JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
Dans l'exemple ci-dessus, une instance anonyme de la classe MessageCreator est utilisée pour créer le message : la méthode createMessage() possède en paramètre la session qui sera utilisée pour créer la nouvelle instance du message.
Pour des besoins plus pointus, il est possible d'utiliser un callback de type SessionCallback qui permet un accès à la session et au MessageProducer.
La classe JmsTemplate prend en charge la fermeture de la session JMS.
Une instance de JmsTemplate peut être utilisée pour recevoir les messages de manière synchrone.
La méthode receive() attend un nouveau message sur la destination par défaut de l'instance de JmsTemplate.
Une autre surcharge de la méthode receive() attend un nouveau message sur la destination fournie en paramètre.
Exemple : |
package com.jmdoudoux.test.spring.jms;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.springframework.jms.core.JmsTemplate;
public class JmsConsumer {
private JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void recevoirMessage() {
Message msg = jmsTemplate.receive();
try {
TextMessage textMessage = (TextMessage) msg;
if (msg != null) {
System.out.println("Message = " + textMessage.getText());
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void setJmsTemplate(final JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
La propriété receiveTimeout de la classe JmsTemplate permet de préciser un timeout d'attente puisque la réception est synchrone.
Remarque : La classe JmsTemplate peut être utilisée pour envoyer des messages mais elle n'est pas recommandée pour en recevoir. Pour la réception d'un message, il est préférable d'utiliser une solution asynchrone reposant sur un MessageListenerContainer de Spring.
Cette section va écrire une petite application qui utilise les deux classes définies précédemment, Spring 3.0.5 et ActiveMQ 5.4.2.
Le context Spring doit contenir la définition des différents beans utilisés.
Exemple : |
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- Fabrique de connexions à ActiveMQ -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"
value="tcp://localhost:61616?wireFormat.maxInactivityDuration=0" />
</bean>
<!-- Destination dans ActiveMQ -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="local.maqueue" />
</bean>
<!-- Instance de JmsTemplate qui utilise ConnectionFactory et la
Destination -->
<bean id="producerTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="amqConnectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="consumerTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="amqConnectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="jmsProducer" class="com.jmdoudoux.test.spring.jms.JmsProducer">
<property name="jmsTemplate" ref="producerTemplate" />
</bean>
<bean id="jmsConsumer" class="com.jmdoudoux.test.spring.jms.JmsConsumer">
<property name="jmsTemplate" ref="consumerTemplate" />
</bean>
</beans>
Le bean amqConnectionFactory est une instance de type ActiveMQConnectionFactory : ce bean est une fabrique de connexions à un ActiveMQ installé en local et qui utilise le port 61616.
Le bean destination est une instance de type ActiveMQQueue : ce bean encapsule une queue nommée « local.maqueue ».
Les beans producerTemplate et consumerTemplate sont des instances de type JmsTemplate.
La propriété connectionFactory est initialisée avec le bean amqConnectionFactory et la propriété defaultDestination l'est avec le bean destination.
Une instance de JmsProducer et de JmsConsumer sont déclarées avec, en dépendance, l'instance de JmsTemplate correspondante.
Pour tester les classes d'envoi et de réception, il suffit d'écrire une petite application qui charge le contexte Spring, obtient une instance de la classe et invoque l'opération voulue.
Exemple : |
package com.jmdoudoux.test.spring.jms;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class TestProducerConsumer {
public static void main(final String[] args) {
ClassPathXmlApplicationContext appContext = new ClassPathXmlApplicationContext(
new String[] {"appContext.xml" });
System.out.println("envoi du message");
JmsProducer jmsProducer = (JmsProducer) appContext.getBean("jmsProducer");
jmsProducer.envoyerMessage();
System.out.println("reception du message");
JmsConsumer jmsConsumer = (JmsConsumer) appContext.getBean("jmsConsumer");
jmsConsumer.recevoirMessage();
}
}
Le classpath de l'application doit contenir plusieurs fichiers jar : org.springframework.jms-3.0.5.RELEASE.jar, org.springframework.beans-3.0.5.RELEASE.jar, org.springframework.context-3.0.5.RELEASE.jar, org.springframework.core-3.0.5.RELEASE.jar, com.springsource.org.apache.commons.logging-1.1.1.jar, org.springframework.asm-3.0.5.RELEASE.jar, org.springframework.expression-3.0.5.RELEASE.jar, org.springframework.transaction-3.0.5.RELEASE.jar, apache-activemq-5.4.2/activemq-all-5.4.2.jar
Initialement la classe JmsTemplate a été conçue pour être utilisée dans un conteneur Java EE qui offre une gestion des ressources JMS notamment en utilisant des pools.
Si la classe JmsTemplate est utilisée en dehors d'un conteneur Java EE ou si aucune gestion des ressources JMS n'est prise en charge par le fournisseur, il est alors intéressant d'utiliser la classe CachingConnectionFactory.
La classe CachingConnectionFactory est un wrapper qui encapsule une connexion à un MOM en proposant une reconnexion au besoin et une mise en cache de certaines ressources (connections, sessions).
Par défaut, la classe CachingConnectionFactory utilise une seule session pour créer les connections. Il est possible d'utiliser plusieurs sessions pour améliorer la montée en charge en utilisant la propriété sessionCacheSize.
Exemple : |
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- Fabrique de connexions à ActiveMQ -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"
value="tcp://localhost:61616?wireFormat.maxInactivityDuration=0" />
</bean>
<!-- Cache des connexions à ActiveMQ -->
<bean id="cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="3" />
</bean>
<!-- Destination dans ActiveMQ -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="local.maqueue" />
</bean>
<!-- Instances de JmsTemplate qui utilise la ConnectionFactory avec
mise en cache et la Destination -->
<bean id="producerTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachedConnectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="consumerTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachedConnectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="jmsProducer" class="com.jmdoudoux.test.spring.jms.JmsProducer">
<property name="jmsTemplate" ref="producerTemplate" />
</bean>
<bean id="jmsConsumer" class="com.jmdoudoux.test.spring.jms.JmsConsumer">
<property name="jmsTemplate" ref="consumerTemplate" />
</bean>
</beans>
La classe de test est modifiée pour permettre un arrêt et une relance d'ActiveMQ durant son exécution.
Exemple : |
package com.jmdoudoux.test.spring.jms;
import java.io.IOException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class TestProducerConsumer {
public static void main(final String[] args) {
ClassPathXmlApplicationContext appContext = new ClassPathXmlApplicationContext(
new String[] { "appContext.xml" });
System.out.println("envoi du message");
JmsProducer jmsProducer = (JmsProducer) appContext.getBean("jmsProducer");
jmsProducer.envoyerMessage();
try {
System.out.println("arret activeMQ");
System.in.read();
System.out.println("relance activeMQ");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("reception du message");
JmsConsumer jmsConsumer = (JmsConsumer) appContext.getBean("jmsConsumer");
jmsConsumer.recevoirMessage();
}
}
Lors de l'exécution de l'application, les logs contiennent une trace de la déconnexion au broker JMS mais Spring va assurer une reconnexion automatique dès que le broker est relancé.
Résultat : |
17 avr. 2011 17:15:21 org.springframework.context.support.AbstractApplicationContext
prepareRefresh
INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@c1b531:
startup date [Sun Apr 17 17:15:21 CEST 2011]; root of context hierarchy
17 avr. 2011 17:15:21 org.springframework.beans.factory.xml.XmlBeanDefinitionReader
loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [appContext.xml]
17 avr. 2011 17:15:22 org.springframework.beans.factory.support.DefaultListableBeanFactory
preInstantiateSingletons
INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.
DefaultListableBeanFactory@ef5502: defining beans
[amqConnectionFactory,cachedConnectionFactory,destination,producerTemplate,consumerTemplate
,jmsProducer,jmsConsumer]; root of factory hierarchy
envoi du message
17 avr. 2011 17:15:22 org.springframework.jms.connection.SingleConnectionFactory initConnection
INFO: Established shared JMS Connection: ActiveMQConnection
{id=ID:THINKPAD_X60S-1340-1302707722546-0:1,clientId=null,started=false}
arret activeMQ
17 avr. 2011 17:15:35 org.springframework.jms.connection.SingleConnectionFactory onException
ATTENTION: Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: java.io.EOFException
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1833)
at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1850)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101)
at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101)
at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.
java:160)
at org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:266)
at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:206)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(Unknown Source)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:227)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:219)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
... 1 more
relance activeMQ
reception du message
17 avr. 2011 17:16:23 org.springframework.jms.connection.SingleConnectionFactory initConnection
INFO: Established shared JMS Connection: ActiveMQConnection
{id=ID:THINKPAD_X60S-1340-1302707722546-0:2,clientId=null,started=false}
Message = Message Sun Apr 17 17:15:22 CEST 2011
L'écriture de consommateurs de messages JMS nécessite de nombreuses lignes de code surtout si l'on souhaite que la solution soit fiable et sache monter en charge.
La classe JmsTemplate peut être utilisée pour recevoir des messages de manière synchrone, mais son utilisation dans ce cadre n'est pas recommandée car la montée en charge est très problématique.
Les EJB de type Message Bean Driven sont définis dans la version 2.0 des spécifications de EJB : ce sont des EJB stateless, supportant les transactions qui agissent en tant que listeners de messages JMS. Pour permettre une meilleure montée en charge, le serveur d'applications peut mettre en oeuvre un pool d'EJB MDB.
Cela permet au conteneur Java EE d'avoir une solution de traitement des messages asynchrones.
Les EJB de type MDB présentent plusieurs inconvénients :
Spring propose plusieurs solutions pour permettre de recevoir des messages de manière asynchrone en utilisant des MessageListenerContainer.
La classe SimpleMessageListenerContainer est l'implémentation la plus simple : elle offre donc des fonctionnalités limitées. Par exemple, elle ne propose pas de support pour les transactions.
L'utilisation de la classe DefaultMessageListenerContainer a plusieurs avantages :
La classe DefaultMessageListenerContainer a pour but de faciliter la réception de messages asynchrones.
Le plus simple pour utiliser la classe DefaultMessageListenerContainer est d'utiliser le namespace jms dans le fichier de configuration du contexte pour définir les différents composants utilisés. L'exemple ci-dessous utilise Apache ActiveMQ.
Exemple : |
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- Fabrique de connexions à ActiveMQ -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616?wireFormat.maxInactivityDuration=0"
/>
</bean>
<!-- Cache des connexions à ActiveMQ -->
<bean id="cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="3" />
</bean>
<!-- Destination dans ActiveMQ -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="local.maqueue" />
</bean>
<!-- Instances de JmsTemplate qui utilise la ConnectionFactory
avec mise en cache et la Destination -->
<bean id="producerTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachedConnectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="jmsProducer" class="com.jmdoudoux.test.spring.jms.JmsProducer">
<property name="jmsTemplate" ref="producerTemplate" />
</bean>
<!-- Bean qui implemente l'interface MessageListener -->
<bean id="monSimpleMessageListener"
class="com.jmdoudoux.test.spring.jms.MonSimpleMessageListener">
</bean>
<jms:listener-container container-type="default"
connection-factory="cachedConnectionFactory" acknowledge="auto" >
<jms:listener id="monListener" destination="local.maqueue"
ref="monSimpleMessageListener" method="onMessage" />
</jms:listener-container>
</beans>
Un bean de type ActiveMQConnectionFactory est défini pour assurer la connexion avec le broker ActiveMQ.
Un bean qui est le message listener JMS est défini.
Un listener-container est défini en lui fournissant le cache des ConnectionFactory, le type de container default et le message listener. La propriété acknowledge permet de préciser le mode d'acquittement des messages.
Le tag <jms:listener> permet d'associer un message listener avec une destination.
Il est possible de définir plusieurs <jms:listener> qui seront gérés par le conteneur.
Exemple : |
package com.jmdoudoux.test.spring.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MonSimpleMessageListener implements MessageListener {
public void onMessage(final Message message) {
try {
System.out.println("debut reception message");
TextMessage msg = (TextMessage) message;
System.out.println(" Message recu : " + msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("fin reception message");
}
}
Le message listener est un bean qui peut être implémenté de plusieurs manières :
L'utilisation du message listener container de Spring possède plusieurs avantages :
La classe de test envoie plusieurs messages dans la queue et attend une entrée de l'utilisateur. Cela permet au listener démarré automatiquement par le conteneur Spring de consommer les messages.
Exemple : |
package com.jmdoudoux.test.spring.jms;
import java.io.IOException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class TestProducerConsumerAsync {
public static void main(final String[] args) {
ClassPathXmlApplicationContext appContext = new ClassPathXmlApplicationContext(
new String[] { "appContext.xml" });
System.out.println("envoi des messages");
JmsProducer jmsProducer = (JmsProducer) appContext.getBean("jmsProducer");
jmsProducer.envoyerMessage();
jmsProducer.envoyerMessage();
jmsProducer.envoyerMessage();
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
A l'exécution, les messages sont consommés par le listener.
Résultat : |
17 avr. 2011 17:49:17 org.springframework.context.support.AbstractApplicationContext
prepareRefresh
INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@19c26f5:
startup date [Sun Apr 17 17:49:17 CEST 2011]; root of context hierarchy
17 avr. 2011 17:49:17 org.springframework.beans.factory.xml.XmlBeanDefinitionReader
loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [appContext.xml]
17 avr. 2011 17:49:17 org.springframework.beans.factory.support.DefaultListableBeanFactory
preInstantiateSingletons
INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.
DefaultListableBeanFactory@152c4d9: defining beans
[amqConnectionFactory,cachedConnectionFactory,destination,producerTemplate,consumerTemplate,
jmsProducer,jmsConsumer,monSimpleMessageListener,monListener];
root of factory hierarchy
17 avr. 2011 17:49:17 org.springframework.context.support.
DefaultLifecycleProcessor$LifecycleGroup start
INFO: Starting beans in phase 2147483647
17 avr. 2011 17:49:17 org.springframework.jms.connection.SingleConnectionFactory initConnection
INFO: Established shared JMS Connection: ActiveMQConnection
{id=ID:THINKPAD_X60S-1390-1303228157843-0:1,clientId=null,started=false}
envoi des messages
debut reception message
Message recu : Message Sun Apr 17 17:49:18 CEST 2011
fin reception message
debut reception message
Message recu : Message Sun Apr 17 17:49:18 CEST 2011
fin reception message
debut reception message
Message recu : Message Sun Apr 17 17:49:18 CEST 2011
fin reception message
Lors de la mise en oeuvre de JMS, l'envoi de messages est plus rapide que la réception et le traitement d'un message. Pour permettre à un système de monter en charge, il peut être utile de mettre en oeuvre plus de consumers que de producers pour des échanges au travers d'une queue.
La classe DefaultMessageListenerContainer est un conteneur pour la consommation asynchrone de messages.
Elle permet d'adapter dynamiquement le nombre de consumers utilisés en fonction du nombre de messages à traiter. La propriété concurrentConsumers permet de préciser le nombre de consumers à utiliser. La propriété maxConcurrentConsumers permet de préciser le nombre maximal de consumers utilisable. Chaque consumer possède sa propre connexion au broker.
La propriété concurrency permet de préciser le nombre de consumers et le nombre maximal de consumers en séparant les deux valeurs par un caractère tiret.
La propriété idleConsumerLimit permet de préciser le nombre de consumers inactifs qui doivent rester chargés. Ceci permet de réutiliser des consumers sans avoir à toujours en recréer.
Les propriétés concurrentConsumers et maxConcurrentConsumers peuvent être modifiées dynamiquement au runtime.
Il ne faut pas utiliser plusieurs consumers sur un topic car le message sera consommé par chaque consumer.
Pour améliorer la montée en charge, il est possible de modifier la valeur de plusieurs propriétés en fonction des besoins :
Exemple : |
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- Fabrique de connexions à ActiveMQ -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"
value="tcp://localhost:61616?wireFormat.maxInactivityDuration=0" />
</bean>
<!-- Cache des connexions à ActiveMQ -->
<bean id="cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="3" />
</bean>
<!-- Destination dans ActiveMQ -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="local.maqueue" />
</bean>
<!-- Instances de JmsTemplate qui utilise la ConnectionFactory avec mise
en cache et la Destination -->
<bean id="producerTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachedConnectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="jmsProducer" class="com.jmdoudoux.test.spring.jms.JmsProducer">
<property name="jmsTemplate" ref="producerTemplate" />
</bean>
<bean id="monSimpleMessageListener"
class="com.jmdoudoux.test.spring.jms.MonSimpleMessageListener">
</bean>
<bean id="monMessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"
p:connectionFactory-ref="cachedConnectionFactory"
p:destination-ref="destination"
p:messageListener-ref="monSimpleMessageListener"
p:concurrentConsumers="3"
p:maxConcurrentConsumers="5"
p:receiveTimeout="5000"
p:idleTaskExecutionLimit="10"
p:idleConsumerLimit="3"
/>
</beans>
Lors de l'exécution, les messages sont consommés en parallèle.
Résultat : |
17 avr. 2011 18:31:31 org.springframework.context.support.AbstractApplicationContext
prepareRefresh
INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@c1b531:
startup date [Sun Apr 17 18:31:31 CEST 2011]; root of context hierarchy
17 avr. 2011 18:31:31 org.springframework.beans.factory.xml.XmlBeanDefinitionReader
loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [appContext.xml]
17 avr. 2011 18:31:31 org.springframework.beans.factory.support.DefaultListableBeanFactory
preInstantiateSingletons
INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.
DefaultListableBeanFactory@a61164: defining beans
[amqConnectionFactory,cachedConnectionFactory,destination,producerTemplate,consumerTemplate,
jmsProducer,jmsConsumer,monSimpleMessageListener,monMessageListen
erContainer]; root of factory hierarchy
17 avr. 2011 18:31:32 org.springframework.context.support.
DefaultLifecycleProcessor$LifecycleGroup start
INFO: Starting beans in phase 2147483647
17 avr. 2011 18:31:32 org.springframework.jms.connection.SingleConnectionFactory initConnection
INFO: Established shared JMS Connection: ActiveMQConnection
{id=ID:THINKPAD_X60S-1397-1303230692234-0:1,clientId=null,started=false}
envoi des messages
debut reception message
Message recu : Message Sun Apr 17 18:31:32 CEST 2011
debut reception message
Message recu : Message Sun Apr 17 18:31:32 CEST 2011
debut reception message
Message recu : Message Sun Apr 17 18:31:32 CEST 2011
fin reception message
fin reception message
fin reception message
Par défaut, la classe DefaultMessageListenerContainer utilise un cache pour les ressources JMS (connexions, sessions, consumers) sauf si un gestionnaire de transactions externe est utilisé. La propriété cacheLevel permet de préciser les éléments qui doivent être mis en cache (connexions uniquement, connexions et sessions ou connexions, sessions et consumers).
En précisant consumer comme valeur de la propriété cacheLevel, les connexions, les sessions et les consumers seront mis en cache.
La session est mise en cache selon son mode d'acquittement. Le consumer est mis en cache selon sa session, son sélecteur et sa destination.
La mise en cache des ressources peut permettre d'améliorer les performances de la montée en charge de l'application : par exemple ActiveMQ propose la classe org.apache.activemq.pool.PooledConnectionFactory.
Il est cependant préférable d'utiliser la classe CachingConnectionFactory de Spring qui permet en plus de mettre en cache les consumers et qui permet de se connecter à n'importe quel MOM respectant les spécifications JMS.
L'espace de nommage jms permet de définir dans le contexte des objets relatifs à l'API JMS.
Exemple : |
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- ... -->
</beans>
Le tag <listener-container> permet de configurer un container qui va gérer des listeners JMS.
Ce tag va créer un objet qui va se connecter au broker de messages et obtenir les messages pour les faire traiter par un listener.
Ce tag peut avoir un ou plusieurs tags fils <listener>.
Le tag <listener> permet de configurer un listener JMS.
Exemple : |
<bean id="messageHandler" class="com.jmdoudoux.test.spring.jms.MessageHandlerImpl" />
<jms:listener-container connectionfactory="amqConnectionFactory">
<jms:listener destination="local.maqueue" ref="messageHandler"
method="onMessage" />
</jms:listener-container>
La propriété destination permet de préciser la destination (queue ou topic) qui sera écoutée.
La propriété method permet de préciser la méthode de l'objet indiqué par la propriété ref qui sera invoquée pour traiter les messages.
Le tag <jca-listener-container> permet de configurer un container JCA qui va gérer des listeners JMS.