Merge pull request #30282 from kssenii/fix-local-less-threads

Less threads in clickhouse-local, fix Ok. printing
This commit is contained in:
Kseniia Sumarokova 2021-10-19 22:54:28 +03:00 committed by GitHub
commit 5324cc8359
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 77 additions and 16 deletions

View File

@ -502,19 +502,16 @@ void LocalServer::processConfig()
format = config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV"));
insert_format = "Values";
/// Setting value from cmd arg overrides one from config
if (global_context->getSettingsRef().max_insert_block_size.changed)
insert_format_max_block_size = global_context->getSettingsRef().max_insert_block_size;
else
insert_format_max_block_size = config().getInt("insert_format_max_block_size", global_context->getSettingsRef().max_insert_block_size);
/// Skip networking
/// Sets external authenticators config (LDAP, Kerberos).
global_context->setExternalAuthenticatorsConfig(config());
global_context->initializeBackgroundExecutors();
setupUsers();
/// Limit on total number of concurrently executing queries.

View File

@ -919,7 +919,7 @@ if (ThreadFuzzer::instance().isEffective())
/// Initialize background executors after we load default_profile config.
/// This is needed to load proper values of background_pool_size etc.
global_context->initializeBackgroundExecutors();
global_context->initializeBackgroundExecutorsIfNeeded();
if (settings.async_insert_threads)
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(

View File

@ -267,19 +267,19 @@ bool LocalConnection::poll(size_t)
}
}
if (state->is_finished && send_progress && !state->sent_progress)
{
state->sent_progress = true;
next_packet_type = Protocol::Server::Progress;
return true;
}
if (state->is_finished)
{
finishQuery();
return true;
}
if (send_progress && !state->sent_progress)
{
state->sent_progress = true;
next_packet_type = Protocol::Server::Progress;
return true;
}
if (state->block && state->block.value())
{
next_packet_type = Protocol::Server::Data;
@ -293,7 +293,8 @@ bool LocalConnection::pollImpl()
{
Block block;
auto next_read = pullBlock(block);
if (block)
if (block && !state->io.null_format)
{
state->block.emplace(block);
}

View File

@ -2971,8 +2971,12 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInser
shared->async_insert_queue = ptr;
}
void Context::initializeBackgroundExecutors()
void Context::initializeBackgroundExecutorsIfNeeded()
{
auto lock = getLock();
if (is_background_executors_initialized)
return;
const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio;
/// With this executor we can execute more tasks than threads we have
@ -3019,6 +3023,8 @@ void Context::initializeBackgroundExecutors()
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}",
getSettingsRef().background_common_pool_size, getSettingsRef().background_common_pool_size);
is_background_executors_initialized = true;
}

View File

@ -293,6 +293,8 @@ private:
/// A flag, used to distinguish between user query and internal query to a database engine (MaterializedPostgreSQL).
bool is_internal_query = false;
/// Has initializeBackgroundExecutors() method been executed?
bool is_background_executors_initialized = false;
public:
@ -867,7 +869,7 @@ public:
void setReadTaskCallback(ReadTaskCallback && callback);
/// Background executors related methods
void initializeBackgroundExecutors();
void initializeBackgroundExecutorsIfNeeded();
MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const;
OrdinaryBackgroundExecutorPtr getMovesExecutor() const;

View File

@ -161,7 +161,7 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
bool create_default_db_if_not_exists = !default_database_name.empty();
bool metadata_dir_for_default_db_already_exists = databases.count(default_database_name);
if (create_default_db_if_not_exists && !metadata_dir_for_default_db_already_exists)
databases.emplace(default_database_name, path + "/" + escapeForFileName(default_database_name));
databases.emplace(default_database_name, std::filesystem::path(path) / escapeForFileName(default_database_name));
TablesLoader::Databases loaded_databases;
for (const auto & [name, db_path] : databases)

View File

@ -205,6 +205,8 @@ MergeTreeData::MergeTreeData(
, background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
, background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
{
context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded();
const auto settings = getSettings();
allow_nullable_key = attach || settings->allow_nullable_key;

View File

@ -0,0 +1,53 @@
#!/usr/bin/expect -f
# Tags: no-fasttest
log_user 0
set timeout 20
match_max 100000
# A default timeout action is to fail
expect_after {
timeout {
exit 1
}
}
set basedir [file dirname $argv0]
spawn bash -c "source $basedir/../shell_config.sh ; \$CLICKHOUSE_LOCAL --disable_suggestion"
expect ":) "
send -- "drop table if exists t\r"
expect "Ok."
send -- "create table t engine=MergeTree() order by tuple() as select 1\r"
expect "Ok."
send -- "set optimize_on_insert = 0\r"
expect "Ok."
send -- "drop table if exists tt\r"
expect "Ok."
send -- "create table tt (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date\r"
expect "Ok."
send -- "insert into tt values ('2020-01-01', 2, 2), ('2020-01-01', 1, 1)\r"
expect "Ok."
send -- "insert into tt values ('2020-01-01', 0, 0)\r"
expect "Ok."
send -- "OPTIMIZE TABLE tt\r"
expect "Ok."
send -- "select * from tt order by version format TSV\r"
expect "2020-01-01\t2\t2"
send -- "drop table tt\r"
expect "Ok."
send -- "drop table t\r"
expect "Ok."
send -- "\4"
expect eof