From 9525437499311d154198bc9b8e1e22d95986c600 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sat, 16 Oct 2021 13:17:00 +0300 Subject: [PATCH 1/4] Less threads in local, fix Ok. printing --- programs/local/LocalServer.cpp | 5 +- src/Client/LocalConnection.cpp | 17 +++--- src/Interpreters/Context.cpp | 9 ++++ src/Interpreters/Context.h | 3 ++ src/Interpreters/InterpreterCreateQuery.cpp | 9 ++++ .../02049_clickhouse_local_merge_tree.expect | 53 +++++++++++++++++++ ...2049_clickhouse_local_merge_tree.reference | 0 7 files changed, 84 insertions(+), 12 deletions(-) create mode 100755 tests/queries/0_stateless/02049_clickhouse_local_merge_tree.expect create mode 100644 tests/queries/0_stateless/02049_clickhouse_local_merge_tree.reference diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 30082caaac1..cdd5ae13f99 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -514,19 +514,16 @@ void LocalServer::processConfig() format = config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV")); insert_format = "Values"; + /// Setting value from cmd arg overrides one from config if (global_context->getSettingsRef().max_insert_block_size.changed) insert_format_max_block_size = global_context->getSettingsRef().max_insert_block_size; else insert_format_max_block_size = config().getInt("insert_format_max_block_size", global_context->getSettingsRef().max_insert_block_size); - /// Skip networking - /// Sets external authenticators config (LDAP, Kerberos). global_context->setExternalAuthenticatorsConfig(config()); - global_context->initializeBackgroundExecutors(); - setupUsers(); /// Limit on total number of concurrently executing queries. diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index efd302622dd..e1324146330 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -266,19 +266,19 @@ bool LocalConnection::poll(size_t) } } - if (state->is_finished && send_progress && !state->sent_progress) - { - state->sent_progress = true; - next_packet_type = Protocol::Server::Progress; - return true; - } - if (state->is_finished) { finishQuery(); return true; } + if (send_progress && !state->sent_progress) + { + state->sent_progress = true; + next_packet_type = Protocol::Server::Progress; + return true; + } + if (state->block && state->block.value()) { next_packet_type = Protocol::Server::Data; @@ -292,7 +292,8 @@ bool LocalConnection::pollImpl() { Block block; auto next_read = pullBlock(block); - if (block) + + if (block && !state->io.null_format) { state->block.emplace(block); } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 98acc786aa9..0ef92eaed39 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2895,8 +2895,15 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptrasync_insert_queue = ptr; } +bool Context::isBackgroundExecutorsInitialized() const +{ + return is_background_executors_initialized; +} + void Context::initializeBackgroundExecutors() { + assert(!is_background_executors_initialized); + const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio; /// With this executor we can execute more tasks than threads we have @@ -2943,6 +2950,8 @@ void Context::initializeBackgroundExecutors() LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", getSettingsRef().background_common_pool_size, getSettingsRef().background_common_pool_size); + + is_background_executors_initialized = true; } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 247dbc74f22..15c4376aa6d 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -293,6 +293,8 @@ private: /// A flag, used to distinguish between user query and internal query to a database engine (MaterializedPostgreSQL). bool is_internal_query = false; + /// Has initializeBackgroundExecutors() method been executed? + bool is_background_executors_initialized = false; public: @@ -862,6 +864,7 @@ public: /// Background executors related methods void initializeBackgroundExecutors(); + bool isBackgroundExecutorsInitialized() const; MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const; OrdinaryBackgroundExecutorPtr getMovesExecutor() const; diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 6d38c55bd62..5b993bce724 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -833,6 +833,15 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) String current_database = getContext()->getCurrentDatabase(); auto database_name = create.database.empty() ? current_database : create.database; + auto global_context = getContext()->getGlobalContext(); + if (global_context + && global_context->getApplicationType() == Context::ApplicationType::LOCAL + && !global_context->isBackgroundExecutorsInitialized() + && create.storage && endsWith(create.storage->engine->name, "MergeTree")) + { + global_context->initializeBackgroundExecutors(); + } + // If this is a stub ATTACH query, read the query definition from the database if (create.attach && !create.storage && !create.columns_list) { diff --git a/tests/queries/0_stateless/02049_clickhouse_local_merge_tree.expect b/tests/queries/0_stateless/02049_clickhouse_local_merge_tree.expect new file mode 100755 index 00000000000..17b98b077d5 --- /dev/null +++ b/tests/queries/0_stateless/02049_clickhouse_local_merge_tree.expect @@ -0,0 +1,53 @@ +#!/usr/bin/expect -f +# Tags: no-fasttest + +log_user 0 +set timeout 20 +match_max 100000 + +# A default timeout action is to fail +expect_after { + timeout { + exit 1 + } + +} + +set basedir [file dirname $argv0] +spawn bash -c "source $basedir/../shell_config.sh ; \$CLICKHOUSE_LOCAL --disable_suggestion" +expect ":) " + +send -- "drop table if exists t\r" +expect "Ok." + +send -- "create table t engine=MergeTree() order by tuple() as select 1\r" +expect "Ok." + +send -- "set optimize_on_insert = 0\r" +expect "Ok." + +send -- "drop table if exists tt\r" +expect "Ok." + +send -- "create table tt (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date\r" +expect "Ok." + +send -- "insert into tt values ('2020-01-01', 2, 2), ('2020-01-01', 1, 1)\r" +expect "Ok." + +send -- "insert into tt values ('2020-01-01', 0, 0)\r" +expect "Ok." + +send -- "OPTIMIZE TABLE tt\r" +expect "Ok." + +send -- "select * from tt order by version format TSV\r" +expect "2020-01-01\t2\t2" + +send -- "drop table tt\r" +expect "Ok." +send -- "drop table t\r" +expect "Ok." + +send -- "\4" +expect eof diff --git a/tests/queries/0_stateless/02049_clickhouse_local_merge_tree.reference b/tests/queries/0_stateless/02049_clickhouse_local_merge_tree.reference new file mode 100644 index 00000000000..e69de29bb2d From d34d752688ec706694a05c5fd0c568c651b57c14 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sat, 16 Oct 2021 18:37:46 +0000 Subject: [PATCH 2/4] Fix tests --- programs/local/LocalServer.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index cdd5ae13f99..0c5f64ea913 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -579,6 +579,11 @@ void LocalServer::processConfig() { String path = global_context->getPath(); + /// When tables are loaded from .sql we initialize background executors + /// regardless there are MergeTree tables or not, because no better place was found. + /// In other cases it will be initialized only when there are mergeTree tables. + global_context->initializeBackgroundExecutors(); + /// Lock path directory before read status.emplace(fs::path(path) / "status", StatusFile::write_full_info); From ab9d5d8cc789438ab0b01f6b0a4d712e190fed6f Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 18 Oct 2021 06:06:38 +0000 Subject: [PATCH 3/4] Better --- programs/local/LocalServer.cpp | 5 ----- src/Databases/DatabaseOnDisk.cpp | 9 +++++++++ src/Interpreters/loadMetadata.cpp | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 0c5f64ea913..cdd5ae13f99 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -579,11 +579,6 @@ void LocalServer::processConfig() { String path = global_context->getPath(); - /// When tables are loaded from .sql we initialize background executors - /// regardless there are MergeTree tables or not, because no better place was found. - /// In other cases it will be initialized only when there are mergeTree tables. - global_context->initializeBackgroundExecutors(); - /// Lock path directory before read status.emplace(fs::path(path) / "status", StatusFile::write_full_info); diff --git a/src/Databases/DatabaseOnDisk.cpp b/src/Databases/DatabaseOnDisk.cpp index 97e59f53f64..a71d539e3c5 100644 --- a/src/Databases/DatabaseOnDisk.cpp +++ b/src/Databases/DatabaseOnDisk.cpp @@ -53,6 +53,15 @@ std::pair createTableFromAST( ast_create_query.attach = true; ast_create_query.database = database_name; + auto global_context = context->getGlobalContext(); + if (global_context + && global_context->getApplicationType() == Context::ApplicationType::LOCAL + && !global_context->isBackgroundExecutorsInitialized() + && ast_create_query.storage && endsWith(ast_create_query.storage->engine->name, "MergeTree")) + { + global_context->initializeBackgroundExecutors(); + } + if (ast_create_query.as_table_function) { const auto & factory = TableFunctionFactory::instance(); diff --git a/src/Interpreters/loadMetadata.cpp b/src/Interpreters/loadMetadata.cpp index 6a3db48e835..65b2065b2ad 100644 --- a/src/Interpreters/loadMetadata.cpp +++ b/src/Interpreters/loadMetadata.cpp @@ -161,7 +161,7 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam bool create_default_db_if_not_exists = !default_database_name.empty(); bool metadata_dir_for_default_db_already_exists = databases.count(default_database_name); if (create_default_db_if_not_exists && !metadata_dir_for_default_db_already_exists) - databases.emplace(default_database_name, path + "/" + escapeForFileName(default_database_name)); + databases.emplace(default_database_name, std::filesystem::path(path) / escapeForFileName(default_database_name)); TablesLoader::Databases loaded_databases; for (const auto & [name, db_path] : databases) From e53335bc6fc061ce47a40b94d3b5a91ac042717f Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 19 Oct 2021 08:19:43 +0000 Subject: [PATCH 4/4] Better way --- programs/server/Server.cpp | 2 +- src/Databases/DatabaseOnDisk.cpp | 9 --------- src/Interpreters/Context.cpp | 11 ++++------- src/Interpreters/Context.h | 3 +-- src/Interpreters/InterpreterCreateQuery.cpp | 9 --------- src/Storages/MergeTree/MergeTreeData.cpp | 2 ++ 6 files changed, 8 insertions(+), 28 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 2b526608715..bbd9af1e97e 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -919,7 +919,7 @@ if (ThreadFuzzer::instance().isEffective()) /// Initialize background executors after we load default_profile config. /// This is needed to load proper values of background_pool_size etc. - global_context->initializeBackgroundExecutors(); + global_context->initializeBackgroundExecutorsIfNeeded(); if (settings.async_insert_threads) global_context->setAsynchronousInsertQueue(std::make_shared( diff --git a/src/Databases/DatabaseOnDisk.cpp b/src/Databases/DatabaseOnDisk.cpp index a71d539e3c5..97e59f53f64 100644 --- a/src/Databases/DatabaseOnDisk.cpp +++ b/src/Databases/DatabaseOnDisk.cpp @@ -53,15 +53,6 @@ std::pair createTableFromAST( ast_create_query.attach = true; ast_create_query.database = database_name; - auto global_context = context->getGlobalContext(); - if (global_context - && global_context->getApplicationType() == Context::ApplicationType::LOCAL - && !global_context->isBackgroundExecutorsInitialized() - && ast_create_query.storage && endsWith(ast_create_query.storage->engine->name, "MergeTree")) - { - global_context->initializeBackgroundExecutors(); - } - if (ast_create_query.as_table_function) { const auto & factory = TableFunctionFactory::instance(); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 1602e6a6a31..bbad7e782ed 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2895,14 +2895,11 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptrasync_insert_queue = ptr; } -bool Context::isBackgroundExecutorsInitialized() const +void Context::initializeBackgroundExecutorsIfNeeded() { - return is_background_executors_initialized; -} - -void Context::initializeBackgroundExecutors() -{ - assert(!is_background_executors_initialized); + auto lock = getLock(); + if (is_background_executors_initialized) + return; const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio; diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 93be367e46d..b20274c2cb8 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -862,8 +862,7 @@ public: void setReadTaskCallback(ReadTaskCallback && callback); /// Background executors related methods - void initializeBackgroundExecutors(); - bool isBackgroundExecutorsInitialized() const; + void initializeBackgroundExecutorsIfNeeded(); MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const; OrdinaryBackgroundExecutorPtr getMovesExecutor() const; diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 5b993bce724..6d38c55bd62 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -833,15 +833,6 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) String current_database = getContext()->getCurrentDatabase(); auto database_name = create.database.empty() ? current_database : create.database; - auto global_context = getContext()->getGlobalContext(); - if (global_context - && global_context->getApplicationType() == Context::ApplicationType::LOCAL - && !global_context->isBackgroundExecutorsInitialized() - && create.storage && endsWith(create.storage->engine->name, "MergeTree")) - { - global_context->initializeBackgroundExecutors(); - } - // If this is a stub ATTACH query, read the query definition from the database if (create.attach && !create.storage && !create.columns_list) { diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 10fa18186ee..8b03c1e614d 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -205,6 +205,8 @@ MergeTreeData::MergeTreeData( , background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext()) , background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext()) { + context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded(); + const auto settings = getSettings(); allow_nullable_key = attach || settings->allow_nullable_key;