diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 5e0bc081526..b778a68a06a 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -439,6 +439,9 @@ private: registerFunctions(); registerAggregateFunctions(); + // Needed for parallel parsing. + GlobalThreadPool::initialize(); + /// Batch mode is enabled if one of the following is true: /// - -e (--query) command line option is present. /// The value of the option is used as the text of query (or of multiple queries). diff --git a/programs/obfuscator/Obfuscator.cpp b/programs/obfuscator/Obfuscator.cpp index f3ac0549573..761373c2212 100644 --- a/programs/obfuscator/Obfuscator.cpp +++ b/programs/obfuscator/Obfuscator.cpp @@ -1040,6 +1040,9 @@ try return 0; } + // Needed for parallel parsing. + GlobalThreadPool::initialize(); + UInt64 seed = sipHash64(options["seed"].as()); std::string structure = options["structure"].as(); diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 9a5dc55ded2..17756ea232f 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -431,6 +431,8 @@ int Server::main(const std::vector & /*args*/) DateLUT::instance(); LOG_TRACE(log, "Initialized DateLUT with time zone '{}'.", DateLUT::instance().getTimeZone()); + /// Initialize global thread pool + GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 10000)); /// Storage with temporary data for processing of heavy queries. { diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index 3a669056f21..840ef7dd283 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include @@ -11,6 +12,7 @@ namespace DB namespace ErrorCodes { extern const int CANNOT_SCHEDULE_TASK; + extern const int LOGICAL_ERROR; } } @@ -263,17 +265,25 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ template class ThreadPoolImpl; template class ThreadPoolImpl; +std::unique_ptr GlobalThreadPool::the_instance; + +void GlobalThreadPool::initialize(size_t max_threads) +{ + // There should be an assert, but we can't add it because of the unit tests... + // assert(!the_instance); + + the_instance.reset(new GlobalThreadPool(max_threads, + 1000 /*max_free_threads*/, 10000 /*max_queue_size*/, + false /*shutdown_on_exception*/)); +} GlobalThreadPool & GlobalThreadPool::instance() { - const Poco::Util::LayeredConfiguration & config = Poco::Util::Application::instance().config(); + if (!the_instance) + { + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, + "The global thread pool is not initalized"); + } - UInt64 max_threads = config.getUInt64("max_thread_pool_size", 10000); - size_t max_free_threads = 1000; - size_t max_queue_size = 10000; - const bool shutdown_on_exception = false; - - static GlobalThreadPool ret(max_threads, max_free_threads, max_queue_size, shutdown_on_exception); - - return ret; + return *the_instance; } diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index 3d1169d618d..c1304051ea7 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -128,10 +128,16 @@ using FreeThreadPool = ThreadPoolImpl; */ class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable { + static std::unique_ptr the_instance; + + GlobalThreadPool(size_t max_threads_, size_t max_free_threads_, + size_t queue_size_, const bool shutdown_on_exception_) + : FreeThreadPool(max_threads_, max_free_threads_, queue_size_, + shutdown_on_exception_) + {} + public: - GlobalThreadPool(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, - const bool shutdown_on_exception_) : - FreeThreadPool(max_threads_, max_free_threads_, queue_size_, shutdown_on_exception_) {} + static void initialize(size_t max_threads = 10000); static GlobalThreadPool & instance(); }; diff --git a/src/Common/tests/gtest_thread_pool_global_full.cpp b/src/Common/tests/gtest_thread_pool_global_full.cpp index 583d43be1bb..2fc291c1e1a 100644 --- a/src/Common/tests/gtest_thread_pool_global_full.cpp +++ b/src/Common/tests/gtest_thread_pool_global_full.cpp @@ -13,6 +13,8 @@ TEST(ThreadPool, GlobalFull1) { + GlobalThreadPool::initialize(); + GlobalThreadPool & global_pool = GlobalThreadPool::instance(); static constexpr size_t capacity = 5; @@ -49,6 +51,8 @@ TEST(ThreadPool, GlobalFull1) TEST(ThreadPool, GlobalFull2) { + GlobalThreadPool::initialize(); + GlobalThreadPool & global_pool = GlobalThreadPool::instance(); static constexpr size_t capacity = 5;