IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)

 

Développons en Java   2.30  
Copyright (C) 1999-2022 Jean-Michel DOUDOUX    (date de publication : 15/06/2022)

[ Précédent ] [ Sommaire ] [ Suivant ] [Télécharger ]      [Accueil ]

 

38. Le framework Executor

 

chapitre    3 8

 

Niveau : niveau 4 Supérieur 

 

 

 

Les threads en Java présentent de nombreux inconvénients essentiellement liés au fait que la classe Thread est de bas niveau et donc certaines fonctionnalités doivent être développées, par exemple :

  • obtenir un résultat de l'exécution d'un thread
  • obtenir par l'appelant une exception levée dans le thread
  • aucun pool de threads n'est proposé en standard
  • attendre la fin d'un ensemble de threads

Pour faciliter la mise en oeuvre de traitements en parallèle, en apportant des réponses à ces problématiques, Java 5 propose le framework Executor qui est spécifié dans la JSR 166.

Il est fortement recommandé d'utiliser le framework Executor à la place de la classe Thread.

Ce chapitre contient plusieurs sections :

 

38.1. L'interface Executor

L'interface java.util.concurrent.Executor décrit les fonctionnalités permettant l'exécution différée de tâches implémentées sous la forme de Runnable.

Elle ne définit qu'une seule méthode :

Méthode

Rôle

void execute(Runnable command)

Exécuter la tâche fournie en paramètre éventuellement dans le futur


Selon l'implémentation, la tâche pourra être exécutée dans un thread dédié ou dans le thread courant.

Elle possède deux interfaces filles :

  • ExecutorService : elle définie les fonctionnalités d'un service permettant l'exécution de tâches de type Runnable ou Callable.
  • ScheduledExecutorService qui hérite de l'interface ExecutorService : elle définit les fonctionnalités d'un service pour l'exécution de tâches planifiées et/ou répétées

 

38.1.1. L'interface ExecutorService

L'interface ExecutorService décrit les fonctionnalités d'un service d'exécution de tâches. Elle hérite de l'interface Executor.

Elle définit plusieurs méthodes

Méthode

Rôle

boolean awaitTermination(long timeout, TimeUnit unit)

Attendre l'achèvement des tâches après une demande d'arrêt ou la fin d'un délai ou l'interruption du thread courant selon ce qui se produira en premier

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

Exécuter toutes les tâches fournies en paramètres. Renvoie une collection de Future qui permet d'obtenir des informations sur l'exécution des tâches. Cette méthode est bloquante jusqu'à ce que l'exécution de toutes les tâches soit terminée

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

Exécuter toutes les tâches fournies en paramètres. Renvoie une collection de Future qui permet d'obtenir des informations sur l'exécution des tâches. Cette méthode est bloquante jusqu'à ce que l'exécution de toutes les tâches soit terminée ou que le timeout soit atteint

<T> T invokeAny(Collection<? extends Callable<T>> tasks)

Exécuter toutes les tâches fournies en paramètres. Renvoie le résultat d'une des tâches exécutées. Cette méthode est bloquante jusqu'à ce que l'exécution de la tâche dont la valeur est retournée soit terminée

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

Exécuter toutes les tâches fournies en paramètre. Renvoie le résultat d'une des tâches exécutées. Cette méthode est bloquante jusqu'à ce que l'exécution de la tâche dont la valeur est retournée soit terminée ou que le timeout soit atteint

boolean isShutdown()

Renvoyer un booléen qui précise si le service est fermé

boolean isTerminated()

Renvoyer un booléen qui précise si toutes les tâches sont terminées

void shutdown()

Demander la fermeture du service. Toutes les tâches en cours d'exécution se poursuivent jusqu'à leur fin mais plus aucune nouvelle tâche ne peut être ajoutée dans le service

List<Runnable> shutdownNow()

Demander la fermeture du service. Elle tente d'arrêter le traitement des tâches en cours d'exécution. Elle renvoie une liste des tâches qui attendaient d'être exécutées

<T> Future<T> submit(Callable<T> task)

Ajouter la tâche dans la queue pour exécution. Renvoie un Future qui permet d'obtenir des informations sur l'exécution de la tâche

Future<?> submit(Runnable task)

Ajouter la tâche dans la queue pour exécution. Renvoie un Future qui permet d'obtenir des informations sur l'exécution de la tâche

<T> Future<T> submit(Runnable task, T result)

Ajouter la tâche dans la queue pour exécution. Renvoie un Future qui permet d'obtenir des informations sur l'exécution de la tâche


Il est important d'invoquer la méthode shutdown() lorsqu'il n'est plus nécessaire aux threads de l'ExecutorService d'attendre de nouvelles tâches à Exécuter. Dès lors, les tâches déjà soumises seront exécutées mais il ne sera plus possible d'en ajouter de nouvelles.

Il est nécessaire d'invoquer la méthode shutdown() pour arrêter le thread en cours d'exécution une fois que l'ExecutorService n'est plus utile.

