Многопоточность здорового человека и многопоточность курильщика

В прошлой серии своих упражнений я дошел до того, что уперся в malloc. И успокоился. Но нет. Что-то чесалось и я решил таки продолжить, как выяснилось - не зря.

Чтобы не упираться в malloc, я перешел к такой единичной задаче:

  • Передаем в worker указатель на данные (и размер)
  • Считаем fnv64
  • Результат - 64-битное число, которое копируется без аллокации данных и подобного.

Кроме того, мне очень хотелось добиться двух вещей

  • Вписать всю конструкцию в Qt-шные signal/slot, то есть каждый результат обработки должен вызывать slot в main thread (в моих тестах этот слот только считал "сколько задач осталось" ну и минимальную статистику)
  • Обойтись без батчинга, потому что усложняет сильно.

На моей машине fnv64 по 100 килобайтам данных считается примерно 0.1ms, в качестве размера задачи я брал 100, 200, 400, 800, 1600кб, время исполнения единичной задачи вы можете легко почитать в уме.

Делаем раз

Стандартная для Qt реализация многопоточной обработки выглядит как-то так:

  • Controller: запускает threads, передает им задачи через signal/slot (QMetaObject::invokeMethod(thread[id],"runSingleJob"....)
  • Worker: наследник QObject со слотом runSingleJob, которому сделано moveToThread(...)
  • Worker, обработав задание, возвращает его через emit processed(....) в Controller
  • Controller форвардит этот сигнал в какой-то наш GUI (или не GUI) объект, который все и обрабатывает.

Эта схема дает два signal/slot Qt::QueuedConnection (в Worker и обратно) и один signal/slot из контроллера в куда-то там еще.

Последние два шага можно объединить в один, ценой потери инкапсуляции, про worker/threads начинает знать не только Controller. Я так пробовал, на производительности это не отражается (signal/slot с Qt::LocalConnection очень быстрый) и оставил две ступени в целях инкапсуляции.

Вот если все это собрать воедино и запустить, то получится такая вот картинка производительности:

Как мы видим, начиная с ~30000 операций в секунду все становится очень плохо.  Для самой мелкой задачи удается разогнаться до 60 krps, задачи чуть большего размера - упираются примерно в 30 и дальше, с ростом количества потоков ВСЕ ОТВРАТИТЕЛЬНО: производительность не просто не растет, а резко падает (это похоже на насыщение какого-то mutex/spinlock, который слишком сильно теребят), даже не в разы падает а на порядок-полтора-почти два (с 60к до 2k rps).

Если "собственно задача" большая и до 30 krps не доходит - то скейлинг близок к линейному (что ожидаемо: расходы на коммуникации невелики относительно самой задачи).

Профайлинг показал, что упираемся мы на этот раз не в malloc (его нет) и не в Qt-шный QMutex на очереди событий (объекты мелкие и похоже проскакивают быстро), а в MsgWaitForMultipleObjectsEx (из User32.dll) которой и передаются события между потоками в Qt. Там внутри наверное какой-то свой семафор или mutex.

И, да, эти ~2000 op/sec я вижу на своей машине при исполнении каких-то быстрых массовых операций (типа чтения метадаты по N-му разу) если в одной программе запущено много рабочих потоков.

Делаем два

Мы так уже делали, точнее не совсем так, но близко

  • Controller - складывает задачи в tbb::concurrent_bounded_queue
  • Worker: теперь наследник QThread с переопределенным методом run() в котором:
    • queue->pop
    • process data
    • emit result(..)
  • (да, как я теперь знаю, из QThread без event loop можно делать emit)
  • Остальное все как было - все эти result собираются в Controller и форвардятся в main (gui) object, если убрать этот этап форвардинга - производительность не меняется значимо.

У нас стал один signal/slot между потоками. Запускаем, смотрим:

Стало, как видим, ГОРАЗДО ЛУЧШЕ. Насыщение все еще происходит, но катастрофического падения производительности нет.

Из общих соображений понятно, что если мнго потоков теребят один семафор (или mutex) то это плохо. А если таких теребящих мало - то лучше.

Делаем три

  • Worker теперь не делает emit result(...) а выливает результаты в такую же tbb:concurrent_bounded_queue
  • Появляется один (дополнительный) thread Receiver, который читает вышеупомянутую очередь и уже сам делает emit:
    while(1) { Result r; results_queue->pop(r); emit processed(r); }
  • этот emit проходи по пути Controller - (local connection) - main object.

Количество событий (на единичную работу) не изменилось, 1 remote + 1 local signal/slot, но все remote (queued) сигналы идут из одного потока.

Запускаем, смотрим:

Стало практически идеально. Нет, при переходе от 24 к 32 потокам прямая маленько загибается (но не исключено что это влияние дополнительного потока Receiver, который тоже ж жрет немного CPU. Проверять на промежуточном между 24 и 32 количестве потоков не буду, не вижу смысла).

При этом:

  • Все хитрости спрятаны внутри Controller (который запускает threads, создает очереди, совокупляет все)
  • Controller выглядит снаружи как QObject: туда задачи, оттуда сигналы по числу выполненных задач (никакого батчинга).
  • И я счастлив, теперь всякие кэши и префетчи у одной программы постепенно переделаем на эту схему.

Код не публикую. Сказано достаточно, а на код и эксперименты ушло немало времени, можно считать детальный результат нашим know how :)
 

Comments

Здорово - кода и не надо, вроде и так все понятно. Выбрасываем QTшный механизм коммуникаций и заменяем TBB очередями в обе стороны. Беру на заметку...

Ну не вполне так:
1) Qt-шный механизм коммуникаций таки нужен: если мы хотим по обработке чего-то таки позвать какой-то Qt-шный слот из main thread (а альтернативы этому довольно плохие: или блокировать event loop у main thread и читать из очереди; или поллинг по таймеру).
То есть выливать из main thread задачи в worker-ов можно через очереди, поскольку worker-ам можно (и нужно!) блокироваться если работы нет. А вот забирать результат - так нельзя.

2) "хитрость" в том, что мы зовем slot в контексте main thread не из большого количества рабочих потоков (это работает плохо или очень плохо) а из одного вспомогательного потока. Там наверное тоже где-то встречаются mutex/семафор/что-то подобное, но когда их теребят из одного потока - оно ведет себя гораздо лучше, чем если бы мы теребили из 32).

Понял - спасибо.

Я дописал строчечку там, как выглядит (в первом приближении) Receiver::run()

Ага спасибо - я так и понял.