Producteur-Consommateur : est-ce correct ?

Le problème exposé dans ce sujet a été résolu.

Salut tout le monde,

J'ai mis en place un système producteur-consommateur en Java, avec un tampon borné de 10 entiers implémenté à l'aide de la classe ConcurrentBuffer. Cette dernière possède deux méthodes : une qui permet de déposer un entier, l'autre d'en retirer un (le plus vieux). Un délai de 1/100s est mis entre chaque opération.

Mon code compile, s'exécute bien et fait ce qu'il est censé faire ; cependant je voudrais être sûr que je n'ai pas fait une petite erreur…

J'ai donc en tout quatre classes :

  1. Consommateur ;
  2. Producteur ;
  3. ConcurrentBuffer ;
  4. Launcher

Classes Consommateur et Producteur

Les classes Consommateur et Producteur possèdent toutes deux un attribut d'instance de type ConcurrentBuffer, donné lors de leur instanciation dans la classe Launcher.

Consommateur ne fait qu'appeler la méthode de ConcurrentBuffer extrayant l'élément. De même, Producteur ne fait qu'appeler la méthode ajoutant un élément (tout simplement 0).

Code de Consommateur

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public class  Consommateur extends Thread {
    private ConcurrentBuffer concurrent_buffer;

    public Consommateur(ConcurrentBuffer concurrent_buffer) {
        this.concurrent_buffer = concurrent_buffer;
    }

    public void run() {
        try {
            while (true) {
                concurrent_buffer.getOldest();
                System.out.println("Le consommateur a retiré un élément.");
                Thread.sleep(100);
            }
        } catch(InterruptedException e) {
            System.out.println(e);
        }
    }

}

Code de Producteur

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public class Producteur extends Thread {
    private ConcurrentBuffer concurrent_buffer;

    public Producteur(ConcurrentBuffer concurrent_buffer) {
        this.concurrent_buffer = concurrent_buffer;
    }

    public void run() {
        try {
            while (true) {
                concurrent_buffer.push(0);
                System.out.println("Le producteur a placé un élément.");
                Thread.sleep(100);
            }
        } catch(InterruptedException e) {
            System.out.println(e);
        }
    }
}

Code de ConcurrentBuffer

ConcurrentBuffer possède l'attribut d'instance array (une ArrayList) ainsi que trois autres attributs d'instance, des Semaphore. L'idée est d'avoir un mutex (qui empêche le thread-producteur et le thread-consommateur de travailler au même instant T sur le ConcurrentBuffer). Les deux autres Semaphore permettent de borner l'ArrayList du ConcurrentBuffer : ainsi places_libres est-il décrémenté (acquire()) lorsque le thread-producteur travaille, et incrémenté (release()) lorsque le thread-consommateur travaille. Inversement pour le Semaphore nommé nombre_elements.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.ArrayList;
import java.util.concurrent.Semaphore;

public class ConcurrentBuffer {
    private ArrayList<Integer> array;
    private Semaphore mutex, places_libres, nombre_elements;

    public ConcurrentBuffer() {
        array = new ArrayList<Integer>();

        this.mutex = new Semaphore(1);
        this.places_libres = new Semaphore(10);
        this.nombre_elements = new Semaphore(0);
    }