La méthode shutdownNow() permet de demander l'interruption de l'exécution des tâches en cours et d'annuler l'exécution des tâches en attente.

Plusieurs implémentations sont fournies en standard :

  • java.util.concurrent.ThreadPoolExecutor
  • java.util.concurrent.ScheduledThreadPoolExecutor
  • java.util.concurrent.ForkJoinPool (depuis Java 7)

Le plus simple pour créer une instance de type ExecutorService est d'utiliser la fabrique java.util.concurrent.Executors.

Il est possible de soumettre des tâches de type Runnable ou Callable pour exécution à un pool de threads en invoquant une des méthodes dédiées.

Ces méthodes renvoient un objet de type Future. Si la tâche exécutée est de type Runnable, la méthode get() renvoie toujours null.

Pour soumettre des tâches à un ExecutorService, il est préférable d'utiliser la méthode submit() plutôt que la méthode execute(). La méthode submit() renvoie un objet de type Future qui permet :

  • d'obtenir la valeur de retour (ou null s'il n'y en a pas)
  • d'obtenir l'exception levée par la tâche au cas où celle-ci en a levée une
  • de demander l'annulation de l'exécution de la tâche (si celle-ci prend en charge cette fonctionnalité)

Si une exception est levée durant l'exécution des traitements d'une tâche soumise avec la méthode execute() alors celle-ci sera traitée par le UncaughtExceptionHandler du thread qui exécute la tâche. Par défaut, ce handler affiche l'exception et sa stacktrace dans le flux System.err.

Si une exception est levée durant l'exécution des traitements d'une tâche soumise avec la méthode submit() alors celle-ci sera chaînée à l'exception de type ExecutionException qui sera levée par l'invocation de la méthode get().

 

38.1.2. L'interface ScheduledExecutorService

L'utilisation de cette interface est détaillée dans la section Le ScheduledExecutorService du chapitre La planification de tâches.

 

38.2. Les pools de threads

Un pool d'objets est une collection d'objets initialisés qui vont être utilisables et réutilisables selon les besoins.

L'utilisation d'un pool peut améliorer les performances car il évite d'avoir à créer et initialiser un objet à chaque fois qu'une telle instance est requise. C'est particulièrement efficace pour les instances qui sont longues et/ou coûteuses à créer comme par exemple les connexions JDBC.

Un pool de threads permet de contenir un ensemble de threads qui pourront être utilisés pour exécuter des tâches. Les pools de threads sont particulièrement utiles pour exécuter des tâches similaires et indépendantes.

Les threads requièrent des ressources systèmes : il est donc nécessaire d'avoir le contrôle sur leur nombre. L'utilisation d'un pool peut permettre de facilement contrôler le nombre maximum de threads qui peuvent être exécutés en simultané. Chaque thread consomme de la ressource (CPU et mémoire) : le nombre de threads exécutables dépend donc des ressources de la machine et du système d'exploitation.

Une solution possible est d'invoquer la méthode availableProcessors() de la classe Runtime qui renvoie un entier représentant le nombre de processeurs disponibles sur la machine et d'utiliser cette valeur directement ou indirectement pour déterminer la taille du pool.

Exemple :
int nbProcs = Runtime.getRuntime().availableProcessors();

Un ExecutorService encapsule un pool de threads et une queue de tâches à exécuter. Tous les threads du pool sont toujours en cours d'exécution. Le service vérifie si une tâche est à traiter dans la queue et si c'est le cas il la retire et l'exécute. Une fois la tâche exécutée, le thread attend de nouveau que le service lui assigne une nouvelle tâche de la queue.

Plusieurs implémentations de l'interface ExecutorService sont proposées en standard dans le JDK :

  • Single Thread Executor : un pool qui ne contient qu'un seul thread. Toutes les tâches soumises sont exécutées de manière séquentielle
  • Cached Thread Pool : un pool qui contient plusieurs threads. Ceux-ci sont utilisés pour exécuter en parallèle les différentes tâches. La taille du pool varie selon les besoins
  • Fixed Thread Pool : un pool qui contient un nombre fixe de threads. Ceux-ci sont utilisés pour exécuter en parallèle les différentes tâches. Si tous les threads sont occupés alors la tâche est empilée jusqu'à ce qu'elle puisse être exécutée par un thread
  • Scheduled Thread Pool : un pool qui contient plusieurs threads pour exécuter des tâches planifiées
  • Single Thread Scheduled Pool : un pool qui ne contient qu'un seul thread pour exécuter des tâches planifiées

 

38.2.1. La classe ThreadPoolExecutor

La classe java.util.concurrent.ThreadPoolExecutor est une implémentation de l'interface ExecutorService qui utilise un pool de threads. Elle hérite de la classe AbstractExecutorService.

La taille du pool de threads d'un ThreadPoolExecutor est configurée grâce à deux propriétés :

  • corePoolSize
  • maximumPoolSize
