Initialize GlobalThreadPool explicitly

This commit is contained in:
Alexander Kuzmenkov 2020-06-22 22:04:12 +03:00
parent bd5ab9c686
commit 96d2e9c997
6 changed files with 40 additions and 12 deletions

View File

@ -439,6 +439,9 @@ private:
registerFunctions();
registerAggregateFunctions();
// Needed for parallel parsing.
GlobalThreadPool::initialize();
/// Batch mode is enabled if one of the following is true:
/// - -e (--query) command line option is present.
/// The value of the option is used as the text of query (or of multiple queries).

View File

@ -1040,6 +1040,9 @@ try
return 0;
}
// Needed for parallel parsing.
GlobalThreadPool::initialize();
UInt64 seed = sipHash64(options["seed"].as<std::string>());
std::string structure = options["structure"].as<std::string>();

View File

@ -431,6 +431,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
DateLUT::instance();
LOG_TRACE(log, "Initialized DateLUT with time zone '{}'.", DateLUT::instance().getTimeZone());
/// Initialize global thread pool
GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 10000));
/// Storage with temporary data for processing of heavy queries.
{

View File

@ -1,6 +1,7 @@
#include <Common/ThreadPool.h>
#include <Common/Exception.h>
#include <cassert>
#include <type_traits>
#include <Poco/Util/Application.h>
@ -11,6 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_SCHEDULE_TASK;
extern const int LOGICAL_ERROR;
}
}
@ -263,17 +265,25 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<ThreadFromGlobalPool>;
std::unique_ptr<GlobalThreadPool> GlobalThreadPool::the_instance;
void GlobalThreadPool::initialize(size_t max_threads)
{
// There should be an assert, but we can't add it because of the unit tests...
// assert(!the_instance);
the_instance.reset(new GlobalThreadPool(max_threads,
1000 /*max_free_threads*/, 10000 /*max_queue_size*/,
false /*shutdown_on_exception*/));
}
GlobalThreadPool & GlobalThreadPool::instance()
{
const Poco::Util::LayeredConfiguration & config = Poco::Util::Application::instance().config();
if (!the_instance)
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"The global thread pool is not initalized");
}
UInt64 max_threads = config.getUInt64("max_thread_pool_size", 10000);
size_t max_free_threads = 1000;
size_t max_queue_size = 10000;
const bool shutdown_on_exception = false;
static GlobalThreadPool ret(max_threads, max_free_threads, max_queue_size, shutdown_on_exception);
return ret;
return *the_instance;
}

View File

@ -128,10 +128,16 @@ using FreeThreadPool = ThreadPoolImpl<std::thread>;
*/
class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
{
static std::unique_ptr<GlobalThreadPool> the_instance;
GlobalThreadPool(size_t max_threads_, size_t max_free_threads_,
size_t queue_size_, const bool shutdown_on_exception_)
: FreeThreadPool(max_threads_, max_free_threads_, queue_size_,
shutdown_on_exception_)
{}
public:
GlobalThreadPool(size_t max_threads_, size_t max_free_threads_, size_t queue_size_,
const bool shutdown_on_exception_) :
FreeThreadPool(max_threads_, max_free_threads_, queue_size_, shutdown_on_exception_) {}
static void initialize(size_t max_threads = 10000);
static GlobalThreadPool & instance();
};

View File

@ -13,6 +13,8 @@
TEST(ThreadPool, GlobalFull1)
{
GlobalThreadPool::initialize();
GlobalThreadPool & global_pool = GlobalThreadPool::instance();
static constexpr size_t capacity = 5;
@ -49,6 +51,8 @@ TEST(ThreadPool, GlobalFull1)
TEST(ThreadPool, GlobalFull2)
{
GlobalThreadPool::initialize();
GlobalThreadPool & global_pool = GlobalThreadPool::instance();
static constexpr size_t capacity = 5;