    public void push(int value_to_push) {
        try {

            places_libres.acquire();
            mutex.acquire();
            array.add(value_to_push);
            mutex.release();
            nombre_elements.release();

        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }

    public void getOldest() {
        try {

            nombre_elements.acquire();
            mutex.acquire();
            array.remove(0);
            mutex.release();
            places_libres.release();

        } catch(InterruptedException e) {
            System.out.println(e);
        }
    }

} 

Voilà. Si quelqu'un a le temps de vérifier que ce que j'ai fait est correct, ce serait vraiment cool !

Merci d'avance !

+0 -0

Ton approche à base de sémaphores est courante, mais en java on fait plutôt ça avec des moniteurs et du wait/notify, mais je ne sais pas si la contrainte vient d'un éventuel énoncé. Pour le mutex, il pourrait être remplace par un synchronized(array), mais la aussi tu as peut être des contraintes d’énoncé. Sinon, un ReentrantLock suffira.

D'un point de vue terminologique, les classes Concurrent* en java emploient des mécanismes de synchronisation complètement différents, le nom est donc a priori inadapté.

D'un autre coté, vu que ton fonctionnement est FIFO, je ne pense pas qu'un ArrayList est ce qu'il y a de plus adapté. Si par exemple j'avais voulu prendre une classe native pour faire ce que tu fais, j'aurais choisi ArrayBlockingQueue, qui utilise un simple array en interne, mais utilise une technique d'index circulaires.

Sinon, c'est un peu bizarre que getOldest ne renvoie rien. Et ce serait plus joli d'avoir une classe ConcurrentBuffer générique (si tu sais faire).

+0 -0

En effet l'énoncé m'impose d'utiliser des sémaphores. De même, le nom de la classe est indiqué par le prof =/

J'ai pensé à utiliser un tableau d'entier (int[]) à la place d'une ArrayList. L'avantage étant que la borne du haut est explicitement indicable. Mais comment effectuer simplement un empilement (méthode push) ? En tout cas merci de m'avoir parlé d'ArrayBlockingQueue, je vais voir de ce côté. J'ai cependant un peu peur que certains éléments soient remplacés par d'autres vu que c'est une liste circulaire. Il aurait été préférable de retourner une exception quand on essaie d'ajouter un élément en trop.

Pour getOldest je pense que je vais retourner la valeur supprimée et enlever les 2 release du coup. Je placerai ces derniers dans une nouvelle méthode, releaseOldest qui servira juste à rendre disponible les sémaphores. Qu'en dis-tu ?

Je vais me renseigner sur la généricité, merci ! :)

EDIT

Voici le nouveau code, qui tient compte de tes remarques (généricité + getOldest).

Classe Producteur

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public class Producteur extends Thread {
    private ConcurrentBuffer<Integer> concurrent_buffer;

    public Producteur(ConcurrentBuffer<Integer> concurrent_buffer) {
        this.concurrent_buffer = concurrent_buffer;
    }

    public void run() {
        try {
            while (true) {
                Integer value = new Integer((int) (Math.random() + 0.5f));
                concurrent_buffer.push(value);
                System.out.println("Le producteur a placé l'élément : " + value);
                Thread.sleep(100);
            }
        } catch(InterruptedException e) {
            System.out.println(e);
        }
    }
}

Classe Consommateur

1
2
3
4
5
6
7
8
public class  Consommateur extends Thread {
    private ConcurrentBuffer<Integer> concurrent_buffer;

    public Consommateur(ConcurrentBuffer<Integer> concurrent_buffer) {
        this.concurrent_buffer = concurrent_buffer;
    }
[...]
}

Classe ConcurrentBuffer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package producteur_consommateur;

import java.util.ArrayList;
import java.util.concurrent.Semaphore;

public class ConcurrentBuffer<Integer> {
    private ArrayList<Integer> array;
    private Semaphore mutex, places_libres, nombre_elements;

    public ConcurrentBuffer() {
        array = new ArrayList<Integer>();

        this.mutex = new Semaphore(1);
        this.places_libres = new Semaphore(10);
        this.nombre_elements = new Semaphore(0);
    }