Exemple :
package fr.jmdoudoux.dej.thread;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThreadPoolExecutor {

  public static void main(String[] args) {

    ExecutorService executorService = new ThreadPoolExecutor(1, 1, 1000,
        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

    Future future = executorService.submit(new Runnable() {
      @Override
      public void run() {
        System.out.println("debut tache " + Thread.currentThread().getName());
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println("fin tache");
      }
    });

    System.out.println("Autre traitement");

    try {
      System.out.println("resultat=" + future.get());
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }

    executorService.shutdown();

    System.out.println("Fin thread principal");
  }
}

Lorsque l'exécution d'une tâche est confiée à un thread du pool, si le nombre de threads présent dans le pool est inférieur au maximum alors un nouveau thread est créé et ajouté au pool même si un ou plusieurs threads sont inactifs dans le pool.

Si la queue interne est pleine et que le nombre de threads est supérieur à corePoolSize et inférieur à maximumPoolSize alors un nouveau thread est créé et ajouté dans le pool pour exécuter des tâches.

Exemple :
package fr.jmdoudoux.dej.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThreadPoolExecutor {

  public static void main(String[] args) throws InterruptedException {

    ExecutorService executorService = new ThreadPoolExecutor(2, 4, 60,
        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

    for (int i = 0; i < 5; i++) {
      executorService.submit(new Runnable() {
        @Override
        public void run() {
          System.out.println("debut tache " + Thread.currentThread().getName());
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println("fin tache");
        }
      });
    }

    System.out.println("Autre traitement");

    executorService.shutdown();
    executorService.awaitTermination(300, TimeUnit.SECONDS);

    System.out.println("Fin thread principal");
  }
}
Résultat :
debut tache pool-1-thread-1
debut tache pool-1-thread-2
Autre traitement
fin tache
fin tache
debut tache pool-1-thread-1
debut tache pool-1-thread-2
fin tache
fin tache
debut tache pool-1-thread-1
fin tache
Fin thread principal

Dans l'exemple ci-dessous, le nombre de threads du pool ne dépasse pas 2, ce qui correspond à la propriété corePoolSize. Pourtant la propriété maximumPoolSize vaut 4 et 5 tâches sont soumises pour exécution au pool. Ceci est lié au fait que la collection de type LinkedBloquingQueue n'étant pas pleine (sa méthode offer() ne renvoie pas false), le ThreadPoolExecutor ne crée pas de nouveaux threads.

La classe ThreadPoolExecutor possède de nombreuses options de configuration. La création d'une nouvelle instance de type ThreadPoolExecutor peut être requise pour des besoins spécifiques mais le plus simple est d'utiliser la classe java.util.concurrent.Executors qui propose des fabriques. Pour faciliter la création d'instances avec une configuration classique, la classe Executors propose plusieurs méthodes qui sont des fabriques permettant de créer des instances préconfigurées.

Il est important qu'un ExecutorService soit éteint proprement lorsqu'il n'a plus d'utilité ou lorsqu'il doit être arrêté. Deux méthodes sont utilisables pour cela :

  • shutdown() : une fois invoquée, le service ne peut plus prendre de nouvelles tâches, les tâches en cours d'exécution se poursuivent, les tâches non encore exécutées sont supprimées et enfin libère les ressources une fois toutes les tâches exécutées
  • shutdownNow() : permet de demander un arrêt immédiat (le service ne peut plus prendre de nouvelles tâches, il essaie d'arrêter les tâches en cours d'exécution, supprime les tâches non encore exécutées et enfin libère les ressources). Elle renvoie une liste des tâches dont l'exécution ne s'est pas terminée.

Une tâche soumise à un ExecutorService sera exécutée par un thread inactif du pool.

La méthode execute(Runnable) fait exécuter la tâche fournie en paramètre par le pool de threads.

Exemple ( code Java 5.0 ) :
package fr.jmdoudoux.dej.thread;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class TestExecutorService {
 
  public static void main(String[] args) {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    executorService.execute(new Runnable() {
      public void run() {
        System.out.println("debut tache");
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println("fin tache");
      }
    });
 
    executorService.shutdown();
 
    System.out.println("Fin thread principal");
  }
}
Résultat :
debut tache
Fin thread principal
fin tache

La méthode submit(Runnable) permet de demander l'exécution d'une tâche qui implémente l'interface Runnable. Elle renvoie un objet de type Future qui permet de déterminer si l'exécution de la tâche est terminée.

Exemple ( code Java 5.0 ) :
package fr.jmdoudoux.dej.thread;
 
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class TestExecutorService {
 
  public static void main(String[] args) {
 
    ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    Future future = executorService.submit(new Runnable() {
      @Override
      public void run() {
        System.out.println("debut tache " + Thread.currentThread().getName());
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println("fin tache");
      }
    });
 
    System.out.println("Autre traitement");
 
    try {
      System.out.println("resultat=" + future.get());
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
 
    executorService.shutdown();
 
    System.out.println("Fin thread principal");
  }
}
Résultat :
Autre traitement
debut tache pool-1-thread-1
fin tache
resultat=null
Fin thread principal

La méthode get() de l'instance de type Future obtenue en invoquant la méthode submit() avec un Runnable renvoie toujours null.

La méthode submit(Callable) permet de soumettre l'exécution d'un tâche de type Callable<V> et renvoie un objet de type Future<V> qui permet d'obtenir des informations sur l'exécution.

Exemple ( code Java 5.0 ) :
package fr.jmdoudoux.dej.thread;
 
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
 
public class TestExecutorService {
 
  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
 
    Future<String> future2 = executorService.submit(new Callable<String>() {
      public String call() throws Exception {
        int i = 0;
        System.out.println("debut tache 1");
        while (i < 10 && !Thread.currentThread().isInterrupted()) {
          Thread.sleep(1000);
          i++;
        }
        System.out.println("fin tache 1");
        return "Tache 1";
      }
    });

    Future<String> future1 = executorService.submit(new Callable<String>() {
      public String call() throws Exception {
        int i = 0;
        System.out.println("debut tache 2 ");
        while (i < 10 && !Thread.currentThread().isInterrupted()) {
          Thread.sleep(500);
          i++;
        }
        System.out.println("fin tache 2");
        return "Tache 2";
      }
    });
 
    executorService.shutdown();
 
    try {
      executorService.awaitTermination(1, TimeUnit.HOURS);
      System.out.println("result1 = " + future1.get());
      System.out.println("result2 = " + future2.get());
    } catch (InterruptedException ie) {
      ie.printStackTrace();
    } catch (ExecutionException ee) {
      ee.printStackTrace();
    }
  }
}
Résultat :
debut tache 1
debut tache 2 
fin tache 2
fin tache 1
result1 = Tache 2
result2 = Tache 1

La méthode invokeAny() renvoie le résultat de l'exécution d'une des tâches fournies en paramètres. Il n'y a pas de garantie sur la tâche dont la valeur sera retournée : c'est la première dont l'exécution se termine.

Exemple ( code Java 5.0 ) :
package fr.jmdoudoux.dej.thread;
 
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class TestExecutorService {
 
  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
 
    Set<Callable<String>> callables = new HashSet<Callable<String>>();
 
    callables.add(new Callable<String>() {
      public String call() throws Exception {
        int i = 0;
        System.out.println("debut tache 1");
        while (i < 100 && !Thread.currentThread().isInterrupted()) {
          Thread.sleep(10000);
          i++;
        }
        System.out.println("fin tache 1");
        return "Tache 1";
      }
    });

    callables.add(new Callable<String>() {
      public String call() throws Exception {
        int i = 0;
        System.out.println("debut tache 2 ");
        while (i < 50 && !Thread.currentThread().isInterrupted()) {
          Thread.sleep(10);
          i++;
        }
        System.out.println("fin tache 2");
        return "Tache 2";
      }
    });

    callables.add(new Callable<String>() {
      public String call() throws Exception {
        int i = 0;
        System.out.println("debut tache 3 ");
        while (i < 200 && !Thread.currentThread().isInterrupted()) {
          Thread.sleep(100);
          i++;
        }
        System.out.println("fin tache 3");
        return "Tache 3";
      }
    });
 
    try {
      String result = executorService.invokeAny(callables);
      System.out.println("result = " + result);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
    executorService.shutdown();
  }
}
Résultat :
debut tache 3 
debut tache 2 
debut tache 1
fin tache 2
result = Tache 2

Dès qu'une tâche est terminée, son résultat est retourné et l'exécution des autres tâches est annulée.

La méthode invokeAny() permet d'exécuter plusieurs tâches et de renvoyer le résultat de la première qui est terminée. L'invocation de cette méthode est bloquante.

La méthode invokeAll() permet de demander l'exécution de toutes les tâches.

Exemple ( code Java 5.0 ) :
package fr.jmdoudoux.dej.thread;
 
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
 
public class TestExecutorService {
 
  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
 
    Set<Callable<String>> callables = new HashSet<Callable<String>>();
 
    callables.add(new Callable<String>() {
      public String call() throws Exception {
        int i = 0;
        System.out.println("debut tache 1");
        while (i < 10 && !Thread.currentThread().isInterrupted()) {
          Thread.sleep(1000);
          i++;
        }
        System.out.println("fin tache 1");
        return "Tache 1";
      }
    });

    callables.add(new Callable<String>() {
      public String call() throws Exception {
        int i = 0;
        System.out.println("debut tache 2 ");
        while (i < 10 && !Thread.currentThread().isInterrupted()) {
          Thread.sleep(500);
          i++;
        }
        System.out.println("fin tache 2");
        return "Tache 2";
      }
    });
 
    try {
      List<Future<String>> futures = executorService.invokeAll(callables);
 
      executorService.shutdown();
 
      executorService.awaitTermination(1, TimeUnit.HOURS);
 
      for (Future<String> future : futures) {
        System.out.println("resultat = " + future.get());
      }
    } catch (InterruptedException ie) {
      ie.printStackTrace();
    } catch (ExecutionException ee) {
      ee.printStackTrace();
    }
  }
}
Résultat :
debut tache 2 
debut tache 1
fin tache 2
fin tache 1
resultat = Tache 2
resultat = Tache 1

La méthode awaitTermination() permet d'attendre de manière bloquante la fin de l'exécution de toutes les tâches soumises.

Les threads du pool ne sont pas des démons : si la méthode shutdown() n'est pas invoquée alors la JVM continuera indéfiniment de s'exécuter même si les traitements du thread principal sont terminés.

Exemple ( code Java 5.0 ) :
package fr.jmdoudoux.dej.thread;
 
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class TestExecutorService {
 
  public static void main(String[] args) {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    Future<String> future = executorService.submit(new Callable<String>() {
      public String call() throws Exception {
        System.out.println("debut tache");
        Thread.sleep(1000);
        System.out.println("fin tache");
        return "Tache";
      }
    });
 
    try {
      System.out.println("resultat = " + future.get());
    } catch (InterruptedException ie) {
      ie.printStackTrace();
    } catch (ExecutionException ee) {
      ee.printStackTrace();
    }
    System.out.println("Fin thread principal");
  }
}
Résultat :
debut tache
fin tache
resultat = Tache
Fin thread principal

La méthode shutdownNow() va tenter de stopper l'exécution des tâches en cours et va retirer les tâches dont l'exécution n'est pas encore démarrée. Aucune garantie n'est offerte concernant les tâches qui sont stoppées : elles peuvent être interrompues ou leur exécution peut se poursuivre jusqu'à sa fin.

 

38.2.2. La classe Executors

La classe java.util.concurrent.Executors est une fabrique qui permet de créer des instances de différents types du framework Executor : Executor, ExecutorService, ScheduledExecutorService, ThreadFactory et Callable.

La classe Executors permet de créer :

  • des ExecutorService dont l'implémentation utilise un pool de threads : CachedThreadPool, FixedThreadPool, SingleThreadExecutor
  • des ScheduledExecutorService
  • des Callable

Elle possède plusieurs méthodes, notamment :

Méthode 

Rôle

Callable<Object> callable(PrivilegedAction<?> action)

Renvoyer une instance de type Callable qui lors de son invocation exécutera la tâche fournie en paramètre et renverra le résultat

Callable<Object> callable(PrivilegedExceptionAction<?> action)

Renvoyer une instance de type Callable qui lors de son invocation exécutera la tâche fournie en paramètre et renverra le résultat

Callable<Object> callable(Runnable task)

Renvoyer une instance de type Callable qui lors de son invocation exécutera la tâche fournie en paramètre et renverra toujours null

<T> Callable<T> callable(Runnable task, T result)

Renvoyer une instance de type Callable qui lors de son invocation exécutera la tâche fournie et renverra le résultat fourni en paramètre

ThreadFactory defaultThreadFactory()

Renvoyer la fabrique de threads utilisée par défaut

ExecutorService newCachedThreadPool()

Renvoyer une instance d'un ExecutorService utilisant un pool de threads dont la taille peut être agrandie selon les besoins de manière non limitée. Les threads inactifs sont utilisés pour exécuter des tâches mais de nouveaux threads peuvent être créés et ajoutés dans le pool au besoin.

ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

Cette surcharge de la méthode newCachedThreadPool() permet de passer en paramètre la fabrique de threads utilisée pour créer de nouveaux threads qui seront ajoutés dans le pool

ExecutorService newFixedThreadPool(int nThreads)

Renvoyer une instance de type ExecutorService qui utilise un pool de threads dont la taille est fixe. Les tâches à exécuter sont stockées dans une queue qui est dépilée au fur et à mesure de l'exécution des tâches par les threads du pool. La taille de la queue est illimitée.

ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

Cette surcharge de la méthode newFixedThreadPool() permet de passer en paramètre la fabrique de threads qui sera utilisée pour créer de nouveaux threads ajoutés dans le pool

ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

Renvoyer une instance de type ScheduledExecutorService dont la taille du pool de threads est précisée en paramètre

ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

Cette surcharge de la méthode newScheduledThreadPool() permet de passer en paramètre la fabrique de threads qui sera utilisée pour créer de nouveaux threads ajoutés dans le pool

ExecutorService newSingleThreadExecutor()

Renvoyer une instance de type ExecutorService qui utilise un pool n'ayant qu'un seul thread. Les tâches à exécuter sont stockées dans une queue qui est dépilée au fur et à mesure de l'exécution des tâches par le thread du pool. La taille de la queue est illimitée.

ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

Cette surcharge de la méthode newSingleThreadExecutor() permet de passer en paramètre la fabrique de threads utilisée pour créer de nouveaux threads qui seront ajoutés dans le pool

ScheduledExecutorService newSingleThreadScheduledExecutor()

Renvoyer une instance de type ScheduledExecutorService dont le pool ne contient qu'un seul thread

ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

Cette surcharge de la méthode newSingleThreadScheduledExecutor() permet de passer en paramètre la fabrique de threads utilisée pour créer de nouveaux threads qui seront ajoutés dans le pool

<T> Callable<T> privilegedCallable(Callable<T> callable)

Renvoyer une instance de type Callable qui lors de son invocation exécutera la tâche fournie en paramètre sous le contrôle de l'AccessController courant

<T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable)

