最后活跃于 1746591606

taken from https://gist.github.com/AleksandrKonst/8f4513f9d331bc4265ffc1cc3dbe05a4#02

Parallel.md 原始文件

Оглавление

  1. Потоки.
  2. Асинхронный и синхронный код.
  3. Введение в проблематику параллельного программирования.
  4. Процессы.
  5. Ресурсы. Проблемы взаимодействия процессов.
  6. Примитивы синхронизации: критические секции.
  7. Примитивы синхронизации: семафоры.
  8. Примитивы синхронизации: Мьютексы, Спинлоки.
  9. Асинхронность в .NET
  10. Планировщик задач.
  11. C#
  12. Notes
  13. Приятная неожиданность

01. Потоки.

  • Потоки как правило являются базовыми примитивами параллельного/конкурентного выполнения кода
  • В реальности уже применятся более высокоуровневые абстракции и приемы асинхронного и конкурентного кода

C++

  • До C++ 11
    • Основной подход – определить конкурентную функцию
    • Далее создать поток «привязав» его к функции
    • Реализовать «ожидание выполнения» с помощью средств синхронизации
    • После выполнения – закрыть поток
void mythreadA(void* data)
{
  printf("mythreadA %d \n", GetCurrentThreadId());
}
void mythreadB(void* data)
{
  volatile int i;
  //Эмулируем бурную активность
  for (i = 0; i < 100000; i++) {}
  //GetCurrentThreadId() – возвращает id потока
  printf("mythreadB %d \n", GetCurrentThreadId());
}

int main(int argc, char* argv[])
{
  HANDLE myhandleA, myhandleB;
  myhandleA = (HANDLE)_beginthreadex(0, 0, &mythreadA;, 0, 0, 0);
  myhandleB = (HANDLE)_beginthreadex(0, 0, &mythreadB;, 0, 0, 0);
  WaitForSingleObject(myhandleA, INFINITE);
  WaitForSingleObject(myhandleB, INFINITE);
  CloseHandle(myhandleA);
  CloseHandle(myhandleB);
}
  • C++ 11. Класс thread
    • Основной подход – создать объект потока, передав в конструктор «вызываемый» (callable) объект
      • После создания выполнится callable код
      • callable код:
      • указатель на функцию
      • функциональный объект
      • лямбда выражение
#include <iostream>
#include <thread>
using namespace std;
//Простая функция
void foo(int Z)
{
  for (int i = 0; i < Z; i++) {
    cout << "Поток использует указатель на функцию"
      " как «вызываемую» - callable\n";
  }
}

// Вызываемый объект
class thread_obj {
public:
  void operator()(int x)
  {
    for (int i = 0; i < x; i++)
      cout << "Поток использует"
        " объект как вызываемый\n";
  }
}; 

int main()
{
  cout << "Потоки 1 и 2 выполняются независимо" << endl;
  // Данный поток выполняется используя функцию как «вызываемую»
  thread th1(foo, 3);
  // Данный поток выполняется используя ф-ый объект как «вызываемый»
  thread th2(thread_obj(), 3);
  // Ждем завершения потоков
  th1.join();
  th2.join();
  return 0;
} 

Java

  • Java до 5 версии
    • Класс Thread как примитивная основа для конкурентного кода
    • Два подхода:
      • Наследуем класс Thread и переопределяем метод run
      • Реализуем интерфейс Runnable
class MultithreadingCodeDemo extends Thread {
  public void run()
  {
    try {
      // Выводим сообщение о работе потока и его id
      System.out.println(
        "Thread " + Thread.currentThread().getId()
        + " is running");
    }
    catch (Exception e) {
      // По хорошему обрабатываем ошибки!
      System.out.println("Все кекнулось :(");
    }
  }
}
public class Main {
  public static void main(String[] args)
  {
    int n = 8; // Количество потоков
    for (int i = 0; i < n; i++) {
      MultithreadingCodeDemo object
        = new MultithreadingCodeDemo();
      object.start();
      //Можно ждать завершения – object.join()
    }
  }
}
  • Интерфейс Runnable
class MultithreadingCodeDemo implements Runnable {
  public void run()
  {
    try {
      // Выводим сообщение о работе потока и его id
      System.out.println(
        "Thread " + Thread.currentThread().getId()
        + " is running");
    }
    catch (Exception e) {
      // По хорошему обрабатываем ошибки!
      System.out.println("Все кекнулось :(");
    }
  }
}
class Multithread {
  public static void main(String[] args)
  {
    int n = 8; // Количество потоков
    for (int i = 0; i < n; i++) {
      Thread object
        = new Thread(new MultithreadingCodeDemo());
      object.start();
      //Можно ждать завершения – object.join()
    }
  }
}

С#

  • С# до 3
    • Класс Thread как примитивная основа для конкурентного кода
    • Создаем объект класса Thread и «привязываем» метод
class Program
{
  static void Main(string[] args)
  {
    int n = 8; // Количество потоков
    for (int i = 0; i < n; i++) {
      Thread thread = new Thread(new ThreadStart(MultithreadingCodeMethod));
      thread.Start();
      //Можно ждать завершения – thread.Join();
    }
  }
  
  static void MultithreadingCodeMethod()
  {
    Console.WriteLine($"Id потока - {Thread.CurrentThread.ManagedThreadId}");
  }
}  

02. Асинхронный и синхронный код.

  • Поток.
    • Поток выполнения – наименьшая единица обработки для параллельного выполнения отдельных частей одной программы.
    • Для работы с потоками выполнения в .NET есть пространство имен System.Threading и основной его класс Thread

Синхронный код

image

  • Синхронный код выполняется шаг за шагом
static void Main()
{
 int n = int.Parse(Console.ReadLine());
 PrintNumbersInRange(0, n);
 Console.WriteLine("Done.");
}
static void PrintNumbersInRange(int a, int b)
{
 for (int i = a; i <= b; i++)
 {
  Console.WriteLine(i);
 }
}
  • Синхронный код – ресурсоемкие операции

image

  • Недостатки синхронного кода
    • Если один компонент заблокирован, то блокируется вся программа
    • Пользовательский интерфейс может перестать отвечать на запросы
    • Отсутствие использования преимуществ многоядерных систем
    • Требовательные к процессору задачи задерживают выполнение всех остальных задач
    • Доступ к удаленным ресурсам может заблокировать всю программу
      • Особенно проблематично с веб-ресурсами

Асинхронный код

  • Программные компоненты могут выполняться параллельно
    • Некоторые действия выполняются параллельно с другими действиями
    • Каждое действие может происходить в отдельном потоке
  • Независимые компоненты не ждут друг друга
  • Программные ресурсы разделены между потоками
    • Если один поток использует ресурсы, другие не должны их использовать

image

  • Асинхронное программирование

    • Асинхронное программирование – подход к написанию кода, который позволяет выполнять второстепенные и долго выполняемые задачи, не блокируя основной поток выполнения.
  • Пример. Загрузка файла.

    • Асинхронный процесс :
      • Начинаем асинхронную загрузку файла.
      • Выводим пользователю индикатор загрузки.
      • Считываем с большого файла данные в приложение.
      • Происходит считывание строк.
      • Подсчитываем количество строк.
      • Когда результат асинхронной операции будет готов – убираем индикатор загрузки и выводим результат на экран пользователю.

Параллельное программирование

  • Параллельное программирование – физическое выполнение несколько операций одновременно. Достигается путем аппаратных возможностей вычислительной техники, а именно благодаря наличию нескольких ядер

  • Асинхронный процесс с использованием параллельности:

    • Начинаем асинхронную загрузку файла.
    • Выводим пользователю индикатор загрузки.
    • Считываем с большого файла данные в приложение.
    • Происходит параллельное считывание строк
    • Параллельно подсчитываем количество строк
    • Когда результат асинхронной операции будет готов – убираем индикатор загрузки и выводим результат на экран пользователю.
  • Применение асинхронности

    • Пользовательский интерфейс – чтобы избежать не отвечающих приложений. Второстепенные задачи
    • Одновременная обработка нескольких клиентских запросов. Запросы в базу данных.
    • Работа с файловой системой. Сетевые запросы.
  • Асинхронный код, преимущества

    • Если компонент заблокирован, другие компоненты все еще работают
    • Пользовательский интерфейс работает отдельно и всегда остается отзывчивым
    • Использование преимуществ многоядерных систем
    • На каждом ядре выполняется один или несколько потоков
    • Требовательные к процессору задачи выполняются в «фоновых» потоках
    • Доступ к удаленным ресурсам выполняется в «фоновых» потоках
  • Асинхронный код, недостатки

    • Трудно понять, какие части кода выполняются в конкретное время
    • Отлаживать труднее, чем обычно
    • Приходится беречь ресурсы
      • Один поток использует ресурс
      • Другие потоки должны ждать освобождение этого ресурса
    • Трудно синхронизировать доступ к ресурсам
      • Могут возникнуть взаимоблокировки (deadlocks)

image

Пул потоков – Thread Pool

image

  • Создание потоков требует времени. Если есть различные короткие задачи, подлежащие выполнению, можно создать набор потоков заранее и затем просто отправлять соответствующие запросы, когда наступает очередь для их выполнения. Было бы неплохо, если бы количество этих потоков автоматически увеличивалось с ростом потребности в потоках и уменьшалось при возникновении потребности в освобождении ресурсов.

  • Создавать подобный список потоков самостоятельно не понадобится. Для управления таким списком предусмотрен класс ThreadPool, который по мере необходимости уменьшает и увеличивает количество потоков в пуле до максимально допустимого. Значение максимально допустимого количества потоков в пуле может изменяться. В случае двуядерного ЦП оно по умолчанию составляет 1023 рабочих потоков и 1000 потоков ввода-вывода.

  • Можно указывать минимальное количество потоков, которые должны запускаться сразу после создания пула, и максимальное количество потоков, доступных в пуле. Если остались какие-то подлежащие обработке задания, а максимальное количество потоков в пуле уже достигнуто, то более новые задания будут помещаться в очередь и там ожидать, пока какой-то из потоков завершит свою работу.

  • Чтобы запросить поток из пула для обработки вызова метода, можно использовать метод QueueUserWorkItem(). Этот метод перегружен, чтобы в дополнение к экземпляру делегата WaitCallback позволить указывать необязательный параметр System.Object для специальных данных состояния.

  • Ниже приведен пример приложения, в котором сначала читается и выводится на консоль информация о максимальном количестве рабочих потоков и потоков ввода-вывода. Затем в цикле for метод JobForAThread() назначается потоку из пула потоков за счет вызова метода ThreadPool.QueueUserWorkltem() и передачи делегата типа WaitCallback. Пул потоков получает этот запрос и выбирает из пула один из потоков для вызова метода. Если пул еще не существует, он создается и запускается первый поток. Если же пул существует и в нем имеется один свободный поток, задание переадресовывается этому потоку:

using System;
using System.Threading;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main()
        {
            int nWorkerThreads;
            int nCompletionThreads;
            ThreadPool.GetMaxThreads(out nWorkerThreads, out nCompletionThreads);
            Console.WriteLine("Максимальное количество потоков: " + nWorkerThreads
                + "\nПотоков ввода-вывода доступно: " + nCompletionThreads);
            for (int i = 0; i < 5; i++)
                ThreadPool.QueueUserWorkItem(JobForAThread);
            Thread.Sleep(3000);
            
            Console.ReadLine();
        }

        static void JobForAThread(object state)
        {
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine("цикл {0}, выполнение внутри потока из пула {1}",
                    i, Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(50);
            }
        }
    }
}

