blockingcollection

C# propose de nombreuses classes et structures de données pour gérer les tâches et les flux de données asynchrones. Après la classe ManualResetEvent, étudions la classe BlockingCollection. Elle offre une solution élégante pour coordonner producteurs et consommateurs dans un environnement multithread.

Dans cet article, nous explorerons les BlockingCollection en C#.
Comment elles simplifient la communication entre les différents threads en fournissant un moyen sûr et fiable d’envoyer et de recevoir des données?
Quelles sont les différentes fonctionnalités offertes, telles que les opérations de blocage, les stratégies de file d’attente?

Création d’une BlockingCollection

La création d’une BlockingCollection est très simple. Voici la syntaxe:

BlockingCollection<int> bCollection = new BlockingCollection<int>(10);

L’entier en paramètre est le nombre maximum d’éléments qu’elle peut contenir.

Par défaut, l’objet de type BlockingCollection utilisera une ConcurrentQueue pour stocker ses éléments. Il est possible d’utiliser d’autres collections, par exemple ConcurrentStack ou ConcurrentBag. On peut également créer notre propre classe, il suffit qu’elle hérite de l’interface IProducerConsumerCollection.

Si on utilise par exemple, un ConcurrentBag, la création de notre objet devient:

BlockingCollection<int> bCollection = new BlockingCollection<int>(new ConcurrentBag<int>(), 10);

Ajouter des éléments

Pour ajouter des éléments dans une BlockingCollection, on peut utiliser les méthodes Add(…) et TryAdd(…). Add(…) prend un seul paramètre, l’élément qu’on veut ajouter.

bCollection.Add(1)

Si on tente d’ajouter un élément alors que la collection est pleine, l’exécution est bloquée jusqu’à ce qu’un élément soit retiré.

TryAdd(…) est une alternative. En plus de l’élément à ajouter, elle prend en paramètre un timeout.

Si l’élément n’est pas ajouté dans le délai imparti, elle retourne false.

BlockingCollection<int> bCollection = new BlockingCollection<int>(2);
bCollection.Add(1);
bCollection.Add(2);

if(bCollection.TryAdd(3, TimeSpan.FromSeconds(1)))
{
    Console.WriteLine("Item added");
}
else
{
    Console.WriteLine("Item not added");
}

L’exécution du code ci-dessus affichera « Item not added« , car la collection est pleine et ne peut plus recevoir de nouveaux éléments.

Retirer des éléments

La classe BlockingCollection propose deux méthodes pour retirer un élément: Take() et TryTake().

Take() retire un élément de la collection et le renvoie. La méthode est bloquante tant que la collection est vide.

BlockingCollection<int> bCollection = new BlockingCollection<int>(2);
bCollection.Add(1);
bCollection.Add(2);

int item = bCollection.Take();
Console.WriteLine(item); //affiche 1

La méthode TryTake() retire aussi un élément mais elle prend en plus un paramètre de timeout. Si la collection est vide, la méthode attend la durée du timeout avant de retourner false. Elle retournera true, si elle a trouvé un élément dans la collection.

BlockingCollection<int> bCollection = new BlockingCollection<int>(2);
bCollection.Add(1);
bCollection.Add(2);

int item = bCollection.Take();
item = bCollection.Take();

if (bCollection.TryTake(out item, TimeSpan.FromSeconds(1)))
{
    Console.WriteLine(item);
}
else
{
    Console.WriteLine("No item removed");
}

L’exécution du code ci-dessus affichera « No item removed« 

La méthode CompleteAdding et la propriété IsCompleted

Le thread producteur appelle la méthode CompleteAdding() pour informer le consommateur qu’il n’ajoutera plus d’élément dans la BlockingCollection. Cet appel fixe la propriété booléenne IsAddingCompleted à true. La propriété IsCompleted est utilisée par le thread consommateur. Elle renvoie true lorsque IsAddingCompleted est true et la collection est vide.