Renvoyer une instance de type Callable qui lors de son invocation exécutera la tâche fournie en paramètre sous le contrôle de l'AccessController courant

ThreadFactory privilegedThreadFactory()

Renvoyer la fabrique de threads utilisée pour créer de nouveau thread ayant les mêmes permissions que le thread courant

 

38.3. L'interface java.util.concurrent.Callable

L'interface java.lang.Runnable présente des limitations :

  • la méthode run() ne permet pas de récupérer une valeur de retour suite à l'exécution des traitements
  • la méthode run() ne permet pas de récupérer une exception levée durant l'exécution.

Avant Java 5, pour pallier ces limitations, il était nécessaire d'ajouter du code pour gérer ces fonctionnalités.

Java 5 propose l'interface java.util.concurrent.Callable<V>. Elle ne définit qu'une seule méthode public V call() throws Exception.

L'interface Callable est typée avec un generic qui permet de préciser le type de la valeur de retour de son unique méthode.

Exemple ( code Java 5.0 ) :
package fr.jmdoudoux.dej.thread;
 
import java.util.concurrent.Callable;
 
public class MonCallable implements Callable<String> {
 
  @Override
  public String call() throws Exception {
    try {
      Thread.sleep(5000);
    }
    catch(InterruptedException e){
      throw new Exception("Thread interrompu",e);
    }
    return "test";
  }
}