03. Введение в проблематику параллельного программирования.

  • Мультипрограммирование

    • Мультипрограммирование - параллельное выполнение нескольких программ. Мультипрограммирование позволяет уменьшить общее время их выполнения.
  • Параллельные вычисления

    • Под параллельными вычислениями понимается параллельное выполнение одной и той же программы. Параллельные вычисления позволяют уменьшить время выполнения одной программы.
  • Формы параллелизма

    • параллелизм на уровне битов
      • Эта форма параллелизма основана на увеличении размера машинного слова. Увеличение размера машинного слова уменьшает количество операций, необходимых процессору для выполнения действий над переменными, чей размер превышает размер машинного слова.
    • параллелизм на уровне инструкций
      • Компьютерная программа – это, по существу, поток инструкций, выполняемых процессором. Но можно изменить порядок этих инструкций, распределить их по группам, которые будут выполняться параллельно, без изменения результата работы всей программы.
    • параллелизм данных
      • Основная идея подхода, основанного на параллелизме данных, заключается в том, что одна операция выполняется сразу над всеми элементами массива данных.
      • Различные фрагменты такого массива обрабатываются на векторном процессоре или на разных процессорах параллельной машины.
      • Распределением данных между процессорами занимается программа. Векторизация или распараллеливание в этом случае чаще всего выполняется уже на этапе компиляции – перевода исходного текста программы в машинные команды.
      • Роль программиста в этом случае обычно сводится к заданию настроек векторной или параллельной оптимизации компилятору, директив параллельной компиляции, использованию специализированных языков для параллельных вычислений.
    • параллелизм задач (многопоточность)
      • Стиль программирования, основанный на параллелизме задач, подразумевает, что вычислительная задача разбивается на несколько относительно самостоятельных подзадач и каждый процессор загружается своей собственной подзадачей.

      • В курсе данной дисциплины мы будем рассматривать именно многопоточные вычисления.

      • Существуют всего две цели, которые преследуют программисты, используя потоки:

        • позволить приложению параллельно работать над несколькими относительно независимыми задачами;
        • использовать преимущества многопроцессорных систем для повышения производительности приложения

04. Процессы.

Процесс

  • Понятие процесса характеризует некоторую совокупность набора исполняющихся команд, ассоциированных с ним ресурсов (выделенная для исполнения память или адресное пространство, стеки, используемые файлы и устройства ввода-вывода и т. д.) и текущего момента его выполнения (значения регистров, программного счетчика, состояния стека и значения переменных), находящуюся под управлением операционной системы.

  • В операционных системах процесс рассматривается операционной системой как заявка на потребление всех видов ресурсов.

  • Не существует взаимнооднозначного соответствия между процессами и программами, обрабатываемыми вычислительными системами:

    • в некоторых ОС для работы программ может организовываться более одного процесса
    • один и тот же процесс может исполнять последовательно несколько различных программ
  • Процесс находится под управлением операционной системы, поэтому в нем может выполняться часть кода ее ядра – при использовании системных вызовов, обработке внешних прерываний.

  • С позиции данной абстрактной модели, у каждого процесса есть собственный виртуальный центральный процессор.

  • Для того чтобы процессы не могли вмешаться в распределение ресурсов, а также не могли повредить коды и данные друг друга, важнейшей задачей ОС является изоляция одного процесса от другого. Виртуальное адресное пространство процесса – это совокупность адресов, которыми может манипулировать программный модуль процесса. Операционная система отображает виртуальное адресное пространство процесса на отведенную процессу физическую память

image

image

  • Информацию, для хранения которой предназначен PCB, удобно разделить на две части. Содержимое всех регистров процесса, включая значение программного счетчика, называется регистровым контекстом процесса, а все остальное – системным контекстом процесса. Код и данные, находящиеся в адресном пространстве процесса, называются его пользовательским контекстом. Совокупность регистрового, системного и пользовательского контекстов принято называть просто контекстом процесса. В любой момент времени процесс полностью характеризуется своим контекстом.

image image

  • По завершении работы операционная система освобождает все ресурсы, ассоциированные с данным процессом, делая соответствующие записи в PCB. Сам PCB не уничтожается, а остается в системе еще некоторое время. Это связано с тем, что процесс-родитель после окончания работы дочернего процесса запросить операционную систему о причине смерти процесса.

image image

Потоки

  • С одной стороны, процесс можно рассматривать как способ группирования родственных ресурсов в одну группу. С другой стороны, процесс можно рассматривать как поток исполняемых команд, или просто поток.
  • Процессы используются для группирования ресурсов, а потоки являются объектами, поочередно выполняющимися на процессоре.
  • В многопоточном режиме поток обычно запускаются с одним потоком, который называют основным или главным потоком.

image image image image image image image

05. Ресурсы. Проблемы взаимодействия процессов.

  • Ресурс - это общий термин, описывающий объект (физическое устройство, область памяти), который может одновременно использоваться только одной задачей.
  • По своим характеристикам ресурсы разделяют на:
    • активные – способны изменить информацию (процессор)
    • пассивные – способны хранить информацию
    • локальные – принадлежат одному процессу; время жизни совпадает с временем жизни процесса
    • разделяемые – могут быть использованы несколькими процессами; существуют, пока есть хоть один процесс, который их использует
    • постоянные – используются посредством операций «захватить» и «освободить»
    • временные – используются посредством «создать» и «удалить».
  • Разделяемые ресурсы бывают:
    • не критичные – могут быть использованы одновременно несколькими процессами (например, жесткий диск)
    • критичные – могут быть использованы только одним процессом, и пока этот процесс не завершит работу с ресурсом, последний не доступен другим процессам (например, разделяемая память, доступная на запись).
  • По типу взаимодействия различают:
    • сотрудничающие процессы:
      • процессы, разделяющие только коммуникационный канал, по которому один передает данные, а другой получает их;
      • процессы, осуществляющие взаимную синхронизацию: когда работает один, другой ждет окончания его работы
    • конкурирующие процессы:
      • процессы, использующие совместно разделяемый ресурс;
      • процессы, использующие критические секции;
      • процессы, использующие взаимные исключения.

Синхронизация

  • Когда несколько модулей одной и той же программы запускаются на разных процессорах (потоках), то возникает необходимость в синхронизации их действий.
  • Проблемы взаимодействия процессов:
    • Состояние состязания
      • Ситуация, в которой два процесса считывают или записывают данные одновременно и конечный результат зависит от того, какой процесс был первым, называется состоянием состязания.
      • Основным способом предотвращения состояния состязания является запрет использования совместно используемых данных одновременно нескольким процессам.
    • Тупиковая ситуация
      • Часто процесс для выполнения задачи нуждается в исключительном доступе не к одному, а к нескольким ресурсам.
      • Предположим, что имеется два процесса A и B. Процесс A имеет в исключительном доступе ресурс R, процесс B – ресурс S.
      • Такая ситуация называется тупиком, тупиковой ситуацией или взаимоблокировкой.
  • Как избавиться от deadlock:
    • Каждая критическая секция захватывает все общие ресурсы.
      • Это означает, что вход в каждую критическую секцию закрывается одним ключом. Недостатком такого подхода является увеличение общего времени ожидания. Во многих ситуациях неразумно, когда все ресурсы принадлежат одному владельцу, а он не пользуется ими одновременно.
    • Если в критических секциях работа с ресурсами ведется последовательно, а не одновременно, то ресурс следует освобождать, как только работа с ним закончена.
      • Это общее правило работы с разделяемыми ресурсами. Оно не всегда работает, поскольку часто необходимы одновременно несколько ресурсов в каждой из критических секций.
    • Захват не возникает, если есть только одна критическая секция.
      • По сути, это также захват всех ресурсов, означающий, ожидание в очереди всех потоков, пока не отработает поток, вошедший в критическую секцию.
    • В основном потоке можно использовать форму ожидания, позволяющую ожидать завершения работы запущенного потока в течение времени, заданного параметром t.
      • Используя этот механизм, основной поток может корректно обработать возникшую ситуацию, возможно связанную с клинчем. В любом случае приложение не зависнет.
    • Применение мягких методов блокировки, когда блокируется только запись, но не чтение ресурса.
      • В то же время, если ресурс используется только для чтения, то возможно его одновременное использование.
  • Инверсия приоритетов
    • Представим, что у нас есть высокоприоритетная Задача A, среднеприоритетная Задача B и низкоприоритетная Задача C. Пусть в начальный момент времени Задачи A и B блокированы в ожидании какого-либо внешнего события. Допустим, получившая в результате этого управление Задача C захватила Ресурс A, но не успела его отдать, как была прервана Задачей A. В свою очередь, Задача A при попытке захватить Ресурс A будет блокирована, так как этот ресурс уже захвачен Задачей C.
    • Если к этому времени Задача B находится в состоянии готовности, то управление после этого получит именно она, как имеющая более высокий, чем у Задачи C, приоритет. Теперь Задача B может занимать процессорное время, пока ей не надоест, а мы получаем ситуацию, когда высокоприоритетная Задача A не может функционировать из-за того, что необходимый ей ресурс занят низкоприоритетной Задачей C.
  • Блокировка
    • Процесс ожидает ресурс, который никогда не освободится
  • Голодовка
    • Процесс монополизировал процессор.

Разделяемая память

  • Разделяемая память – два или более процесса могут иметь доступ к одному и тому же блоку памяти.
  • В системах с виртуальной памятью организация такого вида взаимодействия требует поддержки со стороны операционной системы, поскольку необходимо отобразить соответствующие блоки виртуальной памяти на один и тот же блок физической памяти.

Критические секции

  • Критическая секция (области) – это участок программы, в котором есть обращение к совместно используемым данным. На этом участке запрещается переключение задач для обеспечения исключительного использования ресурсов процессом. Говоря иными словами, необходимо взаимное исключение.
  • Объекты, представляющие критические секции, доступны в пределах одного процесса. Процедура входа и выхода из критических секций обычно занимает меньшее время, нежели аналогичные операции с мьютексами или семаформаи, что связано с отсутствием необходимости обращаться к ядру ОС.

Семафоры

  • Семафоры – два и более процесса имеют доступ к одной переменной, принимающей значение 0 или 1 (в случае булевского семафора), или значение от 0 до n, где n – целое неотрицательное число (в случае счетного семафора).
  • Сама переменная часто находится в области данных операционной системы и доступ к ней организуется с помощью специальных функций.
  • С каждым семафором связаны счетчик (значение семафора) и очередь ожидания (процессов, задач, ожидающих принятие счетчиком определенного значения).

Мьютексы

  • Часто используется упрощенная версия семафора, называемая мьютексом. Мьютекс не способен считать, он может лишь управлять взаимным исключением доступа к совместно используемым ресурсам или кодам.
  • Мьютекс фактически состоит из пары: булевского семафора и идентификатора задачи – текущего владельца семафора.
  • В операционных системах семейства Microsoft Windows разница между мьютексом и критической секцией в том, что мьютекс является объектом ядра и может быть использован несколькими процессами одновременно, критическая секция же принадлежит процессу и служит для синхронизации только его потоков.

Мониторы

  • Мониторы – примитивы синхронизации высокого уровня, представляющие собой набор процедур, переменных и других структур данных, объединенных в особый модуль или пакет.
  • Монитор при создании автоматически инициирует число ресурсов и включает процедуры, позволяющие блокировать и активизировать процессы. Вход в монитор находится под жестким контролем системы и только через монитор осуществляется взаимоисключение процессов. Если процесс обращается к монитору и требуемый ресурс занят, то процесс переводится в состояние ожидания. Со временем некоторый процесс обращается к монитору для возвращения ресурса и монитор оповещает процесс о том, что может выделить ресурс и покинуть очередь. Режимом ожидания управляет сам монитор, который для гарантии получения ресурса процессом повышает приоритеты процессов критических областей.

