add setting to enable planning

This commit is contained in:
yariks5s 2024-05-16 17:57:01 +00:00
parent 0619b0921f
commit fed6c65858
5 changed files with 145 additions and 34 deletions

View File

@ -668,6 +668,7 @@ class IColumn;
M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \ M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \ M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \ M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \
M(Bool, allow_insert_threads_reduction_optimizaion, false, "If true it allows to apply additional single-insert-transformer for insertion of data", 0) \
\ \
M(Float, create_replicated_merge_tree_fault_injection_probability, 0.0f, "The probability of a fault injection during table creation after creating metadata in ZooKeeper", 0) \ M(Float, create_replicated_merge_tree_fault_injection_probability, 0.0f, "The probability of a fault injection during table creation after creating metadata in ZooKeeper", 0) \
\ \

View File

@ -620,6 +620,8 @@ BlockIO InterpreterInsertQuery::execute()
{ {
bool table_prefers_large_blocks = table->prefersLargeBlocks(); bool table_prefers_large_blocks = table->prefersLargeBlocks();
if (settings.allow_insert_threads_reduction_optimizaion)
{
pipeline.addTransform(std::make_shared<PlanSquashingTransform>( pipeline.addTransform(std::make_shared<PlanSquashingTransform>(
header, header,
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
@ -634,6 +636,17 @@ BlockIO InterpreterInsertQuery::execute()
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL); table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
}); });
} }
else
{
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<SimpleSquashingTransform>(
in_header,
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
});
}
}
size_t num_select_threads = pipeline.getNumThreads(); size_t num_select_threads = pipeline.getNumThreads();
@ -684,6 +697,8 @@ BlockIO InterpreterInsertQuery::execute()
{ {
bool table_prefers_large_blocks = table->prefersLargeBlocks(); bool table_prefers_large_blocks = table->prefersLargeBlocks();
if (settings.allow_insert_threads_reduction_optimizaion)
{
auto squashing = std::make_shared<ApplySquashingTransform>( auto squashing = std::make_shared<ApplySquashingTransform>(
chain.getInputHeader(), chain.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
@ -699,6 +714,17 @@ BlockIO InterpreterInsertQuery::execute()
chain.addSource(std::move(balancing)); chain.addSource(std::move(balancing));
} }
else
{
auto squashing = std::make_shared<SimpleSquashingTransform>(
chain.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
chain.addSource(std::move(squashing));
}
}
auto context_ptr = getContext(); auto context_ptr = getContext();
auto counting = std::make_shared<CountingTransform>(chain.getInputHeader(), nullptr, context_ptr->getQuota()); auto counting = std::make_shared<CountingTransform>(chain.getInputHeader(), nullptr, context_ptr->getQuota());

View File

@ -372,6 +372,8 @@ std::optional<Chain> generateViewChain(
bool table_prefers_large_blocks = inner_table->prefersLargeBlocks(); bool table_prefers_large_blocks = inner_table->prefersLargeBlocks();
const auto & settings = insert_context->getSettingsRef(); const auto & settings = insert_context->getSettingsRef();
if (settings.allow_insert_threads_reduction_optimizaion)
{
out.addSource(std::make_shared<ApplySquashingTransform>( out.addSource(std::make_shared<ApplySquashingTransform>(
out.getInputHeader(), out.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
@ -383,6 +385,14 @@ std::optional<Chain> generateViewChain(
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL, table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL,
1)); // Chain requires a single input 1)); // Chain requires a single input
} }
else
{
out.addSource(std::make_shared<SimpleSquashingTransform>(
out.getInputHeader(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
}
}
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), current_thread, insert_context->getQuota()); auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), current_thread, insert_context->getQuota());
counting->setProcessListElement(insert_context->getProcessListElement()); counting->setProcessListElement(insert_context->getProcessListElement());

View File

@ -11,6 +11,7 @@
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include <Poco/Net/SocketAddress.h> #include <Poco/Net/SocketAddress.h>
#include <Poco/Util/LayeredConfiguration.h> #include <Poco/Util/LayeredConfiguration.h>
#include <Common/escapeString.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/NetException.h> #include <Common/NetException.h>
@ -42,6 +43,7 @@
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/thread_local_rng.h> #include <Common/thread_local_rng.h>
#include <Access/User.h>
#include <fmt/format.h> #include <fmt/format.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h> #include <Processors/Executors/PullingAsyncPipelineExecutor.h>
@ -181,6 +183,7 @@ void validateClientInfo(const ClientInfo & session_client_info, const ClientInfo
namespace DB namespace DB
{ {
using Which = Field::Types::Which;
TCPHandler::TCPHandler( TCPHandler::TCPHandler(
IServer & server_, IServer & server_,
@ -1602,6 +1605,70 @@ void TCPHandler::sendHello()
nonce.emplace(thread_local_rng()); nonce.emplace(thread_local_rng());
writeIntBinary(nonce.value(), *out); writeIntBinary(nonce.value(), *out);
} }
/// If client is Clickhouse-client we will send server profile settings of this user
if (client_name == (std::string(VERSION_NAME) + " client"))
{
const auto & user = session->sessionContext()->getUser();
String query = fmt::format(
R"(SELECT setting_name, value FROM system.settings_profile_elements WHERE user_name = '{0}')",
escapeString(user->getName()));
const auto & res_const = executeQuery(query,server.context(), QueryFlags{ .internal = true }).second;
auto & res = const_cast<BlockIO &>(res_const);
PullingPipelineExecutor pulling_executor(res.pipeline);
Block block;
pulling_executor.pull(block);
/// filter data
std::map<String, Field> server_settings;
for (size_t row = 0; row < block.rows(); ++row)
{
size_t col_index = 0;
String name;
Field value_field;
for (const auto & name_value: block)
{
Field field;
name_value.column->get(row, field);
if (!field.isNull())
{
if (col_index == 0)
name = field.safeGet<String>();
else
value_field = field;
}
else
continue;
++col_index;
}
if (!name.empty())
server_settings[name] = value_field;
}
writeVarUInt(server_settings.size(), *out);
if (!server_settings.empty())
{
for (const auto & setting : server_settings)
{
writeStringBinary(setting.first, *out);
writeVarUInt(setting.second.getType(), *out);
switch (setting.second.getType())
{
case Which::UInt64:
writeVarUInt(setting.second.safeGet<UInt64>(), *out);break;
case Which::String:
writeStringBinary(setting.second.safeGet<String>(), *out);break;
case Which::Bool:
writeVarUInt(setting.second.get<UInt64>(), *out);break;
default:
break;
}
}
}
}
out->next(); out->next();
} }

View File

@ -311,8 +311,15 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context)
builder.resize(1); builder.resize(1);
// Generate aggregated blocks with rows less or equal than the original block. // Generate aggregated blocks with rows less or equal than the original block.
// There should be only one output block after this transformation. // There should be only one output block after this transformation.
if (mut_context->getSettings().allow_insert_threads_reduction_optimizaion)
{
builder.addTransform(std::make_shared<PlanSquashingTransform>(builder.getHeader(), block.rows(), 0, 1)); builder.addTransform(std::make_shared<PlanSquashingTransform>(builder.getHeader(), block.rows(), 0, 1));
builder.addTransform(std::make_shared<ApplySquashingTransform>(builder.getHeader(), block.rows(), 0)); builder.addTransform(std::make_shared<ApplySquashingTransform>(builder.getHeader(), block.rows(), 0));
}
else
{
builder.addTransform(std::make_shared<SimpleSquashingTransform>(builder.getHeader(), block.rows(), 0));
}
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline); PullingPipelineExecutor executor(pipeline);