Si le Callable n'a pas de valeur de retour, il faut le typer avec Void.

Exemple ( code Java 5.0 ) :
package fr.jmdoudoux.dej.thread;
      
import java.util.concurrent.Callable;

public class MonCallable implements Callable<Void> {
  @Override
  public Void call() throws Exception {
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      throw new Exception("Thread interrompu", e);
    }
    return null;
  }
}

Pour demander l'exécution asynchrone d'un Callable à un ExecutorService, il faut utiliser une des surcharges de la méthode submit(). Celle-ci renvoie une instance de type Future qui permet de vérifier le statut d'exécution d'un Callable et d'obtenir la valeur de retour à la fin de son exécution.

 

38.4. L'interface java.util.concurrent.Future

L'interface java.util.concurrent.Future<V> définit les fonctionnalités qui permettent de gérer le cycle de vie de l'exécution d'une tâche.

Méthode

Rôle

boolean cancel(boolean mayInterruptIfRunning)

Tenter d'interrompre l'exécution de la tâche

V get()

Attendre de manière bloquante la fin de l'exécution de la tâche et renvoyer son résultat

V get(long timeout, TimeUnit unit)

Attendre de manière bloquante la fin de l'exécution de la tâche ou la fin du timeout fourni en paramètre et renvoyer son résultat s'il est disponible

