суббота, 29 февраля 2020 г.

Job system

Понемногу пишу движок, на котром обкатываю новые фичи. Среди них - планировщик задач (thread pool, job system, ...).
Какой требуется функционал:
  • Зависимости между тасками - то есть таск запускается только когда все его зависимости завершились.
  • Отмена тасков - до захвата таска потоком отмену обрабатывает сам планировщик, после - поток в котором выполняется таск.
  • Слабые и сильные зависимости - определяет что будет при отмене зависимого таска. При слабой зависимости следующий таск запустится даже при отмене зависимого таска. При сильной зависимости следующий таск также отменяется.
  • Кастомные зависимости - таким образом можно добавить зависимость между таском и GPU через VkFence, например.
  • Совместимость с другими системами - работа с сетью также сделана на тасках, многопоточный ECS и рендер тоже будет на них и тд.
  • Поддержка блокировок - для этого сделана отдельная зависимость InterlockDependency, куда передается callback, который может захватить мютекс или любой другой примитив синхронизации. Особенность в том, что захват идет через try_lock и если захватить не получилось, то таск остается в очереди до следующей проверки, если же получилось захватить, то таск отправляется на выполнение, а после выполнения вызывает функцию что бы разблокировать. Это позволяет не блокировать потоки при доступе к общим данным.
Исходники можно посмотреть тут.


Особенности реализации

Thread affinity
Планировщик ОС может перекидывать потоки на разные ядра. Это хорошо, когда несколько потоков работают на одном ядре, когда другие ядра простаивают, тогда ОС перераспределит потоки, что увеличит производительность. Но в моем случае работу по потокам и соответственно ядрам процессора распределяет мой планировщик и ОС здесь только мешает, поэтому каждый поток жестко привязан к конкретному ядру процессора.


Task queue
После тестов производительности я пришел к варианту, когда задачи складываются в несколько очередей, а потоки забирают у них задачи. Достаточно 2-3 потоков на 1 очередь, чтобы потоки долго не простаивали при попытке забрать задачу из очереди. Сейчас у меня используется блокирующий алгоритм, но если очередь заблокирована другим потоком, то и поток и планировщик пытаются заблокировать другую очередь, таким образом уменьшается время блокировок. Для однопоточного рендера (OpenGL например) и сети используется по одной очереди, для остальных задач (в том числе Vulkan рендера) используется минимум 2 очереди, а максимум рассчитывается как половина от количества ядер процессора.

В дальнейшем планируется после выполнения таска проходить по зависящим от него таскам и выполнять их не обращаясь к очереди. С одной стороны это должно ускорить работу и уменьшить кэшпромахи, с другой более важные таски, которые находятся в начале очереди и не имеют зависимостей будут выполняться позже. Поддерживать разную логику работы тоже не хочется, это усложнит код, так что это улучшение пока под вопросом.

Более сложная оптимизация - переход на lock-free алгоритмы. Это даст прирост производительности для большого количества мелких тасков, но будет незаметно на тяжелых тасках.


Thread mask
Для тасков можно явно задать в каких типах потоков он может выполняться. Этого функционала я не видел в сторонних реализациях планировщиков. Таким образом можно использовать систему тасков для задач, которые используют сторонние либы, где хуже поддержка многопоточности, например OpenGL (ES), сеть, доступ к диску и тд. Для каждого типа потока создается своя очередь (одна или несколько), а потоки уже решают из каких очередей они будут брать задачи. Например для главного потока, где окно с OpenGL контекстом может быть маска (EThread::Main | EThread::Render).


Task dependencies
В отличие от того что было в FG, где на каждый таск запускался цикл по всем его зависимостям, здесь я ограничил количество зависимостей до 64 на таск чтобы они уместились в один атомик. Таким образом проверка можно ли выполнять таск состоит всего из двух чтений атомиков. А цикл по зависимостям переместился в этап после выполнения таска.

// check input dependencies
if ( curr->_canceledDepsCount.load( EMemoryOrder::Relaxed ) > 0 or
     curr->_waitBits.load( EMemoryOrder::Relaxed ) == 0 )
{
    ready = true;
}


Promise
Поверх Job system реализована промисы - граф функций, где функции могут возвращать значения, а зависимые от них функции принимают их как аргументы. Промисы можно скастить к таскам и использовать как зависимости в job system.

auto p0 = MakePromise( [] () { return "a"s; });
auto p1 = MakePromise( [] () -> String { return "b"s; });
auto p2 = MakePromise( [] () { return 1u; });
auto p3 = MakePromiseFromTuple( MakeTuple( p0, p1, p2 ));
auto p4 = p3.Then( [] (const Tuple<String, String, uint> &in) {
            return in.Get<0>() + in.Get<1>() + ToString( in.Get<2>() );
        });

Комментариев нет:

Отправить комментарий