Less threads in local, fix Ok. printing

This commit is contained in:
kssenii 2021-10-16 13:17:00 +03:00
parent d1138a8a25
commit 9525437499
7 changed files with 84 additions and 12 deletions

View File

@ -514,19 +514,16 @@ void LocalServer::processConfig()
format = config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV")); format = config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV"));
insert_format = "Values"; insert_format = "Values";
/// Setting value from cmd arg overrides one from config /// Setting value from cmd arg overrides one from config
if (global_context->getSettingsRef().max_insert_block_size.changed) if (global_context->getSettingsRef().max_insert_block_size.changed)
insert_format_max_block_size = global_context->getSettingsRef().max_insert_block_size; insert_format_max_block_size = global_context->getSettingsRef().max_insert_block_size;
else else
insert_format_max_block_size = config().getInt("insert_format_max_block_size", global_context->getSettingsRef().max_insert_block_size); insert_format_max_block_size = config().getInt("insert_format_max_block_size", global_context->getSettingsRef().max_insert_block_size);
/// Skip networking
/// Sets external authenticators config (LDAP, Kerberos). /// Sets external authenticators config (LDAP, Kerberos).
global_context->setExternalAuthenticatorsConfig(config()); global_context->setExternalAuthenticatorsConfig(config());
global_context->initializeBackgroundExecutors();
setupUsers(); setupUsers();
/// Limit on total number of concurrently executing queries. /// Limit on total number of concurrently executing queries.

View File

@ -266,19 +266,19 @@ bool LocalConnection::poll(size_t)
} }
} }
if (state->is_finished && send_progress && !state->sent_progress)
{
state->sent_progress = true;
next_packet_type = Protocol::Server::Progress;
return true;
}
if (state->is_finished) if (state->is_finished)
{ {
finishQuery(); finishQuery();
return true; return true;
} }
if (send_progress && !state->sent_progress)
{
state->sent_progress = true;
next_packet_type = Protocol::Server::Progress;
return true;
}
if (state->block && state->block.value()) if (state->block && state->block.value())
{ {
next_packet_type = Protocol::Server::Data; next_packet_type = Protocol::Server::Data;
@ -292,7 +292,8 @@ bool LocalConnection::pollImpl()
{ {
Block block; Block block;
auto next_read = pullBlock(block); auto next_read = pullBlock(block);
if (block)
if (block && !state->io.null_format)
{ {
state->block.emplace(block); state->block.emplace(block);
} }

View File

@ -2895,8 +2895,15 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInser
shared->async_insert_queue = ptr; shared->async_insert_queue = ptr;
} }
bool Context::isBackgroundExecutorsInitialized() const
{
return is_background_executors_initialized;
}
void Context::initializeBackgroundExecutors() void Context::initializeBackgroundExecutors()
{ {
assert(!is_background_executors_initialized);
const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio; const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio;
/// With this executor we can execute more tasks than threads we have /// With this executor we can execute more tasks than threads we have
@ -2943,6 +2950,8 @@ void Context::initializeBackgroundExecutors()
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}",
getSettingsRef().background_common_pool_size, getSettingsRef().background_common_pool_size); getSettingsRef().background_common_pool_size, getSettingsRef().background_common_pool_size);
is_background_executors_initialized = true;
} }

View File

@ -293,6 +293,8 @@ private:
/// A flag, used to distinguish between user query and internal query to a database engine (MaterializedPostgreSQL). /// A flag, used to distinguish between user query and internal query to a database engine (MaterializedPostgreSQL).
bool is_internal_query = false; bool is_internal_query = false;
/// Has initializeBackgroundExecutors() method been executed?
bool is_background_executors_initialized = false;
public: public:
@ -862,6 +864,7 @@ public:
/// Background executors related methods /// Background executors related methods
void initializeBackgroundExecutors(); void initializeBackgroundExecutors();
bool isBackgroundExecutorsInitialized() const;
MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const; MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const;
OrdinaryBackgroundExecutorPtr getMovesExecutor() const; OrdinaryBackgroundExecutorPtr getMovesExecutor() const;

View File

@ -833,6 +833,15 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
String current_database = getContext()->getCurrentDatabase(); String current_database = getContext()->getCurrentDatabase();
auto database_name = create.database.empty() ? current_database : create.database; auto database_name = create.database.empty() ? current_database : create.database;
auto global_context = getContext()->getGlobalContext();
if (global_context
&& global_context->getApplicationType() == Context::ApplicationType::LOCAL
&& !global_context->isBackgroundExecutorsInitialized()
&& create.storage && endsWith(create.storage->engine->name, "MergeTree"))
{
global_context->initializeBackgroundExecutors();
}
// If this is a stub ATTACH query, read the query definition from the database // If this is a stub ATTACH query, read the query definition from the database
if (create.attach && !create.storage && !create.columns_list) if (create.attach && !create.storage && !create.columns_list)
{ {

View File

@ -0,0 +1,53 @@
#!/usr/bin/expect -f
# Tags: no-fasttest
log_user 0
set timeout 20
match_max 100000
# A default timeout action is to fail
expect_after {
timeout {
exit 1
}
}
set basedir [file dirname $argv0]
spawn bash -c "source $basedir/../shell_config.sh ; \$CLICKHOUSE_LOCAL --disable_suggestion"
expect ":) "
send -- "drop table if exists t\r"
expect "Ok."
send -- "create table t engine=MergeTree() order by tuple() as select 1\r"
expect "Ok."
send -- "set optimize_on_insert = 0\r"
expect "Ok."
send -- "drop table if exists tt\r"
expect "Ok."
send -- "create table tt (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date\r"
expect "Ok."
send -- "insert into tt values ('2020-01-01', 2, 2), ('2020-01-01', 1, 1)\r"
expect "Ok."
send -- "insert into tt values ('2020-01-01', 0, 0)\r"
expect "Ok."
send -- "OPTIMIZE TABLE tt\r"
expect "Ok."
send -- "select * from tt order by version format TSV\r"
expect "2020-01-01\t2\t2"
send -- "drop table tt\r"
expect "Ok."
send -- "drop table t\r"
expect "Ok."
send -- "\4"
expect eof