boolean isCancelled()

Renvoyer un booléen qui précise si l'exécution de la tâche est interrompue avant sa fin

boolean isDone()

Renvoyer un booléen qui précise si l'exécution de la tâche est terminée


Attention : la méthode get() est bloquante. Le thread qui l'invoque reste donc en attente de la fin de l'exécution de la tâche.

Exemple :
package fr.jmdoudoux.dej.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestCallable {
  public static void main(String[] args) {
    Integer resultat;
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Integer> result = executor.submit(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return Integer.valueOf(10);
      }
    });
    executor.shutdown();
    
    System.out.println("debut");
    long debut = System.currentTimeMillis();
    try {
      resultat = result.get();
      System.out.println("fin");
      System.out.printf("Resultat %d (%d ms)%n", resultat,
          System.currentTimeMillis() - debut);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }
}
Résultat :
debut
fin
Resultat 10 (1000 ms)

Il n'est pas pertinent de lancer l'exécution d'une tâche de manière asynchrone pour immédiatement attendre le résultat de l'exécution. Il est donc préférable de vérifier si l'exécution de la tâche est terminée en invoquant périodiquement la méthode isDone() qui renvoie un booléen valant true si l'exécution est finie.

Exemple :
package fr.jmdoudoux.dej.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestCallable {
  public static void main(String[] args) {
    Integer resultat;
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Integer> result = executor.submit(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return Integer.valueOf(10);
      }
    });
    executor.shutdown();
    
    long debut = System.currentTimeMillis();
    while (!result.isDone()) {
      System.out
          .printf("attente (%d ms)%n", System.currentTimeMillis() - debut);
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    
    try {
      resultat = result.get();
      System.out.printf("Resultat %d (%d ms)%n", resultat,
          System.currentTimeMillis() - debut);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }
}
Résultat :
attente (0 ms)
attente (203 ms)
attente (406 ms)
attente (609 ms)
attente (812 ms)
Resultat 10 (1016 ms)

