Use boost::lockfree::stack instead of boost::lockfree::queue

This commit is contained in:
Nikolai Kochetov 2019-07-03 15:27:28 +03:00
parent 3ff5495fba
commit acf382f9e5
5 changed files with 53 additions and 7 deletions

View File

@ -18,11 +18,11 @@ if (SANITIZE)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libmsan")
endif ()
elseif (SANITIZE STREQUAL "thread")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=thread -fsanitize-blacklist=${CMAKE_CURRENT_SOURCE_DIR}/cmake/tsan-suppressions.txt")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread -fsanitize-blacklist=${CMAKE_CURRENT_SOURCE_DIR}/cmake/tsan-suppressions.txt")
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libtsan")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libtsan -fsanitize-blacklist=${CMAKE_CURRENT_SOURCE_DIR}/cmake/tsan-suppressions.txt")
endif ()
elseif (SANITIZE STREQUAL "undefined")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=undefined -fno-sanitize-recover=all")

View File

@ -0,0 +1,7 @@
# Wrappers for boost lockfree.
# https://github.com/boostorg/lockfree/issues/22
# https://stackoverflow.com/questions/53996201/boost-lock-free-queue-is-triggering-clangs-thread-sanitizer
race:DB::PipelineExecutor::TaskQueue::push
race:DB::PipelineExecutor::TaskQueue::pop
race:dbms/src/Processors/Executors/PipelineExecutorTaskQueue.cpp
race:../dbms/src/Processors/Executors/PipelineExecutorTaskQueue.cpp

View File

@ -30,7 +30,6 @@ static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception
PipelineExecutor::PipelineExecutor(Processors & processors)
: processors(processors)
, task_queue(min_task_queue_size)
, num_tasks_and_active_threads(0)
, cancelled(false)
, finished(false)
@ -360,7 +359,7 @@ void PipelineExecutor::doExpandPipeline(ExpandPipelineTask * task)
task->condvar.wait(lock, [&]()
{
return task->num_waiting_threads == num_processing_executors || expand_pipeline_task != task;
return task->num_waiting_threads >= num_processing_executors || expand_pipeline_task != task;
});
/// After condvar.wait() task may point to trash. Can change it only if it is still in expand_pipeline_task.

View File

@ -8,7 +8,7 @@
#include <Common/EventCounter.h>
#include <common/logger_useful.h>
#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/stack.hpp>
namespace DB
{
@ -119,7 +119,21 @@ private:
Nodes graph;
using Stack = std::stack<UInt64>;
using TaskQueue = boost::lockfree::queue<ExecutionState *>;
class TaskQueue
{
public:
TaskQueue() : container(0) {}
bool push(ExecutionState * value);
bool pop(ExecutionState *& value);
void reserve(size_t size);
void reserve_unsafe(size_t size);
private:
boost::lockfree::stack<ExecutionState *> container;
};
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
/// Stores processors need to be prepared. Preparing status is already set for them.

View File

@ -0,0 +1,26 @@
#include <Processors/Executors/PipelineExecutor.h>
namespace DB
{
bool PipelineExecutor::TaskQueue::push(ExecutionState * value)
{
return container.push(value);
}
bool PipelineExecutor::TaskQueue::pop(ExecutionState *& value)
{
return container.pop(value);
}
void PipelineExecutor::TaskQueue::reserve(size_t size)
{
container.reserve(size);
}
void PipelineExecutor::TaskQueue::reserve_unsafe(size_t size)
{
container.reserve_unsafe(size);
}
}