Сообщения

  • Сообщения. Этот вид межпроцессного взаимодействия использует механизмы явной передачи информации от одного процесса к другому (такие объекты, как семафоры, можно отнести к механизму неявной передачи информации). Объем информации, передаваемой в сообщении, может меняться от одного бита до всей свободной емкости памяти системы. Сообщение может содержать как сами данные, предназначенные для передачи, так и указатель на такие данные. В последнем случае обмен может производиться с помощью разделяемых областей памяти, разделяемых файлов и т.п. Для передачи сообщений используются два примитива: send и receive.(послать и принять сообщение).
  • Сообщения могут быть реализованы различным образом, в частности, с помощью сигналов или почтовых ящиков

Сигналы

Сигналы – это сообщения, доставляемые посредством операционной системы процессу. Процесс должен зарегистрировать обработчик этого сообщения у операционной системы, чтобы получить возможность реагировать на него. Часто операционная система извещает процесс сигналом о наступлении какого-либо сбоя, например, делении на 0, или о каком-либо аппаратном прерывании, например, прерывании таймера.

Почтовые ящики

Почтовые ящики – это очередь сообщений (обычно – тех или иных структур данных), которые помещаются в почтовый ящик процессами и/или операционной системой. Несколько процессов могут ждать поступления сообщения в почтовый ящик и активизироваться по поступлении сообщения. Требует поддержки со стороны операционной системы.

Барьеры

Барьеры – используются для организации взаимодействия групп процессов. Пусть, например, работа каждого процесса разбита на фазы и существует правило, что процесс не может перейти в следующую фазу, пока к этому не готовы все остальные процессы. Этого можно добиться, разместив в конце каждой фазы барьер. Когда процесс доходит до барьера, он блокируется, пока все процессы не достигнут барьера.

image

06. Примитивы синхронизации: критические секции.

  • В параллельных программах используются два основных типа синхронизации: взаимное исключение и условная синхронизация.

    • Взаимное исключение обеспечивает, чтобы критические секции операторов не выполнялись одновременно.
    • Условная синхронизация задерживает процесс до тех пор, пока не выполнится определенное условие
  • Критическая секция (области) (англ. critical section) – это участок программы, в котором есть обращение к совместно используемым данным. На этом участке запрещается переключение задач для обеспечения исключительного использования ресурсов процессом. Говоря иными словами, необходимо взаимное исключение.

image image

  • Каждая критическая секция является последовательностью операторов, имеющих доступ к некоторому разделяемому объекту. Предполагается, что процесс, вошедший в критическую секцию, обязательно когда-нибудь из нее выйдет. Таким образом, процесс может завершиться только вне критической секции.

image

Алгоритм Деккера

  • Алгоритм Деккера – первое известное корректное решение проблемы взаимного исключения в конкурентном программировании. Он позволяет двум потокам выполнения совместно использовать неразделяемый ресурс без возникновения конфликтов, используя только общую память для коммуникации.

image image image

Алгоритм Петерсона

image image image

Алгоритм пекарни Лемпорта

image image

Критические секции и мьютексы

  • Критическая секция – объект синхронизации потоков, позволяющий предотвратить одновременное выполнение некоторого набора операций (обычно связанных с доступом к данным) несколькими потоками.
  • Критическая секция выполняет те же задачи, что и мьютекс. Между мьютексом и критической секцией есть терминологические различия, так процедура, аналогичная захвату мьютекса, называется входом в критическую секцию (англ. enter), снятию блокировки мьютекса — выходом из критической секции (англ. leave).
  • Процедура входа и выхода из критических секций обычно занимает меньшее время, нежели аналогичные операции мьютекса, что связано с отсутствием необходимости обращаться к ядру ОС.
  • Прежде всего, следует отметить, что критические секции – это не объекты ядра операционной системы. Практически вся работа с критическими секциями происходит в создавшем их процессе. Из этого следует, что критические секции могут быть использованы только для синхронизации в пределах одного процесса.
  • В операционной системе Windows, в отличие от других механизмов синхронизации, критическая секция не является объектом ядра операционной системы. Она представляет собой структуру, содержащую несколько флагов и какой-то (не важно) объект ядра. Для проверки, занята она или нет, программа не переходит в режим ядра, а лишь проверяются флаги, что, естественно, работает быстрее, чем при использовании объектов ядра.

07. Примитивы синхронизации: семафоры.

  • Первым средством способным быть использованым для блокирования приостанавливаемых процессов, не потерявшим до сих пор актуальности, являются семафоры, предложенные Дейкстрой в 1965 году.
  • Семафор – это объект синхронизации, задающий количество пользователей (задач, процессов), имеющих одновременный доступ к некоторому ресурсу.
  • Различают:
    • двоичные (булевские) семафоры – это механизм взаимного исключения для защиты критичного разделяемого ресурса;
    • счетные семафоры – это механизм взаимного исключения для защиты ресурса, который может быть использован не более чем ограниченным фиксированным числом задач n.

image image image image image image

  • В операционной системе Windows семафоры являются объектами синхронизации, принадлежащими операционной системе. Следовательно, и операции над такими объектами могут быть выполнены только с помощью средств операционной системы.
  • Операциям взять и вернуть соответствуют операции WaitForSingleObject и ReleaseSemaphore.
  • В начале работы семафор должен быть создан, в конце – уничтожен.

08. Примитивы синхронизации: Мьютексы, Спинлоки.

Мьютексы

  • Мьютекс (mutex, сокращение от mutual exclusion - взаимное исключение) – объект ядра ОС, служащий для управления взаимным исключением доступа к совместно используемым ресурсам и кодам.

  • Мьютекс фактически состоит из пары:

    • булевского семафора
    • идентификатора задачи – текущего владельца семафора (т.е. той задачи, которая успешно выполнила функцию взять и стала владельцем разделяемого ресурса).
  • Мьютекс не способен считать, он может лишь управлять взаимным исключением доступа к совместно используемым ресурсам или кодам.

  • Освободить мьютекс может только его владелец.

  • Для доступа к объекту типа мьютекс определены три примитивные операции:

    • Lock(m) – блокировать мьютекс m. Если m уже заблокирован другой задачей, то эта операция переводит задачу в состояние ожидания разблокирования m.
    • Unlock(m) – разблокировать мьютекс m. Если m ожидается другой задачей, то она может быть активизирована, удалена из очереди ожидания и может вытеснить текущую задачу, если ее приоритет выше. Если вызвавшая эту операцию задача не является владельцем m, то операция не имеет никакого эффекта.
    • TryLock(m) – попробовать блокировать мьютекс m. Если m не блокирован, то эта операция эквивалентна Lock(m), иначе возвращается признак неудачи.
    • Как и для семафоров, эти операции являются неделимыми.
  • Поток может завладевать одним и тем же ресурсом несколько раз (в зависимости от параметров создания мьютекса), и при этом не будет блокироваться даже в тех случаях, когда уже владеет данным ресурсом. В конечном счете, поток должен освободить мьютекс столько раз, сколько он его захватывал.

  • Мьютекс, владевший которым поток завершился, не освободив его, называют покинутым (abandoned), и его дескриптор переходит в сигнальное состояние.

image image image image

Спинлок

  • Спинлок (англ. Spinlock – циклическая блокировка) – низкоуровневый примитив синхронизации, применяемый в многопроцессорных системах для реализации взаимного исключения.
  • Физически спинлок представляет собой переменную в памяти и реализуется на атомарных операциях, которые должны присутствовать в системе команд процессора. Каждый процессор, желающий получить доступ к разделяемому ресурсу, атомарно записывает условное значение «занято» в эту переменную, используя операцию CompareAnd-Swap (сравнить и записать). Если предыдущее значение переменной (возвращаемое командой) было «свободно» то считается, что данный процессор получил доступ к ресурсу, в противном случае, процессор возвращается к операции Compare-And-Swap и крутится в цикле ожидая, пока спинлок будет освобождён. После работы с разделяемым ресурсом процессор-владелец спинлока должен записать в него условное значение «свободно»
  • Compare-And-Swap А, В, С - атомарно сравнивает значение в памяти [A] с нужным значением B, и если всё верно, выставляет значение в памяти на число C.

09. Асинхронность в .NET

  • В .NET-фреймворке исторически сложилось несколько более старых паттернов организации асинхронного кода:

    • APM (IAsyncResult, они же коллбеки) (.NET 1.0).
    • EAP — события, этот паттерн все видели в WinForms (.NET 2.0).
    • TAP (Task Asynchronous Pattern) — класс Task и его экосистема (.NET 4.0).
  • Рекомендованный подход - TAP. В C# 5 он был дополнен механизмом async/await, помогающим избежать блокирующего исполнения кода, в более новых версиях языка появилось еще несколько полезных нововведений.

Какие проблемы связанные с блокировками возникают?

  • Существует два типа возможности занять поток:
    • CPU Bound — блокировка, когда поток занят непосредственно вычислениями. Здесь необходимо позаботиться о том, чтобы длинная операция не блокировала потоки пула потоков `.NET (ThreadPool), а работала отдельно и синхронизировала возврат результата.
    • IO Bound — блокировка, ожидание результата от устройств вводавывода — тут асинхронный подход имеет максимальный эффект, так как, по сути, ожидая результата потоки могут занятся другой работой
  • Async/Await идеально решает проблему IO Bound, с CPU Bound можно использовать средства параллельного программирования

image image image

  • Status – перечисление, с помощью которого можно отслеживать состояние задачи, в котором она находится. Возможно побитовое сложение флагов перечисления.
  • TaskCreationOptions – перечисление, с помощью которого можно задать дополнительные параметры для выполнения задачи. Возможно побитовое сложение флагов перечисления.
  • Методы ожиданияWait(), WaitAll(), WaitAny() позволяют подождать выполнения указанных вами задач.
  • Метод Task.Run() – добавлен в .NET Framework 4.5 и служит для быстрого создания горячих задач. Замена метода из фабрики задач StartNew().
  • Метод RunSynchronously() – выполняет метод, сообщенный с задачей синхронно. image

Способы создания задач

  • Фабрики запущенных задач. Run — более легкая версия метода StartNew с установленными дополнительными параметрами по умолчанию. Возвращает созданную и запущенную задачу. Самый популярный способ запуска задач. Оба метода вызывают скрытый от нас Task.InternalStartNew. Возвращают объект Task.
  • Фабрики завершенных задач. Иногда нужно вернуть результат задачи без необходимости создавать асинхронную операцию. Это может пригодиться в случае подмены результата операции на заглушку при юнит-тестировании или при возврате заранее известного/рассчитанного результата.

Как отменить задачу

  • За отмену задач отвечает класс CancellationTokenSource и порождаемый им CancellationToken.
  • Принцип работы:
    • Создается экземпляр CancellationTokenSource (cts).
    • cts.Token отправляется параметром в задачу (ассоциируется с ней).
    • При необходимости отмены задачи для экземпляра CancellationTokenSource вызывается метод Cancel().
    • Внутри кода задачи на токене вызывается метод ThrowIfCancellationRequested(), который выбрасывает исключение в случае, если в CancellationTokenSource произошла отмена. Если токен был ассоциирован с задачей при создании, исключение будет перехвачено, выполнение задачи остановлено (так как исключение), ей будет выставлен статус Cancelled. В противном случае задача перейдет в статус Faulted.
  • Конструктор CancellationTokenSource может принимать значение таймаута, после которого метод Cancel будет вызван автоматически.

Продолжения - Continuations

image image image

Как извлечь результат из задачи

  • До появления await извлекать результат из задач можно было такими блокирующими способами:
  • t.Result(); — возврат результата / выброс исключения AggregateException.
  • t.Wait(); — ожидание выполнения задачи, выброс исключения AggregateException.
  • t.GetAwaiter().GetResult(); — возврат результата / выброс оригинального исключения — служебный метод компилятора, поэтому использовать его не рекомендуется. Используется механизмом async/await.
  • После появления async/await рекомендованной техникой стал оператор await, производящий неблокирующее ожидание. То есть если await добрался до незавершенной задачи, выполнение кода в потоке будет прервано и продолжится только с завершением задачи.
  • await t; — возврат результата / выброс оригинального исключения.
  • Следует заметить, что для t.GetAwaiter().GetResult(); и await будет выброшено только первое исключение, аналогично манере поведения обычного синхронного кода.

Async

image image

Фабрика задач

  • Фабрика задач – механизм, позволяющий настроить набор сгруппированных задач, которые находятся в одном состоянии.
  • Классы для работы с фабрикой y TaskFactory и TaskFactory <TResult>.
  • Можно создать экземпляр класса TaskFactory и настроить его нужными параметрами для создания экземпляров класса Task с этими параметрами. Удобность применения TaskFactory состоит в отсутствии необходимости указания этих параметров при каждом создании экземпляров класса Task.

Значимая задача - ValueTask

  • ValueTask – – представляет собой обертку (wrapper) над обыкновенной задачей (Task).
  • Был создан для уменьшения потребления ресурсов управляемой кучи.
  • В некоторых случаях является сомнительной оптимизацией.

image

Ошибочное мнение - Task — это облегченный Thread

image image

10. Планировщик задач. Дочерние задачи.

Планировщик

  • Планировщик задач – – это механизм, который позволяет настроить выполнение задач указанным вами способом и методами.

    • Для работы с планировщиком в .NET используют класс TaskScheduler
  • Планировщик является базовым (абстрактным) классом.

  • Реализация конкретной логики работы планировщика полностью ложится на программистапользователя.

  • В библиотеке .NET есть стандартные варианты планировщиков. К примеру стандартный планировщик построенный на ThreadPool из статического свойства TaskScheduler. Default

  • Абстрактные методы:

    • QueueTask(Task task) – помещает переданную задачу в очередь выполнения.
    • GetScheduledTasks() – возвращает очередь задач в виде коллекции.
    • TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) – запрашивает возможность выполнится синхронно.
  • Другие методы: :

    • TryExecuteTask(Task task) – попытка выполнить задачу.
    • TryDequeue(Task task) – попытка удалить задачу из очереди выполнения.
    • FromCurrentSynchronizationContext() – создает планировщик, связанный с текущем элементом SyncrhonizationContext.
  • Свойства:

    • Default – стандартный планировщик, построенный на пуле потоков.
    • Current – выдает текущего планировщика.
    • MaximumConcurrencyLevel – максимальный уровень параллелизма.
    • Id – идентификатор планировщика

Дочерние задачи (Child Tasks)

  • Дочерние задания – создание задач и их дальнейшее прикрепление к другой задачи, которую будут считать родителем. Другими словами – это способ настроить связь Родитель-Потомок.
  • Задача может иметь любое количество дочерних задач.
  • Несколько дочерних задач могут иметь общего родителя. Пока дочерние задачи полностью не выполнятся, родитель не вернет результат.
  • Для присоединения к родительской задачи дочерней, нужно передать флаг перечисления TaskCreationOptions.AttachedToParent
  • Можно запретить присоединять к задачи дочерние. При создании передать флаг перечисления TaskCreationOptions.DenyChildAttach

Вложенные задачи (Nested Tasks)

  • Вложенные и задачи – создание задач в теле другой задачи, которые выполняются независимо от родительского объекта.
  • Родительская задача может иметь любое количество вложенных задач. Но, родительская задача абсолютно не зависит от работы вложенных в нее задач и может выполнится гораздо раньше, чем вложенные.

image image

Ключевые слова async await Техническая реализация:

  • Ключевое слово async – является модификатором для методов. Указывает, что метод является асинхронным. Модификатор async позволяет использовать в асинхронном методе ключевое слово await и указывает компилятору на необходимость создания конечного автомата для обеспечения работы асинхронного метода.
  • Ключевое слово await – является унарным оператором, операнд которого располагается справа от самого оператора. Применение оператора await означает, что необходимо дождаться завершения выполнения асинхронной операции. При этом, если ожидание будет произведено, то вызывающий поток будет освобожден для своих дальнейших действий, а код, находящейся после оператора `await, по завершению асинхронной операции будет выполнен в виде продолжения.

image image image image image image image

Конечный автомат

  • Конечный автомат (Finite-state machine) – это модель вычислений, которая позволяет объекту изменить свое поведение в зависимости от своего внутреннего состояния. Поведение объекта изменяется настолько, что создается впечатление, что изменился класс объекта.
  • В один момент времени может быть активно только одно состояние. По завершению выполнения действия, конечный автомат меняет свое внутреннее состояние.

image

  • Работу ключевых слов async await обслуживает конечный автомат. Компилятор, с помощью интерфейса IAsyncStateMachine и специальных строителей, создает конечный автомат, который обслуживает асинхронный метод «под капотом».
  • Сам асинхронный метод превращается в метод-заглушку, который будет использовать созданный конечный автомат.

image

  • Задача-марионетка – это обыкновенная задача, жизненным циклом которой управляем мы с вами. Результат выполнения задачи может быть указан позже, не в момент создания задачи-марионетки.
  • Результат задачи-марионетки указываем мы. Мы можем как отдать ей результат (означает успешное выполнение), так и пробросить исключение (означает провальное выполнение).

image

  • Каждый строитель для повышения производительности представляет из себя структуру. Они оптимизированы под работу с async-методами.
  • У строителей есть весь необходимый функционал для создания задачимарионетки. Имеются в виду методы, с помощью которых можно указать результат (SetResult), исключение (SetException) или вызвать ожидание с регистрацией продолжения для асинхронной задачи (AwaitOnCompleted/AwaitUnsafeOnCompleted).
  • Строители асинхронных методов лучше не использовать напрямую. Они созданы для использования компилятором. Для создания задач-марионеток пользователями существует открытый API в виде класса TaskCompletionSource.

image image image image image image image image image

Ключевые слова async await

image

Результат асинхронной операции

image image

Выполнение

image

C#

Thread

  • Одним из ключевых аспектов в современном программировании является многопоточность. Ключевым понятием при работе с многоопоточностью является поток. Поток предствляет некоторую часть кода программы. При выполнении программы каждому потоку выделяется определенный квант времени. И при помощи многопоточности мы можем выделить в приложении несколько потоков, которые будут выполнять различные задачи одновременно. Если у нас, допустим, графическое приложение, которое посылает запрос к какому-нибудь серверу или считывает и обрабатывает огромный файл, то без многопоточности у нас бы блокировался графический интерфейс на время выполнения задачи. А благодаря потокам мы можем выделить отправку запроса или любую другую задачу, которая может долго обрабатываться, в отдельный поток. Поэтому, к примеру, клиент-серверные приложения (и не только они) практически не мыслимы без многопоточности.

  • Основной функционал для использования потоков в приложении сосредоточен в пространстве имен System.Threading. В нем определен класс, представляющий отдельный поток - класс Thread.

  • Язык C# позволяет запускать и выполнять в рамках приложения несколько потоков, которые будут выполняться одновременно.

  • Для создания потока применяется один из конструкторов класса Thread:

    • Thread(ThreadStart): в качестве параметра принимает объект делегата ThreadStart, который представляет выполняемое в потоке действие
    • Thread(ThreadStart, Int32): в дополнение к делегату ThreadStart принимает числовое значение, которое устанавливает размер стека, выделяемого под данный поток
    • Thread(ParameterizedThreadStart): в качестве параметра принимает объект делегата ParameterizedThreadStart, который представляет выполняемое в потоке действие
    • Thread(ParameterizedThreadStart, Int32): вместе с делегатом ParameterizedThreadStart принимает числовое значение, которое устанавливает размер стека для данного потока
using System.Threading;
 
// создаем новый поток
Thread myThread1 = new Thread(Print); 
Thread myThread2 = new Thread(new ThreadStart(Print));
Thread myThread3 = new Thread(()=>Console.WriteLine("Hello Threads"));
 
myThread1.Start();  // запускаем поток myThread1
myThread2.Start();  // запускаем поток myThread2
myThread3.Start();  // запускаем поток myThread3
 
void Print() => Console.WriteLine("Hello Threads");

Потоки с параметрами и ParameterizedThreadStart

using System.Threading;
 
// создаем новые потоки
Thread myThread1 = new Thread(new ParameterizedThreadStart(Print));
Thread myThread2 = new Thread(Print);
Thread myThread3 = new Thread(message => Console.WriteLine(message));
 
// запускаем потоки
myThread1.Start("Hello");
myThread2.Start("Привет");
myThread3.Start("Salut");
 
 
void Print(object? message) => Console.WriteLine(message);
using System.Threading;
 
int number = 4;
// создаем новый поток
Thread myThread = new Thread(Print);
myThread.Start(number);    // n * n = 16
 
 
// действия, выполняемые во втором потокке
void Print(object? obj)
{
    // здесь мы ожидаем получить число
    if (obj is int n)
    {
        Console.WriteLine($"n * n = {n * n}");
    }
}
  • Но что делать, если нам надо передать не один, а несколько параметров различного типа? В этом случае можно определить свои типы:
using System.Threading;
 
Person tom = new Person("Tom", 37);
// создаем новый поток
Thread myThread = new Thread(Print);
myThread.Start(tom);
 
void Print(object? obj)
{
    // здесь мы ожидаем получить объект Person
    if (obj is Person person)
    {
        Console.WriteLine($"Name = {person.Name}");
        Console.WriteLine($"Age = {person.Age}");
    }
}
 
record class Person(string Name, int Age);
  • Но тут опять же есть одно ограничение: метод Thread.Start не является типобезопасным, то есть мы можем передать в него любой тип, и потом нам придется приводить переданный объект к нужному нам типу. Для решения данной проблемы рекомендуется объявлять все используемые методы и переменные в специальном классе, а в основной программе запускать поток через ThreadStart. Например:
using System.Threading;
 
Person tom = new Person("Tom", 37);
// создаем новый поток
Thread myThread = new Thread(tom.Print);
myThread.Start();
 
record class Person(string Name, int Age)
{
    public void Print()
    {
        Console.WriteLine($"Name = {Name}");
        Console.WriteLine($"Age = {Age}");
    }
}

Семафоры. C#

  • Семафоры позволяют ограничить количество потоков, которые имеют доступ к определенным ресурсам. В .NET семафоры представлены классом Semaphore.

  • Для создания семафора применяется один из конструкторов класса Semaphore:

    • Semaphore (int initialCount, int maximumCount) параметр initialCount задает начальное количество потоков, а maximumCount - максимальное количество потоков, которые имеют доступ к общим ресурсам
      • initialCount - изначальное доступное колличество свободных мест.
      • maximumCount - вместимость.
    • Semaphore (int initialCount, int maximumCount, string? name) в дополнение задает имя семафора
    • Semaphore (int initialCount, int maximumCount, string? name, out bool createdNew): последний параметр - createdNew при значении true указывает, что новый семафор был успешно создан. Если этот параметр равен false, то семафор с указанным именем уже существует
  • Для работы с потоками класс Semaphore имеет два основных метода:

    • WaitOne() ожидает получения свободного места в семафоре
    • Release() освобождает место в семафоре
  • В отличие от lock (Monitor) и Mutex, у Semaphore нет «владельца» — он не зависит от потока. Любой поток может вызвать Release на семафоре, тогда как с Mutex и блокировкой только поток, получивший блокировку, может ее освободить.

// запускаем пять потоков
for (int i = 1; i < 6; i++)
{
    Reader reader = new Reader(i);
}
class Reader
{
    // создаем семафор
    static Semaphore sem = new Semaphore(3, 3);
    Thread myThread;
    int count = 3;// счетчик чтения
 