    public void push(Integer value_to_push) {
        try {

            places_libres.acquire();
            mutex.acquire();
            array.add(value_to_push);
            mutex.release();
            nombre_elements.release();

        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }

    public Integer getOldest() {
        try {
            nombre_elements.acquire();
            mutex.acquire();
        } catch(InterruptedException e) {
            System.out.println(e);
        }

        return array.remove(0);
    }

    public void releaseOldest() {
        mutex.release();
        places_libres.release();
    }

}

La classe Launcher contient désormais le main suivant :

1
2
ConcurrentBuffer<Integer> concurrent_buffer = new ConcurrentBuffer<Integer>();
[...]
+0 -0

J'ai cependant un peu peur que certains éléments soient remplacés par d'autres vu que c'est une liste circulaire. Il aurait été préférable de retourner une exception quand on essaie d'ajouter un élément en trop.

Ça ne peut pas arriver, grâce a tes sémaphores. Le but étant justement de ne pas lever d'exception, mais plutôt mettre le thread appelant en attente.

Pour getOldest je pense que je vais retourner la valeur supprimée et enlever les 2 release du coup. Je placerai ces derniers dans une nouvelle méthode, releaseOldest qui servira juste à rendre disponible les sémaphores. Qu'en dis-tu ?

J'en dis que c'est une très mauvaise idée. Ta classe doit rester simple a utiliser, et tout le travail de synchronisation doit être fait dedans, tu ne dois surtout pas te reposer sur le code utilisateur pour appeler.

Voici le nouveau code, qui tient compte de tes remarques (généricité + getOldest).

1
2
3
public class ConcurrentBuffer<Integer> {
    // ...
}

Lors de la déclaration, on utilise généralement T pour un type générique. En l’état, le nom n'a pas de sens puisque ton buffer peut très bien stocker autre chose que des entiers, et ton code prête complètement a confusion, vu que le type parameter a le même nom qu'un type natif de java, et le masque du coup.

Ça ne peut pas arriver, grâce a tes sémaphores. Le but étant justement de ne pas lever d'exception, mais plutôt mettre le thread appelant en attente.

Oui c'est vrai, je vais faire ça du coup merci :) !

J'en dis que c'est une très mauvaise idée. Ta classe doit rester simple a utiliser, et tout le travail de synchronisation doit être fait dedans, tu ne dois surtout pas te reposer sur le code utilisateur pour appeler.

Du coup le code suivant te paraît bon ?

1
2
3
4
5
6
7
// Je n'écris pas try catch par souci de lisibilité
public int getOldest() {
    les_semaphores.acquire();
    int e = array.remove(0);
    les_semaphores.release();
    return e;
}

Lors de la déclaration, on utilise généralement T pour un type générique. En l’état, le nom n'a pas de sens puisque ton buffer peut très bien stocker autre chose que des entiers, et ton code prête complètement a confusion, vu que le type parameter a le même nom qu'un type natif de java, et le masque du coup.

Je pensais qu'il fallait indiquer à l'utilisateur de la classe ConcurrentBuffer que cette dernière possédait une liste d'entiers en fait (puisque sa méthode push n'y place que des entiers, et idem pour le getter). Du coup je suis un peu perdu =/

Du coup le code suivant te paraît bon ?

1
2
3
4
5
6
7
// Je n'écris pas try catch par souci de lisibilité
public int getOldest() {
    les_semaphores.acquire();
    int e = array.remove(0);
    les_semaphores.release();
    return e;
}

Oui.

Je pensais qu'il fallait indiquer à l'utilisateur de la classe ConcurrentBuffer que cette dernière possédait une liste d'entiers en fait (puisque sa méthode push n'y place que des entiers, et idem pour le getter). Du coup je suis un peu perdu =/

Ca se passe comme ça :

1
2
3
4
5
6
7
8
// declaration
public class ConcurrentBuffer<T> {
    private ArrayList<T> array;
    // ...
}

// utilisation
ConcurrentBuffer<Integer> concurrent_buffer = new ConcurrentBuffer<Integer>();

Alors du coup j'ai changé le type ArrayList en ArrayBlockingQueue comme tu me l'avais recommandé, j'ai corrigé mon "erreur" concernant la généricité et enfin j'ai mis à jour la méthode getOldest.

Voici le nouveau code, n'hésite pas à me dire s'il y a un nouveau truc à corriger :) !

Note que j'ai utilisé add et poll d'ArrayBlockingQueue.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package producteur_consommateur;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;

public class ConcurrentBuffer<T> {
    private ArrayBlockingQueue<T> array;
    private Semaphore mutex, places_libres, nombre_elements;

    public ConcurrentBuffer() {
        array = new ArrayBlockingQueue<T>(10);

        this.mutex = new Semaphore(1);
        this.places_libres = new Semaphore(10);
        this.nombre_elements = new Semaphore(0);
    }

    public void push(T value_to_push) {
        try {

            places_libres.acquire();
            mutex.acquire();
            array.add(value_to_push);
            mutex.release();
            nombre_elements.release();

        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }

    public T getOldest() {
        T ret = null;
        try {
            nombre_elements.acquire();
            mutex.acquire();
            ret = array.poll();
            mutex.release();
            places_libres.release();
        } catch(InterruptedException e) {
            System.out.println(e);
        }

        return ret;
    }
}
+0 -0

Attention, je t'ai indiqué ArrayBlockingQueue comme remplaçant potentiel pour ton ConcurrentBuffer (et éventuellement t'inspirer de sa mécanique interne), surtout pas pour ton ArrayList (du coup, tu refais deux fois le travail de synchronisation).

+0 -0
Connectez-vous pour pouvoir poster un message.
Connexion

Pas encore membre ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte