[C++] часть 2: МЬЮТЕКС. Пишем наш первый код для многопоточной среды



Книга [C++] часть 2: МЬЮТЕКС. Пишем наш первый код для многопоточной среды

Часть 1, Часть 2


В прошлой статье мы разобрались с тем, что такое конкурентность/параллелизм и зачем нужна синхронизация. Настала пора изучить примитивы синхронизации, которые предлагает нам стандартная библиотека шаблонов C++.


Первым из них будет std::mutex. Но сначала ознакомьтесь с картой статьи (она пригодится, если вы вдруг запутаетесь).



Итак, начнём.


Что такое мьютекс?


Мьютекс (англ. mutex, от mutual exclusion — «взаимное исключение») — это базовый механизм синхронизации. Он предназначен для организации взаимоисключающего доступа к общим данным для нескольких потоков с использованием барьеров памяти (для простоты можно считать мьютекс дверью, ведущей к общим данным).


Синтаксис


  • Заголовочный файл | #include <mutex>
  • Объявление | std::mutex mutex_name;
  • Захват мьютекса | mutex_name.lock();
    Поток запрашивает монопольное использование общих данных, защищаемых мьютексом. Дальше два варианта развития событий: происходит захват мьютекса этим потоком (и в этом случае ни один другой поток не сможет получить доступ к этим данным) или поток блокируется (если мьютекс уже захвачен другим потоком).
  • Освобождение мьютекса | mutex_name.unlock();
    Когда ресурс больше не нужен, текущий владелец должен вызвать функцию разблокирования unlock, чтобы и другие потоки могли получить доступ к этому ресурсу. Когда мьютекс освобождается, доступ предоставляется одному из ожидающих потоков.

#include <mutex>
#include <vector>std::mutex door; // объявление мьютекса
std::vector<int> v; // общие данные door.lock();
/*-----------------------*/ /* Это потокобезопасная зона: допускается только один поток за раз
*
* Гарантируется монопольное использование вектора v
*/ /*-----------------------*/
door.unlock();


Как создать потокобезопасную очередь


Разберёмся, как реализовать простейшие потокобезопасные очереди, то есть очереди с безопасным доступом для потоков.
В библиотеке стандартных шаблонов уже есть готовая очередь (rawQueue). Наша реализация будет предполагать: а) извлечение и удаление целочисленного значения из начала очереди и б) добавление нового в конец очереди. И всё это при обеспечении потокобезопасности.


Сначала выясним, почему и как эти две операции могут создавать проблемы для многопоточности.


  • Извлечение и удаление
    Для извлечения и удаления значения из начала очереди необходимо выполнить три операции:
    1. Проверить, не пуста ли очередь.
    2. Если нет, получается ссылка на начало очереди (rawQueue.front()). 
    3. Удаляется начало очереди (rawQueue.pop()). 
    В промежутках между этими этапами к очереди могут получать доступ и другие потоки с целью внесения изменений или чтения. Попробуйте сами.

Например:


Удалили “1”, хотя до его извлечения даже не дошли, потому что поток B извлекает 0 и удаляет 1.
Дальше — больше: если rawQueue состоит из одного элемента, поток B видит непустую очередь, и тут же поток A удаляет последнее значение. Теперь поток B пытается удалить первое значение из пустой очереди, приводя к неопределённому поведению. Настоящая страшилка!

  • Добавление
    Рассмотрим теперь добавление нового значения с помощью rawQueue.push(): новый элемент добавляется в конец контейнера и становится следующим за последним на данный момент элементом. Дальше на единицу увеличивается размер. Заметили здесь проблему? А что, если два потока одновременно добавят новое значение, увидев этот последний элемент? И что может произойти в интервале между добавлением нового элемента и увеличением размера? Кто-нибудь возьмёт да и прочитает неправильный размер. 

Получается, мы должны быть уверены, что никто не будет трогать очередь, пока мы выполняем наши задачи. Используем мьютекс для защиты этих многоступенчатых операций и сделаем так, чтобы все вместе они смотрелись как одна атомарная операция.


#include <mutex>
#include <queue>

class threadSafe_queue {

std::queue<int> rawQueue; // структура, общая для всех потоков
std::mutex m; // красная дверь rawQueue

public:

int& retrieve_and_delete() {
int front_value = 0; // если пустая, возвращает 0
m.lock();
// Отныне текущий поток единственный, который имеет доступ к rawQueue
if( !rawQueue.empty() ) {
front_value = rawQueue.front();
rawQueue.pop();
}
m.unlock();
// теперь другие потоки могут захватить мьютекс
return front_value;
};

void push(int val) {
m.lock();
rawQueue.push(val);
m.unlock();
};

};

Обратите внимание:


  1. Связь между мьютексом и защищаемым ресурсом — только в голове программиста.
    Мы знаем, что мьютекс m защищает rawQueue, но напрямую это не указывается.
  2. Захват с необходимой степенью распараллеливания. 
    Использование мьютекса уменьшает параллелизм. Предположим, у нас только один мьютекс для защиты вектора и строки без каких-либо зависимостей (например, значение переменной не зависит от вектора и наоборот). Поток А захватывает мьютекс, читает строку и начинает обработку каких-то данных перед добавлением нового значения в вектор и освобождением мьютекса. И тут появляется поток B, который хочет внести пару изменений в строку, но при попытке захватить мьютекс (какая досада!) оказывается блокированным до того момента, пока не будут завершены все операции с вектором. Проблему решаем дополнительным мьютексом для строки (захваченным перед чтением и освобождённым сразу по завершении).
    → Всегда прикидывайте, какой объём данных будет защищён одним мьютексом.
  3. Проводите захват только для тех операций, которым это необходимо. 
    См. предыдущий пункт. 
  4. Не вызывайте lock(), если мьютекс у вас уже есть.
    Мьютекс уже заблокирован, и попытки повторного захвата приведут вас к состоянию бесконечного ожидания. Если он вам так нужен, можно воспользоваться классом std::recursive_mutex. Рекурсивный мьютекс можно получить одним и тем же потоком много раз, но столько же раз он должен быть и освобождён.
  5. Используйте try_lock() или std::timed_mutex, если не хотите блокироваться и ожидать неопределённое время.
    try_lock() — это неблокирующий метод в std::mutex. Он возвращает немедленно, даже если владение не получено, причём со значением true, если мьютекс захвачен, и false — если нет. 
    std::timed_mutex предлагает два неблокирующих метода для захвата мьютекса: try_lock_for() и try_lock_until(), причём оба возвращаются по истечении времени со значением true или false в зависимости от успешности захвата.
  6. Не забывайте вызывать unlock() или используйте std::lock_guard (или другие шаблонные классы), когда есть возможность. 
    См. ниже.

Lock guard и парадигма RAII


У нас две большие проблемы с этим простым мьютексом:


  • Что произойдёт, если мы забудем вызвать unlock()? Ресурс будет недоступен в течение всего времени существования мьютекса, и уничтожение последнего в неразблокированном состоянии приведёт к неопределённому поведению.
  • Что произойдёт, если до вызова unlock() будет выброшено исключение? unlock() так и не будет исполнен, а у нас будут все перечисленные выше проблемы.

К счастью, проблемы можно решить с помощью класса std::lock_guard. Он всегда гарантированно освобождает мьютекс, используя парадигму RAII (Resource Acquisition Is Initialization, что означает «получение ресурса есть инициализация»). Вот как это происходит: мьютекс инкапсулируется внутри lock_guard, который вызывает lock() в его конструкторе и unlock() в деструкторе при выходе из области видимости. Это безопасно даже при исключениях: раскрутка стека уничтожит lock_guard, вызывая деструктор и таким образом освобождая мьютекс.


std::lock_guard<std::mutex> lock_guard_name(raw_mutex);
#include <mutex>
#include <vector>
std::mutex door; // объявление мьютекса
std::vector<int> v;
{
std::lock_guard<std::mutex> lg(door);
/* Вызывается конструктор lg, эквивалентный door.lock();
* lg, размещается в стеке */
/*-----------------------*/

/* Гарантируется монопольное использование вектора v */

/*-----------------------*/
} /* lg выходит из области видимости. Вызывается деструктор, эквивалентный door.unlock(); */

Посмотрим теперь, как можно изменить нашу потокобезопасную очередь threadSafe_queue (на этот раз обращаем внимание на то, где освобождается мьютекс).


#include <mutex>
#include <queue>

class threadSafe_queue {

std::queue<int> rawQueue; // структура, общая для всех потоков
std::mutex m; // красная дверь rawQueue

public:

int& retrieve_and_delete() {
int front_value = 0; // если пустая, return будет 0
std::lock_guard<std::mutex> lg(m);
// Отныне текущий поток единственный, который имеет доступ к rawQueue
if( !rawQueue.empty() ) {
front_value = rawQueue.front();
rawQueue.pop();
}
return front_value;
}; // теперь другие потоки могут захватить мьютекс

void push(int val) {
std::lock_guard<std::mutex> lg(m);
// отныне текущий поток единственный, который имеет доступ к rawQueue
rawQueue.push(val);
}; // теперь другие потоки могут захватить мьютекс
};

Unique lock, дающий свободу


Как только владение мьютексом получено (благодаря std::lock_guard), он может быть освобождён. std::unique_lock действует в схожей манере плюс делает возможным многократный захват и освобождение (всегда в таком порядке) мьютекса, используя те же преимущества безопасности парадигмы RAII.


std::unique_lock<std::mutex> unique_lock_name(raw_mutex);
#include <mutex>
#include <vector>
std::mutex door; //объявление мьютекса
std::vector<int> v;
{
std::unique_lock<std::mutex> ul(door);
// Вызывается конструктор ul, эквивалентный door.lock();
// ul, размещённый в стеке
// гарантируется монопольное использование вектора

door.unlock();

// выполнение операций, не связанных с вектором
// ....
// теперь мне снова нужен доступ к вектору

door.lock();
//Снова гарантируется монопольное использование вектора
} /* unique_lock выходит из области видимости. Вызывается деструктор, эквивалентный door.unlock(); */

Когда использовать?


  • Когда вам не всегда нужен захват ресурса.
  • Вместе с std::condition_variable (в следующей статье).
  • При захвате std::shared_mutex в эксклюзивном режиме (см. далее). 

Общий мьютекс + общий захват дают больше читателей


std::mutex  — это мьютекс, которым одномоментно может владеть только один поток. Однако это ограничение не всегда обязательно. Например, потоки могут одновременно и безопасно читать одни и те же общие данные. Просто читать, не производя с ними никаких изменений. Но в случае с доступом к записи только записывающий поток может иметь доступ к данным.


Начиная с C++17, std::shared_mutex формирует доступ двух типов:


  • Общий доступ: потоки вместе могут владеть мьютексом и иметь доступ к одному и тому же ресурсу. Доступ такого типа можно запросить с помощью std::shared_lock (lock guard для общего мьютекса). При таком доступе любой эксклюзивный доступ блокируется.
  • Эксклюзивный доступ: доступ к ресурсу есть только у одного потока. Запрос этого типа осуществляется с помощью класса unique lock.

Синтаксис


  • Заголовочный файл | #include <shared_mutex>;
  • Объявление | std::shared_mutex raw_sharedMutex;
  • Для захвата в общем режиме |
    std::shared_lock<std::shared_mutex> sharedLock_name(raw_sharedMutex);
  • Для захвата в эксклюзивном режиме |
    std::unique_lock<std::shared_mutex> uniqueLock_name(raw_sharedMutex);

#include <shared_mutex>
#include <vector>
std::shared_mutex door; //объявление мьютекса
std::vector<int> v;
int readVectorSize() {
/* потоки могут вызывать эту функцию одновременно
* доступ на запись запрещена, когда получен sl */

std::shared_lock<std::shared_mutex> sl(door);
return v.size();
}
void pushElement(int new_element) {
/* гарантирован эксклюзивный доступ к вектору */

std::unique_lock<std::shared_mutex> ul(door);
v.push_back(new_element);
}

Scoped lock, дающий больше мьютексов (и без клинча)


Впервые появившийся в C++17 и действующий в схожей манере, что и std::lock_guard, он даёт возможность получения нескольких мьютексов. Без std::scoped_lock такая операция очень опасна, так как может привести к взаимной блокировке.


Краткая история взаимоблокировки:


Поток A хочет увести 200$ с банковского счёта Жеки на счёт Бяки в виде одной атомарной операции. Он начинает с того, что захватывает мьютекс, защищающий счёт Жеки, чтобы изъять деньги, а затем пытается захватить счёт Бяки.
В то же время поток B хочет увести 100$ со счёта Бяки на счёт Жеки. Он получает захват счёта Бяки, чтобы изъять деньги и попытаться захватить счёт Жеки. Оба потока блокируются, уснув в ожидании друг друга.


std::scoped_lock одновременно захватывают (а затем освобождают) все мьютексы, передаваемые в качестве аргумента, по принципу «всё или ничего»: если хотя бы один захват выбрасывает исключение, освобождаются все уже захваченные мьютексы.


  • std::scoped_lock<std::mutex> scoped_lock_name(raw_mutex1, raw_mutex2, ..);

Заключение


Если вы вдруг запутались в этом ворохе новой информации:


  • воспользуйтесь картой в начале статьи (или составьте свою);
  • применяйте на практике новые знания и пробуйте писать простенький код.

До встречи в следующей статье, в которой речь пойдёт о condition_variableи вы узнаете, как синхронизировать потоки!


551   0  

Comments

    Ничего не найдено.