    public Reader(int i)
    {
        myThread = new Thread(Read);
        myThread.Name = $"Читатель {i}";
        myThread.Start();
    }
 
    public void Read()
    {
        while (count > 0)
        {
            sem.WaitOne();  // ожидаем, когда освободиться место
 
            Console.WriteLine($"{Thread.CurrentThread.Name} входит в библиотеку");
 
            Console.WriteLine($"{Thread.CurrentThread.Name} читает");
            Thread.Sleep(1000);
 
            Console.WriteLine($"{Thread.CurrentThread.Name} покидает библиотеку");
 
            sem.Release();  // освобождаем место
 
            count--;
            Thread.Sleep(1000);
        }
    }
}

Мьютексы. C#

  • Еще один инструмент управления синхронизацией потоков представляет класс Mutex или мьютекс, который также располагается в пространстве имен System.Threading.
int x = 0;
Mutex mutexObj = new Mutex();
 
// запускаем пять потоков
for (int i = 1; i < 6; i++)
{
    Thread myThread = new(Print);
    myThread.Name = $"Поток {i}";
    myThread.Start(); 
}
 
void Print()
{
    mutexObj.WaitOne();     // приостанавливаем поток до получения мьютекса
    x = 1;
    for (int i = 1; i < 6; i++)
    {
        Console.WriteLine($"{Thread.CurrentThread.Name}: {x}");
        x++;
        Thread.Sleep(100);
    }
    mutexObj.ReleaseMutex();    // освобождаем мьютекс
}
  • Основную работу по синхронизации выполняют методы WaitOne() и ReleaseMutex(). Метод mutexObj.WaitOne() приостанавливает выполнение потока до тех пор, пока не будет получен мьютекс mutexObj.
  • Изначально мьютекс свободен, поэтому его получает один из потоков.
  • После выполнения всех действий, когда мьютекс больше не нужен, поток освобождает его с помощью метода mutexObj.ReleaseMutex(). А мьютекс получает один из ожидающих потоков.
  • Таким образом, когда выполнение дойдет до вызова mutexObj.WaitOne(), поток будет ожидать, пока не освободится мьютекс. И после его получения продолжит выполнять свою работу.

Мониторы. C#

  • Наряду с оператором lock для синхронизации потоков мы можем использовать мониторы, представленные классом System.Threading.Monitor. Для управления синхронизацией этот класс предоставляет следующите методы:

    • void Enter(object obj): получает в экслюзивное владение объект, передаваемый в качестве параметра.
    • void Enter(object obj, bool acquiredLock): дополнительно принимает второй параметра - логическое значение, которое указывает, получено ли владение над объектом из первого параметра
    • void Exit(object obj): освобождает ранее захваченный объект
    • bool IsEntered(object obj): возвращает true, если монитор захватил объект obj
    • void Pulse (object obj): уведомляет поток из очереди ожидания, что текущий поток освободил объект obj
    • void PulseAll(object obj): уведомляет все потоки из очереди ожидания, что текущий поток освободил объект obj. После чего один из потоков из очереди ожидания захватывает объект obj.
    • bool TryEnter (object obj): пытается захватить объект obj. Если владение над объектом успешно получено, то возвращается значение true
    • bool Wait (object obj): освобождает блокировку объекта и переводит поток в очередь ожидания объекта. Следующий поток в очереди готовности объекта блокирует данный объект. А все потоки, которые вызвали метод Wait, остаются в очереди ожидания, пока не получат сигнала от метода Monitor.Pulse или Monitor.PulseAll, посланного владельцем блокировки.
  • Стоит отметить, что фактически конструкция оператора lock инкапсулирует в себе синтаксис использования мониторов. Например, в прошлой теме для синхронизации потоков применялся оператор lock:

int x = 0;
object locker = new();  // объект-заглушка
// запускаем пять потоков
for (int i = 1; i < 6; i++)
{
    Thread myThread = new(Print);
    myThread.Name = $"Поток {i}";
    myThread.Start();
}
 
 
void Print()
{
    lock (locker)
    {
        x = 1;
        for (int i = 1; i < 6; i++)
        {
            Console.WriteLine($"{Thread.CurrentThread.Name}: {x}");
            x++;
            Thread.Sleep(100);
        }
    }
}
  • Фактически данный пример будет эквивалентен следующему коду:
int x = 0;
object locker = new();  // объект-заглушка
// запускаем пять потоков
for (int i = 1; i < 6; i++)
{
    Thread myThread = new(Print);
    myThread.Name = $"Поток {i}";
    myThread.Start();
}
 
void Print()
{
    bool acquiredLock = false;
    try
    {
        Monitor.Enter(locker, ref acquiredLock);
        x = 1;
        for (int i = 1; i < 6; i++)
        {
            Console.WriteLine($"{Thread.CurrentThread.Name}: {x}");
            x++;
            Thread.Sleep(100);
        }
    }
    finally
    {
        if (acquiredLock) Monitor.Exit(locker);
    }
}
  • Метод Monitor.Enter принимает два параметра - объект блокировки и значение типа bool, которое указывает на результат блокировки (если он равен true, то блокировка успешно выполнена). Фактически этот метод блокирует объект locker так же, как это делает оператор lock. А в блоке try...finally с помощью метода Monitor.Exit происходит освобождение объекта locker, если блокировка осуществлена успешно, и он становится доступным для других потоков.

Задачи и класс Task.

  • В эпоху многоядерных машин, которые позволяют параллельно выполнять сразу несколько процессов, стандартных средств работы с потоками в .NET уже оказалось недостаточно. Поэтому во фреймворк .NET была добавлена библиотека параллельных задач TPL (Task Parallel Library), основной функционал которой располагается в пространстве имен System.Threading.Tasks. Данная библиотека упрощает работу с многопроцессорными, многоядерными система. Кроме того, она упрощает работу по созданию новых потоков. Поэтому обычно рекомендуется использовать именно TPL и ее классы для создания многопоточных приложений, хотя стандартные средства и класс Thread по-прежнему находят широкое применение.

  • В основе библиотеки TPL лежит концепция задач, каждая из которых описывает отдельную продолжительную операцию. В библиотеке классов .NET задача представлена специальным классом - классом Task, который находится в пространстве имен System.Threading.Tasks. Данный класс описывает отдельную задачу, которая запускается асинхронно в одном из потоков из пула потоков. Хотя ее также можно запускать синхронно в текущем потоке.

  • Для определения и запуска задачи можно использовать различные способы.

Task task1 = new Task(() => Console.WriteLine("Task1 is executed"));
task1.Start();
 
Task task2 = Task.Factory.StartNew(() => Console.WriteLine("Task2 is executed"));
 
Task task3 = Task.Run(() => Console.WriteLine("Task3 is executed"));
  • Ожидание завершения задачи
Task task1 = new Task(() => Console.WriteLine("Task1 is executed"));
task1.Start();
 
Task task2 = Task.Factory.StartNew(() => Console.WriteLine("Task2 is executed"));
 
Task task3 = Task.Run(() => Console.WriteLine("Task3 is executed"));
 
task1.Wait();   // ожидаем завершения задачи task1
task2.Wait();   // ожидаем завершения задачи task2
task3.Wait();   // ожидаем завершения задачи task3
  • Стоит отметить, что метод Wait() блокирует вызывающий поток, в котором запущена задача, пока эта задача не завершит свое выполнение.

Синхронный запуск задачи

Console.WriteLine("Main Starts");
// создаем задачу
Task task1 = new Task(() =>
{
    Console.WriteLine("Task Starts");
    Thread.Sleep(1000); 
    Console.WriteLine("Task Ends");
 });
task1.RunSynchronously(); // запускаем задачу синхронно
Console.WriteLine("Main Ends"); // этот вызов ждет завершения задачи task1 

Вложенные задачи

  • Одна задача может запускать другую - вложенную задачу. При этом эти задачи выполняются независимо друг от друга.
var outer = Task.Factory.StartNew(() =>      // внешняя задача
{
    Console.WriteLine("Outer task starting...");
    var inner = Task.Factory.StartNew(() =>  // вложенная задача
    {
        Console.WriteLine("Inner task starting...");
        Thread.Sleep(2000);
        Console.WriteLine("Inner task finished.");
    });
});
outer.Wait(); // ожидаем выполнения внешней задачи
Console.WriteLine("End of Main");
  • Несмотря на то, что здесь мы ожидаем выполнения внешней задачи, но вложенная задача может завершить выполнение даже после завершения метода Main
  • Если необходимо, чтобы вложенная задача выполнялась как часть внешней, необходимо использовать значение TaskCreationOptions.AttachedToParent:
var outer = Task.Factory.StartNew(() =>      // внешняя задача
{
    Console.WriteLine("Outer task starting...");
    var inner = Task.Factory.StartNew(() =>  // вложенная задача
    {
        Console.WriteLine("Inner task starting...");
        Thread.Sleep(2000);
        Console.WriteLine("Inner task finished.");
    }, TaskCreationOptions.AttachedToParent);
});
outer.Wait(); // ожидаем выполнения внешней задачи
Console.WriteLine("End of Main");
Outer task starting...
Inner task starting...
Inner task finished.
End of Main
  • В данном случае вложенная задача прикреплена к внешней и выполняется как часть внешней задачи. И внешняя задача завершится только когда завершатся все прикрепленные к ней вложенные задачи.

Массив задач

  • Также как и с потоками, мы можем создать и запустить массив задач. Можно определить все задачи в массиве непосредственно через объект Task:
Task[] tasks1 = new Task[3]
{
    new Task(() => Console.WriteLine("First Task")),
    new Task(() => Console.WriteLine("Second Task")),
    new Task(() => Console.WriteLine("Third Task"))
};
// запуск задач в массиве
foreach (var t in tasks1)
    t.Start();
  • Если необходимо завершить выполнение программы или вообще выполнять некоторый код лишь после того, как все задачи из массива завершатся, то применяется метод Task.WaitAll(tasks):
Task[] tasks = new Task[3];
for(var i = 0; i < tasks.Length; i++)
{
    tasks[i] = new Task(() =>
    {
        Thread.Sleep(1000); // эмуляция долгой работы
        Console.WriteLine($"Task{i} finished");
    });
    tasks[i].Start();   // запускаем задачу
}
Console.WriteLine("Завершение метода Main");
 
Task.WaitAll(tasks); // ожидаем завершения всех задач

Возвращение результатов из задач

  • Задачи могут не только выполняться как процедуры, но и возвращать определенные результаты:
int n1 = 4, n2 = 5;
Task<int> sumTask = new Task<int>(() => Sum(n1, n2));
sumTask.Start();
 
int result = sumTask.Result;
Console.WriteLine($"{n1} + {n2} = {result}"); // 4 + 5 = 9
 
int Sum(int a, int b) => a + b;
  • Во-первых, чтобы получать из задачи не который результат, необходимо типизировать объект Task тем типом, объект которого мы хотим получить из задачи. Например, в примере выше мы ожидаем из задачи sumTask получить число типа int, соответственно типизируем объект Task данным типом - Task<int>.
  • И, во-вторых, в качестве задачи должен выполняться метод, который возвращает данный тип объекта. Так, в данном случае у нас в качестве задачи выполняется метод Sum, которая принимаетдва числа и на выходе возвращает их сумму - значение типа int.
  • Возвращаемое число будет храниться в свойстве Result: sumTask.Result. Нам не надо его приводить к типу int, оно уже само по себе будет представлять число.

Задачи продолжения

  • Задачи продолжения или continuation task позволяют определить задачи, которые выполняются после завершения других задач. Благодаря этому мы можем вызвать после выполнения одной задачи несколько других, определить условия их вызова, передать из предыдущей задачи в следующую некоторые данные.
  • Задачи продолжения похожи на методы обратного вызова, но фактически являются обычными задачами Task. Посмотрим на примере:
Task task1 = new Task(() =>
{
    Console.WriteLine($"Id задачи: {Task.CurrentId}");
});
 
// задача продолжения - task2 выполняется после task1
Task task2 = task1.ContinueWith(PrintTask);
 
task1.Start();
 
// ждем окончания второй задачи
task2.Wait();
Console.WriteLine("Конец метода Main");
 
  
void PrintTask(Task t)
{
    Console.WriteLine($"Id задачи: {Task.CurrentId}");
    Console.WriteLine($"Id предыдущей задачи: {t.Id}");
    Thread.Sleep(3000);
}
  • Первая задача задается с помощью лямбда-выражения, которое просто выводит id этой задачи. Вторая задача - задача продолжения задается с помощью метода ContinueWith, который в качестве параметра принимает делегат Action<Task>. То есть метод PrintTask, который передается в вызов ContinueWith, должен принимать параметр типа Task.
  • Благодаря передачи в метод параметра Task, мы можем получить различные свойства предыдущей задачи, как например, в данном случае получает ее Id.
  • И после завершения задачи task1 сразу будет вызываться задача task2. Консольный вывод программы:
Id задачи: 1
Id задачи: 2
Id предыдущей задачи: 1
Конец метода Main
  • Также мы можем передавать конкретный результат работы предыдущей задачи:
Task<int> sumTask = new Task<int>(() => Sum(4, 5));
 
// задача продолжения
Task printTask = sumTask.ContinueWith(task => PrintResult(task.Result));
 
sumTask.Start();
 
// ждем окончания второй задачи
printTask.Wait();
Console.WriteLine("Конец метода Main");
 
int Sum(int a, int b) => a + b;
void PrintResult(int sum) => Console.WriteLine($"Sum: {sum}");
Task task1 = new Task(() => Console.WriteLine($"Current Task: {Task.CurrentId}"));
 
// задача продолжения
Task task2 = task1.ContinueWith(t =>
    Console.WriteLine($"Current Task: {Task.CurrentId}  Previous Task: {t.Id}"));
 
Task task3 = task2.ContinueWith(t =>
    Console.WriteLine($"Current Task: {Task.CurrentId}  Previous Task: {t.Id}"));
 
 
Task task4 = task3.ContinueWith(t =>
    Console.WriteLine($"Current Task: {Task.CurrentId}  Previous Task: {t.Id}"));
 
task1.Start();
 
task4.Wait();   //  ждем завершения последней задачи
Console.WriteLine("Конец метода Main");
Current Task: 1
Current Task: 2  Previous Task: 1
Current Task: 3  Previous Task: 2
Current Task: 4  Previous Task: 3
Конец метода Main``

Parallel

  • Класс Parallel также является частью TPL и предназначен для упрощения параллельного выполнения кода. Parallel имеет ряд методов, которые позволяют распараллелить задачу.
 / метод Parallel.Invoke выполняет три метода
 Parallel.Invoke(
     Print,
     () =>
     {
         Console.WriteLine($"Выполняется задача {Task.CurrentId}");
         Thread.Sleep(3000);
     },
     () => Square(5)
 );

 void Print()
 {
     Console.WriteLine($"Выполняется задача {Task.CurrentId}");
     Thread.Sleep(3000);
 }
 // вычисляем квадрат числа
 void Square(int n)
 {
     Console.WriteLine($"Выполняется задача {Task.CurrentId}");
     Thread.Sleep(3000);
     Console.WriteLine($"Результат {n * n}");
 }

Parallel.For

Parallel.For(1, 5, Square);
 
// вычисляем квадрат числа
void Square(int n)
{
    Console.WriteLine($"Выполняется задача {Task.CurrentId}");
    Console.WriteLine($"Квадрат числа {n} равен {n * n}");
    Thread.Sleep(2000);
}

Parallel.ForEach

ParallelLoopResult result = Parallel.ForEach<int>(
       new List<int>() { 1, 3, 5, 8 },
       Square
);
 
// вычисляем квадрат числа
void Square(int n)
{
    Console.WriteLine($"Выполняется задача {Task.CurrentId}");
    Console.WriteLine($"Квадрат числа {n} равен {n * n}");
    Thread.Sleep(2000);
}
Выполняется задача 1
Выполняется задача 3
Квадрат числа 8 равен 64
Выполняется задача 4
Квадрат числа 3 равен 9
Выполняется задача 2
Квадрат числа 5 равен 25
Квадрат числа 1 равен 1
  • Выход из цикла
ParallelLoopResult result = Parallel.For(1, 10, Square);
if (!result.IsCompleted)
    Console.WriteLine($"Выполнение цикла завершено на итерации {result.LowestBreakIteration}");
 
// вычисляем квадрат числа
void Square(int n, ParallelLoopState pls)
{
    if (n == 5) pls.Break();    // если передано 5, выходим из цикла
 
    Console.WriteLine($"Квадрат числа {n} равен {n * n}");
    Thread.Sleep(2000);
}

Отмена задач и параллельных операций. CancellationToken

  • Параллельное выполнение задач может занимать много времени. И иногда может возникнуть необходимость прервать выполняемую задачу. Для этого платформа .NET предоставляет структуру CancellationToken из пространства имен System.Threading.
  • Общий алгоритм отмены задачи обычно предусматривает следующий порядок действий:
    • Создание объекта CancellationTokenSource, который управляет и посылает уведомление об отмене токену.

    • С помощью свойства CancellationTokenSource.Token получаем собственно токен - объект структуры CancellationToken и передаем его в задачу, которая может быть отменена. Определяем в задаче действия на случай ее отмены.

    • Вызываем метод CancellationTokenSource.Cancel(), который устанавливает для свойства CancellationToken.IsCancellationRequested значение true. Стоит понимать, что сам по себе метод CancellationTokenSource.Cancel() не отменяет задачу, он лишь посылает уведомление об отмене через установку свойства CancellationToken.IsCancellationRequested. Каким образом будет происходить выход из задачи, это решает сам разработчик.

    • Класс CancellationTokenSource реализует интерфейс IDisposable. И когда работа с объектом CancellationTokenSource завершена, у него следует вызвать метод Dispose для освобождения всех связанных с ним используемых ресурсов. (Вместо явного вызова метода Dispose можно использовать конструкцию using).

  • Теперь касательно третьего пункта - определения действий отмены задачи. Как именно завершить задачу? Конкретные действия на лежат целиком на разработчике, тем не менее есть два общих варианта выхода:
    CancellationTokenSource cancelTokenSource = new CancellationTokenSource(); 
    CancellationToken token = cancelTokenSource.Token;
    
    CancellationTokenSource cancelTokenSource = new CancellationTokenSource(); 
    CancellationToken token = cancelTokenSource.Token;
    Task task = new Task(() => { выполняемые_действия}, token); //
    
  • При получении сигнала отмены выйти из метода задачи, например, с помощью оператора return или построив логику метода соответствующим образом. Но следует учитывать, что в этом случае задача перейдет в состояние TaskStatus.RanToCompletion, а не в состояние TaskStatus.Canceled.
    • При получении сигнала отмены сгенерировать исключение OperationCanceledException, вызвав у токена метод ThrowIfCancellationRequested(). После этого задача перейдет в состояние TaskStatus.Canceled.
    • При получении сигнала отмены сгенерировать исключение OperationCanceledException, вызвав у токена метод ThrowIfCancellationRequested(). После этого задача перейдет в состояние TaskStatus.Canceled.
CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
CancellationToken token = cancelTokenSource.Token;
 
// задача вычисляет квадраты чисел
Task task = new Task(() =>
{
    for (int i = 1; i < 10; i++)
    {
        if (token.IsCancellationRequested)  // проверяем наличие сигнала отмены задачи
        {
            Console.WriteLine("Операция прервана");
            return;     //  выходим из метода и тем самым завершаем задачу
        }
        Console.WriteLine($"Квадрат числа {i} равен {i * i}");
        Thread.Sleep(200);
    }
}, token);
task.Start();
 
Thread.Sleep(1000);
// после задержки по времени отменяем выполнение задачи
cancelTokenSource.Cancel();
// ожидаем завершения задачи
Thread.Sleep(1000);
//  проверяем статус задачи
Console.WriteLine($"Task Status: {task.Status}");
cancelTokenSource.Dispose(); // освобождаем ресурсы

Aсинхронное программирование

  • Асинхронность позволяет вынести отдельные задачи из основного потока в специальные асинхронные методы и при этом более экономно использовать потоки. Асинхронные методы выполняются в отдельных потоках. Однако при выполнении продолжительной операции поток асинхронного метода возвратится в пул потоков и будет использоваться для других задач. А когда продолжительная операция завершит свое выполнение, для асинхронного метода опять выделяется поток из пула потоков, и асинхронный метод продолжает свою работу.

  • Ключевыми для работы с асинхронными вызовами в C# являются два оператора: async и await, цель которых - упростить написание асинхронного кода. Они используются вместе для создания асинхронного метода.

PrintAsync();

Console.WriteLine($"Продолжаем работу в методе Main [{Thread.CurrentThread.ManagedThreadId}]"); //

Thread.Sleep(5000);

Console.WriteLine($"Завершаем работу Main [{Thread.CurrentThread.ManagedThreadId}]");

void Print()
{
    Thread.Sleep(3000);
    Console.WriteLine($"Hello! [{Thread.CurrentThread.ManagedThreadId}]");
}

async void PrintAsync()
{
    Console.WriteLine($"Начало работы PrintAsync [{Thread.CurrentThread.ManagedThreadId}]");
    await Task.Run(() => Print());
    Console.WriteLine($"Конец метода PrintAsync [{Thread.CurrentThread.ManagedThreadId}]");
}
  • Поток Main вызывает в своём метод PrintAsync.
  • Выводит сообщение о начале работы в методе PrintAsync и пишет свой Id (допустим [1]).
  • Натыкается на await и запускает Task в другом потоке (допустим[2]). И самое главное оставляет метод PrintAsync ждать (await) завершения Task'a в том же потоке, что и сам Task - [2]. А сам возвращается к выполнению метода Main.
  • Так как в этом примере await не стоит перед вызовом PrintAsync, то поток[1] продолжает работу в Main, выводит сообщение о продолжении работы и засыпает на 5 секунд.
  • В это время Task завершает свою работу, выводит "Hello!". Пишет Id потока в котором он работает - [2].
  • Сразу после этого метод PrintAsync продолжает свою работу в потоке [2]. Хотя запускался изначально в потоке [1], но из за операции await был перемещён в поток [2].
  • Поток Main просыпается и выводит сообщение о завершении работы в потоке [1].
Начало работы PrintAsync [1]
Продолжаем работу в методе Main [1]
Hello! [2]
Конец метода PrintAsync [2]
Завершаем работу Main [1]
  • Еще пример
var tomTask = PrintNameAsync("Tom");
var bobTask = PrintNameAsync("Bob");
var samTask = PrintNameAsync("Sam");
 
await tomTask;
await bobTask;
await samTask;
// определение асинхронного метода
async Task PrintNameAsync(string name)
{
    await Task.Delay(3000);     // имитация продолжительной работы
    Console.WriteLine(name);
}

Опеределение асинхронного лямбда-выражения

// асинхронное лямбда-выражение
Func<string, Task> printer = async (message) =>
{
    await Task.Delay(1000);
    Console.WriteLine(message);
};
 
await printer("Hello World");

Возвращение результата из асинхронного метода

  • void-методов следует избегать и следует использовать только там, где эти подобные методы представляют единственный возможный способ определения асинхронного метода. Прежде всего, мы не можем применить к подобным методам оператор await. Также потому что исключения в таких методах сложно обрабатывать, так как они не могут быть перехвачены вне метода. Кроме того, подобные void-методы сложно тестировать.

Task

await PrintAsync("Hello Metanit.com");
 
// определение асинхронного метода
async Task PrintAsync(string message)
{
    await Task.Delay(1000);     // имитация продолжительной работы
    Console.WriteLine(message);
}
  • Здесь формально метод PrintAsync не использует оператор return для возвращения результата. Однако если в асинхронном методе выполняется в выражении await асинхронная операция, то мы можем возвращать из метода объект Task.
  • Для ожидания завершения асинхронной задачи можно применить оператор await. Причем его необязательно использовать непосредственно при вызове задачи. Его можно применить лишь там, где нам нужно гарантировано получить результат задачи или удостовериться, что задача завершена.

Task<T>

  • Метод может возвращать некоторое значение. Тогда возвращаемое значение оборачивается в объект Task, а возвращаемым типом является Task<T>:
int n1 = await SquareAsync(5);
int n2 = await SquareAsync(6);
Console.WriteLine($"n1={n1}  n2={n2}"); // n1=25  n2=36
 
async Task<int> SquareAsync(int n)
{
    await Task.Delay(0);
    return n * n;
}

ValueTask<T>

  • Использование типа ValueTask<T> во многом аналогично применению Task<T> за исключением некоторых различий в работе с памятью, поскольку ValueTask - структура, которая содержит большее количество полей. Поэтому применение ValueTask вместо Task приводит к копированию большего количества данны и соответственно создает некоторые дополнительные издержки.
  • Преимуществом ValueTask перед Task является то, что данный тип позволяет избежать дополнительных выделений памяти в хипе. Например, иногда требуется синхронно возвратить некоторое значение. Так, возьмем следующий пример:
var result = await AddAsync(4, 5);
Console.WriteLine(result);
 
Task<int> AddAsync(int a, int b)
{
    return Task.FromResult(a + b);
}
var result = await AddAsync(4, 5);
Console.WriteLine(result);
 
ValueTask<int> AddAsync(int a, int b)
{
    return new ValueTask<int>(a + b);
}
  • При необходимости также можно преобразовать ValueTask в объект Task с помощью метода AsTask():
var getMessage = GetMessageAsync();
string message = await getMessage.AsTask();
Console.WriteLine(message); // Hello
 
async ValueTask<string> GetMessageAsync()
{
    await Task.Delay(0);
    return "Hello";
}

Последовательное и параллельное выполнение. Task.WhenAll и Task.WhenAny

  • .NET позволяет упростить отслеживание выполнения набора задач с помощью метода Task.WhenAll. Этот метод принимает набор асинхронных задач и ожидает завершения всех этих задач. Этот метод является аналогом статического метода Task.WaitAll(), однако предназначен непосредственно для асинхронных методов и позволяет применять оператор await:
// определяем и запускаем задачи
var task1 = PrintAsync("Hello C#");
var task2 = PrintAsync("Hello World");
var task3 = PrintAsync("Hello METANIT.COM");
 
// ожидаем завершения всех задач
await Task.WhenAll(task1, task2, task3);
 
async Task PrintAsync(string message)
{
    await Task.Delay(2000);     // имитация продолжительной операции
    Console.WriteLine(message);
}
  • Если нам надо дождаться, когда будет выполнена хотя бы одна задача из некоторого набора задач, то можно применять метод Task.WhenAny(). Это аналог метода Task.WaitAny() - он завершает выполнение, когда завершается хотя бы одна задача. Но для ожидания выполнения к Task.WhenAny()применяется оператор await:
// определяем и запускаем задачи
var task1 = PrintAsync("Hello C#");
var task2 = PrintAsync("Hello World");
var task3 = PrintAsync("Hello METANIT.COM");
 
// ожидаем завершения хотя бы одной задачи
await Task.WhenAny(task1, task2, task3);
 
async Task PrintAsync(string message)
{   
    await Task.Delay(new Random().Next(1000, 2000));     // имитация продолжительной операции
    Console.WriteLine(message);
}

Получение результата

// определяем и запускаем задачи
var task1 = SquareAsync(4);
var task2 = SquareAsync(5);
var task3 = SquareAsync(6);
 
// ожидаем завершения всех задач
int[] results = await Task.WhenAll(task1, task2, task3);
// получаем результаты:
foreach (int result in results)
    Console.WriteLine(result);
 
async Task<int> SquareAsync(int n)
{
    await Task.Delay(1000);
    return n * n;
}

Барьер

  • Класс C# Barrier — это примитивы синхронизации, используемые в многопоточности .NET. Барьер используется в алгоритме, состоящем из нескольких этапов.
  • В этой барьерной синхронизации у нас есть несколько потоков, работающих по одному алгоритму. Алгоритм работает поэтапно. Все потоки должны завершить фазу 1, после чего они могут перейти к фазе 2. Пока все потоки не завершат фазу 1, все потоки должны ждать, пока все потоки не достигнут фазы 1.
class Program
{
    static Barrier barrier = new Barrier(participantCount: 5);
    static void Main(string[] args)
    {
        Task[] tasks = new Task[5];

        for (int i = 0; i < 5; ++i)
        {
            int j = i;
            tasks[j] = Task.Factory.StartNew(() =>
            {
                GetDataAndStoreData(j);
            });
        }

        Task.WaitAll(tasks);

        Console.WriteLine("Backup completed");
        Console.ReadLine();

            
    }

    static void GetDataAndStoreData(int index)
    {
        Console.WriteLine("Getting data from server: " + index);
        Thread.Sleep(TimeSpan.FromSeconds(2));

        barrier.SignalAndWait();

        Console.WriteLine("Send data to Backup server: " + index);

        barrier.SignalAndWait();
    }
}

/* Result
                * Getting data from server: 0
                * Getting data from server: 2
                * Getting data from server: 1
                * Getting data from server: 3
                * Getting data from server: 4
                * Send data to Backup server: 1
                * Send data to Backup server: 4
                * Send data to Backup server: 2
                * Send data to Backup server: 0
                * Send data to Backup server: 3
                * Backup completed */
  • AddParticipant : этот метод добавляет одного участника в класс Barrier.
  • AddParticipants(int memberCount) : этот метод добавляет количество участников, указанное в параметре подсчета участников, в класс Barrier.
  • RemoveParticipant : этот метод удаляет одного участника из класса Barrier.
  • RemoveParticipants(int memberCount) : этот метод удаляет количество участников, указанное в параметре количества участников, из класса Barrier.
class Program
{
    static Barrier barrier = new Barrier(participantCount: 0);
    static void Main(string[] args)
    {
        int totalRecords = GetNumberOfRecords();

        Task[] tasks = new Task[totalRecords];

        for (int i = 0; i < totalRecords; ++i)
        {
            barrier.AddParticipant();

            int j = i;
            tasks[j] = Task.Factory.StartNew(() =>
            {
                GetDataAndStoreData(j);
            });
        }

        Task.WaitAll(tasks);

        Console.WriteLine("Backup completed");
        Console.ReadLine();
    }

    static int GetNumberOfRecords()
    {
        return 20;
    }

    static void GetDataAndStoreData(int index)
    {
        Console.WriteLine("Getting data from server: " + index);
        Thread.Sleep(TimeSpan.FromSeconds(2));

        barrier.SignalAndWait();

        Console.WriteLine("Send data to Backup server: " + index);

        barrier.SignalAndWait();
    }
}

Barrier(Int32, Action)

image

Класс AutoResetEvent

  • Класс AutoResetEvent также служит целям синхронизации потоков. Этот класс представляет событие синхронизации потоков, который позволяет при получении сигнала переключить данный объект-событие из сигнального в несигнальное состояние.
  • Для управления синхронизацией класс AutoResetEvent предоставляет ряд методов:
  • Reset(): задает несигнальное состояние объекта, блокируя потоки.
  • Set(): задает сигнальное состояние объекта, позволяя одному или несколким ожидающим потокам продолжить работу.
  • WaitOne(): задает несигнальное состояние и блокирует текущий поток, пока текущий объект AutoResetEvent не получит сигнал.
int x = 0;  // общий ресурс
 
AutoResetEvent waitHandler = new AutoResetEvent(true);  // объект-событие
 
// запускаем пять потоков
for (int i = 1; i < 6; i++)
{
    Thread myThread = new(Print);
    myThread.Name = $"Поток {i}";
    myThread.Start();
}
 
 
void Print()
{
    waitHandler.WaitOne();  // ожидаем сигнала
    x = 1;
    for (int i = 1; i < 6; i++)
    {
        Console.WriteLine($"{Thread.CurrentThread.Name}: {x}");
        x++;
        Thread.Sleep(100);
    }
    waitHandler.Set();  //  сигнализируем, что waitHandler в сигнальном состоянии
}

Notes

Mutex

  • Mutexes are of two types: local mutexes, which are unnamed, and named system mutexes. A local mutex exists only within your process. It can be used by any thread in your process that has a reference to the Mutex object that represents the mutex. Each unnamed Mutex object represents a separate local mutex.
  • Named system mutexes are visible throughout the operating system, and can be used to synchronize the activities of processes. You can create a Mutex object that represents a named system mutex by using a constructor that accepts a name. The operating-system object can be created at the same time, or it can exist before the creation of the Mutex object. You can create multiple Mutex objects that represent the same named system mutex, and you can use the OpenExisting method to open an existing named system mutex.

Перевод

  • Мьютексы бывают двух типов: локальные безымянные мьютексы и именованные системные мьютексы. Локальный мьютекс существует только внутри вашего процесса. Его может использовать любой поток вашего процесса, имеющий ссылку на объект Mutex, представляющий мьютекс. Каждый безымянный объект Mutex представляет собой отдельный локальный мьютекс.
  • Именованные системные мьютексы видны во всей операционной системе и могут использоваться для синхронизации действий процессов. Вы можете создать объект Mutex, представляющий именованный системный мьютекс, с помощью конструктора, который принимает имя. Объект операционной системы может быть создан одновременно или может существовать до создания объекта Mutex. Вы можете создать несколько объектов Mutex, которые представляют один и тот же именованный системный мьютекс, и вы можете использовать метод OpenExisting, чтобы открыть существующий именованный системный мьютекс.

Semaphores

  • Semaphores are of two types: local semaphores and named system semaphores. If you create a Semaphore object using a constructor that accepts a name, it is associated with an operating-system semaphore of that name. Named system semaphores are visible throughout the operating system, and can be used to synchronize the activities of processes. You can create multiple Semaphore objects that represent the same named system semaphore, and you can use the OpenExisting method to open an existing named system semaphore.
  • A local semaphore exists only within your process. It can be used by any thread in your process that has a reference to the local Semaphore object. Each Semaphore object is a separate local semaphore.

Перевод

  • Семафоры бывают двух типов: локальные семафоры и именованные системные семафоры. Если объект создается Semaphore с помощью конструктора, принимающего имя, он связывается с семафором операционной системы с таким именем. Именованные системные семафоры видны всей операционной системе и могут использоваться для синхронизации действий процессов. Можно создать несколько Semaphore объектов, представляющих один и тот же именованный системный семафор, и использовать OpenExisting метод для открытия существующего именованного системного семафора.
  • Локальный семафор существует только внутри процесса. Его может использовать любой поток в вашем процессе, имеющий ссылку на локальный объект Semaphore. Каждый Semaphore объект является отдельным локальным семафором.

SemaphoreSlim

  • System.Threading.Semaphore Класс представляет собой именованный (общесистемный) или локальный семафор. Он является тонкой оболочкой вокруг объекта семафора Win32. Семафоры Win32 являются семафорами счета, которые могут быть использованы для управления доступом к пулу ресурсов.
  • SemaphoreSlim Класс представляет упрощенный, быстрый семафор, который можно использовать для ожидания внутри одного процесса, когда предполагается, что времена ожидания будут очень короткими. SemaphoreSlim использует максимально примитивы синхронизации, предоставляемые общеязыковой среды выполнения (CLR). Тем не менее, он также предоставляет неактивно инициализированные дескрипторы ожидания на основе ядра при необходимости поддержки ожидания для нескольких семафоров. SemaphoreSlim также поддерживает использование токенов отмены, но не поддерживает именованные семафоры или использование дескриптора ожидания для синхронизации.