Ce n'est généralement pas une bonne pratique d'attendre, potentiellement indéfiniment, la fin de l'exécution d'une tâche car si celle-ci ne se termine jamais, c'est deux threads qui ne se terminent pas.

Exemple :
package fr.jmdoudoux.dej.thread;
      
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TestCallable {
  public static void main(String[] args) {
    Integer resultat;
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Integer> result = executor.submit(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return Integer.valueOf(10);
      }
    });
    executor.shutdown();
    
    long debut = System.currentTimeMillis();
    try {
      resultat = result.get(500L, TimeUnit.MILLISECONDS);
      System.out.printf("Resultat %d (%d ms)%n", resultat,
          System.currentTimeMillis() - debut);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    } catch (TimeoutException e) {
      System.out
          .printf("Timeout (%d ms)%n", System.currentTimeMillis() - debut);
      System.out.printf("Execution terminee %b%n", result.isDone());
    }
  }
}
Résultat :
Timeout (515 ms)
Execution terminee false

L'idéale est de demander l'arrêt de l'exécution de la tâche en utilisant la méthode cancel() de la classe Future.

Une tâche effectue généralement des traitements dans une boucle. La méthode cancel() invoque la méthode interrupt() du thread. Ceci positionne le flag du thread mais ne met pas fin à son exécution.

Exemple :
package fr.jmdoudoux.dej.thread;
      
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TestCallable {
  public static void main(String[] args) {
    Integer resultat;
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Integer> result = executor.submit(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        int compteur = 0;
        while (compteur < 1000000000) {
          compteur++;
        }
        System.out.println("compteur=" + compteur + " "
            + Thread.currentThread().isInterrupted());
        return Integer.valueOf(compteur);
      }
    });
    executor.shutdown();
    
    long debut = System.currentTimeMillis();
    try {
      resultat = result.get(50L, TimeUnit.MILLISECONDS);
      System.out.printf("Resultat %d (%d ms)%n", resultat,
          System.currentTimeMillis() - debut);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    } catch (TimeoutException e) {
      System.out
          .printf("Timeout (%d ms)%n", System.currentTimeMillis() - debut);
      result.cancel(true);
      System.out.printf("Execution terminee %b%n", result.isDone());
    }
  }
}
Résultat :
Timeout (62 ms)
Execution terminee true
compteur=1000000000 true

Il est alors nécessaire de gérer dans la condition de la boucle, la demande d'interruption de l'exécution du thread de la tâche.

Exemple :
package fr.jmdoudoux.dej.thread;
      
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TestCallable {
  public static void main(String[] args) {
    Integer resultat;
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Integer> result = executor.submit(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        int compteur = 0;
        while ((compteur < 100000000)
            && (!Thread.currentThread().isInterrupted())) {
          compteur++;
        }
        System.out.println("compteur=" + compteur + " "
            + Thread.currentThread().isInterrupted());
        return Integer.valueOf(compteur);
      }
    });
    executor.shutdown();
    
    long debut = System.currentTimeMillis();
    try {
      resultat = result.get(50L, TimeUnit.MILLISECONDS);
      System.out.printf("Resultat %d (%d ms)%n", resultat,
          System.currentTimeMillis() - debut);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    } catch (TimeoutException e) {
      System.out
          .printf("Timeout (%d ms)%n", System.currentTimeMillis() - debut);
      result.cancel(true);
      System.out.printf("Execution terminee %b%n", result.isDone());
    }
  }
}
Résultat :
Timeout (62 ms)
Execution terminee true
compteur=1877256 true

 

38.5. L'interface java.util.concurrent.CompletionService

La gestion de la récupération des résultats de plusieurs tâches exécutées de manière asynchrone par un ExecutorService nécessite d'écrire un peu de code.

Exemple :
package fr.jmdoudoux.dej.thread;

import java.util.concurrent.Callable;

public class MaTache implements Callable<Integer> {
  
  private final int duree;
  
  public MaTache(int duree) {
    this.duree = duree;
  }
  