Mais l’utilisation de cette propriété n’est pas recommandée.

Dans l’exemple ci-dessous, nous avons un thread producteur et deux threads consommateurs.

BlockingCollection<int> bCollection = new BlockingCollection<int>(10);

Task producerThread = Task.Factory.StartNew(() =>
{
    for (int i = 1; i <= 10; ++i)
    {
        Thread.Sleep(1000);
        Console.WriteLine($"producteur - element {i}");
        bCollection.Add(i);
    }
    
    bCollection.CompleteAdding();
    Console.WriteLine($"producteur - fin des envois");
});

Task consumerThread = Task.Factory.StartNew(() =>
{
    while (!bCollection.IsCompleted)
    {
        int item = bCollection.Take();
        Console.WriteLine($"consommateur[1] - element {item}");
    }

    Console.WriteLine($"consommateur[1] - fin de la reception");
});


Task consumerThread2 = Task.Factory.StartNew(() =>
{
    while (!bCollection.IsCompleted)
    {
        int item = bCollection.Take();
        Console.WriteLine($"consommateur[2] - element {item}");
    }

    Console.WriteLine($"consommateur[2] - fin de la reception");
});

Task.WaitAll(producerThread, consumerThread, consumerThread2);

Si les deux threads consommateurs sont dans la boucle while(), l’un d’eux consommera le dernier élément avant l’autre. Lorsque le second essaiera de consommer l’élément déjà pris par le premier avec Take() une exception : System.InvalidOperationException: ‘The collection argument is empty and has been marked as complete with regards to additions.’ sera levée vu que la collection est déjà vide.

La méthode GetConsumingEnumerable()

La méthode GetConsumingEnumerable() retourne un IEnumerable que nous pouvons utiliser dans une boucle foreach. Cette méthode retourne les éléments une fois qu’ils sont disponibles dans la collection. Quand la collection est vide, la boucle est bloquée jusqu’à ce que le producteur confirme la fin de l’ajout de nouveaux éléments.

Notre code précédent devient

BlockingCollection<int> bCollection = new BlockingCollection<int>(10);

Task producerThread = Task.Factory.StartNew(() =>
{
    for (int i = 1; i <= 10; ++i)
    {
        Thread.Sleep(1000);
        Console.WriteLine($"producteur - element {i}");
        bCollection.Add(i);
    }
    
    bCollection.CompleteAdding();
    Console.WriteLine($"producteur - fin des envois");
});

Task consumerThread = Task.Factory.StartNew(() =>
{
    while (!bCollection.IsCompleted)
    {
        int item = bCollection.Take();
        Console.WriteLine($"consommateur[1] - element {item}");
    }

    Console.WriteLine($"consommateur[1] - fin de la reception");
});


Task consumerThread2 = Task.Factory.StartNew(() =>
{
    while (!bCollection.IsCompleted)
    {
        int item = bCollection.Take();
        Console.WriteLine($"consommateur[2] - element {item}");
    }

    Console.WriteLine($"consommateur[2] - fin de la reception");
});

Task.WaitAll(producerThread, consumerThread, consumerThread2);

Avec ce code, il n’y aura aucun soucis. Une fois que le producteur informera qu’il n’ajoutera pas d’élément et que la collection sera vide, les consommateurs sortiront de la boucle. GetConsumingEnumerable() permet d’itérer en toute sécurité sur une collection sans se soucier des problèmes liés à l’accès concurrentiel par plusieurs threads.

En conclusion, les BlockingCollection offrent une solution puissante et simple pour la gestion des communications entre threads. Elles permettent d’établir une synchronisation efficace et sécurisée, facilitant ainsi le développement d’applications multi-threadées robustes. Cette fonctionnalité simplifie grandement la tâche du développeur en éliminant la nécessité de mettre en place des mécanismes de synchronisation complexes tels que les sémaphores ou les mutex.

Image by pch.vector on Freepik

Auteur : Daniel MINKO FASSINOU

Laisser un commentaire




Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.