Приятная неожиданность

Parallel LINQ

  • По умолчанию все элементы коллекции в LINQ обрабатываются последовательно, но начиная с .NET 4.0 в пространство имен System.Linq был добавлен класс ParallelEnumerable, который инкапсулирует функциональность PLINQ (Parallel LINQ) и позволяет выполнять обращения к коллекции в параллельном режиме.
  • При обработке коллекции PLINQ использует возможности всех процессоров в системе. Источник данных разделяется на сегменты, и каждый сегмент обрабатывается в отдельном потоке. Это позволяет произвести запрос на многоядерных машинах намного быстрее.
  • В то же время по умолчанию PLINQ выбирает последовательную обработку данных. Переход к параллельной обработке осуществляется в том случае, если это приведет к ускорению работы. Однако, как правило, при параллельных операциях возрастают дополнительные издержки. Поэтому если параллельная обработка потенциально требует больших затрат ресурсов, то PLINK в этом случае может выбрать последовательную обработку, если она не требует больших затрат ресурсов.
  • Поэтому смысл применения PLINQ имеется преимущественно на больших коллекциях или при сложных операциях, где действительно выгода от распараллеливания запросов может перекрыть возникающие при этом издержки.
  • Также следует учитывать, что при доступе к общему разделяемому состоянию в параллельных операциях будет неявно использоваться синхронизация, чтобы избежать взаимоблокировки доступа к этим общим ресурсам. Затраты на синхронизацию ведут к снижению производительности, поэтому желательно избегать или ограничивать применения в параллельных операциях разделяемых ресурсов.

Метод AsParallel

  • Метод AsParallel() позволяет распараллелить запрос к источнику данных. Он реализован как метод расширения LINQ у массивов и коллекций. При вызове данного метода источник данных разделяется на части (если это возможно) и над каждой частью отдельно производятся операции.
int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, };
var squares = from n in numbers.AsParallel()
                 select Square(n);
 
foreach (var n in squares)
    Console.WriteLine(n);
 
int Square(int n) => n * n;
49
1
9
25
64
4
16
36

Метод ForAll

  • Выше рассмотренный код по вычислению квадрата числа можно еще больше оптимизировать с точки зрения параллелизации. В частности, для вывода результата параллельной операции используется цикл foreach. Но его использование приводит к увеличению издержек - необходимо склеить полученные в разных потоках данные в один набор и затем их перебрать в цикле. Более оптимально в данном случае было бы использование метода ForAll(), который выводит данные в том же потоке, в котором они обрабатываются:
int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, };

// с помощью операторов LINQ
(from n in numbers.AsParallel() select Square(n)).ForAll(Console.WriteLine);

// с помощью методов расширения LINQ
numbers.AsParallel().Select(n => Square(n)).ForAll(Console.WriteLine);

int Square(int n) => n * n;

Метод AsOrdered

  • При выполнении параллельного запроса порядок данных в результирующей выборки может быть не предсказуем. В этих случаях мы может применять метод AsOrdered():
 var squares = from n in numbers.AsParallel().AsOrdered()
              where n > 0
              select Square(n);
  • Кроме того, если в программе предстоят какие-нибудь манипуляции с полученным набором, однако упорядочивание больше не требуется, мы можем применить метод AsUnordered():
int[] numbers = new int[] { -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, };
var squares = from n in numbers.AsParallel().AsOrdered()
              where n > 0
              select Square(n);
 
// выбираем числа больще 4 без упорядочивания результата
var query = from n in squares.AsUnordered()
            where n > 4
            select n;
 
foreach (var n in query)
    Console.WriteLine(n);
 
int Square(int n) => n * n;

Обработка ошибок и отмена операции

  • При выполнении параллельных операций также могут возникать ошибки, обработка которых имеет свои особенности. При параллельной обработке коллекция разделяется а части, и каждая часть обрабатывается в отдельном потоке. Однако если возникнет ошибка в одном из потоков, то система прерывает выполнение всех потоков.
  • При генерации исключений все они агрегируются в одном исключении типа AggregateException
  • Например, пусть в метод факториала передается массив объектов, который содержит не только числа, но и строки:
 object[] numbers = new object[] { 1, 2, 3, 4, 5, "6" };
 
var squares = from n in numbers.AsParallel()
                 let x = (int)n
                 select Square(x);
try
{
    squares.ForAll(n => Console.WriteLine(n));
}
catch (AggregateException ex)
{
    foreach (var e in ex.InnerExceptions)
    {
        Console.WriteLine(e.Message);
    }
}
 
int Square(int n) => n * n;

Прерывание параллельной операции

  • Вполне вероятно, что нам может потребоваться прекратить операцию до ее завершения. В этом случае мы можем использовать метод WithCancellation(), которому в качестве параметра передается токен CancellationToken:
CancellationTokenSource cts = new CancellationTokenSource();
// запускаем дополнительную задачу, в которой через 400 миллисек. прерываем операцию
new Task(() =>
{
    Thread.Sleep(400);
    cts.Cancel();
}).Start();
 
try
{
    int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, };
 
    var squares = from n in numbers.AsParallel().WithCancellation(cts.Token)
                     select Square(n);
 
    foreach (var n in squares)
        Console.WriteLine(n);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Операция была прервана");
}
catch (AggregateException ex)
{
    if (ex.InnerExceptions != null)
    {
        foreach (Exception e in ex.InnerExceptions)
            Console.WriteLine(e.Message);
    }
}
finally
{
    cts.Dispose();
}
int Square(int n)
{
    var result = n * n;
    Console.WriteLine($"Квадрат числа {n} равен {result}");
    Thread.Sleep(1000); // имитация долгого вычисления
    return result;
}

Потокобезопасные коллекции

  • В .NET Framework 4 введено пространство имен System.Collections.Concurrent, включающее несколько потокобезопасных и масштабируемых классов коллекций. Несколько потоков могут безопасно и эффективно добавлять и удалять элементы из таких коллекций, не требуя при этом дополнительной синхронизации в пользовательском коде. При написании нового кода пользуйтесь классами параллельных коллекций, когда множество потоков будет вести в коллекцию запись параллельно. Если выполняется только чтение из общей коллекции, вы можете использовать классы в пространстве имен System.Collections.Generic. Мы рекомендуем использовать классы коллекций версии 1.0 только в том случае, если вам нужна среда выполнения .NET Framework до версии 1.1 включительно.

  • Коллекции, представленные в .NET Framework 1.0, находятся в пространстве имен System.Collections. Эти коллекции, которые содержат часто используемые классы ArrayList и Hashtable, обеспечивают определенную степень потокобезопасности посредством свойства Synchronized, которое создает для коллекции потокобезопасную программу-оболочку. Работа программы оболочки заключается в блокировке всей коллекции при каждой операции добавления или удаления. Поэтому каждый поток, который пытается получить доступ к коллекции, должен ждать своей очереди для получения блокировки. Такой подход не является масштабируемым и может привести к значительному снижению производительности для больших коллекций. Также такой подход не защищен полностью от состояния гонки. Дополнительные сведения см. в статье Синхронизация в универсальных коллекциях.

  • Классы коллекций, представленные в .NET Framework 2.0, находятся в пространстве имен System.Collections.Generic. К ним относятся List, Dictionary<TKey,TValue> и др. Эти классы отличаются улучшенной безопасностью типа и производительностью по сравнению с классами .NET Framework 1.0. Однако классы коллекций .NET Framework 2.0 не обеспечивают синхронизацию потоков. Пользовательский код должен обеспечивать всю синхронизацию при параллельном добавлении элементов в несколько потоков или удалении элементов из них.

  • Рекомендуем использовать классы многопоточных коллекций .NET Framework 4, так как они обеспечивают не только безопасность типа, как у классов .NET Framework 2.0, но и большую эффективность и более полную потокобезопасность по сравнению с коллекциями .NET Framework 1.0.

  • Классы ConcurrentQueue<T> и ConcurrentStack<T> не используют блокировку. Вместо этого они используют операции класса Interlocked (реализует атомарные аперации) для обеспечения потокобезопасности.

https://learn.microsoft.com/ru-ru/dotnet/api/system.threading.interlocked?view=net-7.0

image

  • В версию 4.0 среды .NET Framework добавлено новое пространство имен System.Collections.Concurrent. Оно содержит коллекции, которые являются потокобезопасными и специально предназначены для параллельного программирования. Это означает, что они могут безопасно использоваться в многопоточной программе, где возможен одновременный доступ к коллекции со стороны двух или больше параллельно исполняемых потоков.
  • Для безопасного в отношении потоков доступа к коллекциям определен интерфейс IProducerConsumerCollection. Наиболее важными методами этого интерфейса являются TryAdd() и TryTake(). Метод TryAdd() пытается добавить элемент в коллекцию, но это может не получиться, если коллекция заблокирована от добавления элементов. Метод возвращает булевское значение, сообщающее об успехе или неудаче операции.
  • TryTake() работает аналогичным образом, информируя вызывающий код об успехе или неудаче, и в случае успеха возвращает элемент из коллекции. Ниже перечислены классы из пространства имен System.Collections.Concurrent с кратким описанием их функциональности:

ConcurrentQueue<T>

  • Этот класс коллекции реализован со свободным от блокировок алгоритмом и использует 32 массива, которые внутренне скомбинированы в связный список. Для доступа к элементам очереди применяются методы Enqueue(), TryDequeue() и TryPeek(). Имена этих методов очень похожи на уже известные методы Queue, но с добавлением префикса Try к тем из них, которые могут дать сбой. Поскольку этот класс реализует интерфейс IProducerConsumerCollection, методы TryAdd() и TryTake() просто вызывают Enqueue() и TryDequeue().

ConcurrentStack<T>

  • Очень похож на ConcurrentQueue, но с другими методами доступа к элементам. Класс ConcurrentStack определяет методы Push(), PushRange(), TryPeek(), TryPop() и TryPopRange(). Внутри этот класс использует связный список для хранения элементов.

ConcurrentBag<T>

  • Этот класс не определяет никакого порядка для добавления или извлечения элементов. Он реализует концепцию отображения потоков на используемые внутренне массивы, и старается избежать блокировок. Для доступа к элементам применяются методы Add(), TryPeek() и TryTake().

ConcurrentDictionary<TKey, TValue>

  • Безопасная в отношении потоков коллекция ключей и значений. Для доступа к членам в неблокирующем режиме служат методы TryAdd(), TryGetValue(), TryRemove() и TryUpdate(). Поскольку элементы основаны на ключах и значениях, ConcurrentDictionary<TKey, TValue> не реализует интерфейс IProducerConsumerCollection.

BlockingCollection<T>

  • Коллекция, которая осуществляет блокировку и ожидает, пока не появится возможность выполнить действие по добавлению или извлечению элемента. BlockingCollection предлагает интерфейс для добавления и извлечения элементов методами Add() и Take(). Эти методы блокируют поток и затем ожидают, пока не появится возможность выполнить задачу.
  • Метод Add() имеет перегрузку, которой можно также передать CancellationToken. Эта лексема всегда отменяет блокирующий вызов.
  • Если не нужно, чтобы поток ожидал бесконечное время, и не хотите отменять вызов извне, доступны также методы TryAdd() и TryTake(). В них можно указать значение таймаута — максимального периода времени, в течение которого вы готовы блокировать поток и ждать, пока вызов не даст сбой.