  @Override
  public Integer call() {
    System.out.println("Debut tache " + Thread.currentThread().getName());
    try {
      Thread.sleep(1000 * duree);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Fin tache " + Thread.currentThread().getName());
    return duree;
  }
}
Exemple :
package fr.jmdoudoux.dej.thread;
      
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestCompletionService {
  private static final int NB_TACHES = 5;
  
  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(NB_TACHES);
    List<Future<Integer>> futures = 
      new ArrayList<Future<Integer>>(NB_TACHES);
    
    for (int i = 0; i < NB_TACHES; i++) {
      futures.add(executor.submit(new MaTache(NB_TACHES - i)));
    }
    
    for (Future<Integer> future : futures) {
      Integer resultat;
      try {
        resultat = future.get();
        System.out.println("resultat = " + resultat);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
    }
    executor.shutdown();
  }
}
Résultat :
Debut tache
pool-1-thread-1
Debut tache
pool-1-thread-2
Debut tache
pool-1-thread-3
Debut tache
pool-1-thread-4
Debut tache
pool-1-thread-5
Fin tache
pool-1-thread-5
Fin tache
pool-1-thread-4
Fin tache
pool-1-thread-3
Fin tache
pool-1-thread-2
Fin tache
pool-1-thread-1
resultat = 5
resultat = 4
resultat = 3
resultat = 2
resultat = 1

L'inconvénient de cette solution est qu'elle récupère les résultats dans l'ordre dans lequel les tâches sont soumises. Ainsi, si le temps d'exécution de la première tâche est le plus long, il faudra attendre sa fin avant de commencer à obtenir un résultat même si toutes les autres tâches sont terminées.

Pour faciliter la gestion de l'attente et de l'obtention des résultats de plusieurs tâches exécutées par un ExecutorService, l'API propose l'interface CompletionService.

Une instance de type CompletionService permet l'exécution de tâches asynchrones et surtout facilite la récupération de leurs résultats au fur et à mesure de l'achèvement de ces tâches.

Elle définit plusieurs méthodes :

Méthode

Rôle

Future<V> poll()

Obtenir l'instance de type Future de la prochaine tâche qui se terminera. Elle renvoie null si aucune tâche ne s'est terminée

Future<V> poll(long timeout, TimeUnit unit)

Obtenir l'instance de type Future de la prochaine tâche qui se terminera. Elle renvoie null si aucune tâche ne s'est terminée avant l'expiration du timeout fourni en paramètre

Future<V> submit(Callable<V> task)

Demander l'exécution de la tâche fournie en paramètre

Future<V> submit(Runnable task, V result)

Demander l'exécution de la tâche fournie en paramètre

Future<V> take()

Obtenir l'instance de type Future de la prochaine tâche qui se terminera. Cette méthode est bloquante jusqu'à ce qu'une tâche soit terminée.


La classe ExecutorCompletionService implémente l'interface CompletionService.

Elle sépare les activités de soumission de tâches et de récupérations de leurs résultats en utilisant deux files. Les méthodes poll() et take() retirent de la file des résultats les objets de type Future qui sont retournés.

La méthode bloquante take() permet de renvoyer le résultat d'une exécution qui n'a pas encore été consommé.

Exemple :
package fr.jmdoudoux.dej.thread;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestCompletionService {
  private static final int NB_TACHES = 5;
  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(NB_TACHES);
    CompletionService<Integer> completion = new ExecutorCompletionService<Integer>(
        executor);
    for (int i = 0; i < NB_TACHES; i++) {
      completion.submit(new MaTache(NB_TACHES - i));
    }
    for (int i = 0; i < NB_TACHES; i++) {
      Integer resultat;
      try {
        resultat = completion.take().get();
        System.out.println("resultat = " + resultat);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
    }
    executor.shutdown();
  }
}
Résultat :
Debut tache
pool-1-thread-1
Debut tache
pool-1-thread-2
Debut tache
pool-1-thread-3
Debut tache
pool-1-thread-4
Debut tache
pool-1-thread-5
Fin tache
pool-1-thread-5
resultat = 1
Fin tache
pool-1-thread-4
resultat = 2
Fin tache
pool-1-thread-3
resultat = 3
Fin tache
pool-1-thread-2
resultat = 4
Fin tache
pool-1-thread-1
resultat = 5

Il n'est plus utile de gérer une collection de type Future.

Les invocations successives de la méthode take() renvoient les instances de type Future au fur et à mesure de la fin de l'exécution des tâches. Les résultats sont ainsi obtenus non pas dans leur ordre de lancement mais dans l'ordre dans lequel les tâches se terminent.

Attention : comme la méthode take() est bloquante, il ne faut surtout pas l'invoquer un nombre de fois supérieur à celui des tâches soumises car dans ce cas, le thread restera bloqué indéfiniment.

 


[ Précédent ] [ Sommaire ] [ Suivant ] [Télécharger ]      [Accueil ]

78 commentaires Donner une note à l´article (5)

 

Copyright (C) 1999-2022 Jean-Michel DOUDOUX. Vous pouvez copier, redistribuer et/ou modifier ce document selon les termes de la Licence de Documentation Libre GNU, Version 1.1 ou toute autre version ultérieure publiée par la Free Software Foundation; les Sections Invariantes étant constitués du chapitre Préambule, aucun Texte de Première de Couverture, et aucun Texte de Quatrième de Couverture. Une copie de la licence est incluse dans la section GNU FreeDocumentation Licence. La version la plus récente de cette licence est disponible à l'adresse : GNU Free Documentation Licence.