Merge branch 'master' into fix-dynamic-json-serialization-compatibility

Pavel Kruglov 2024-11-12 16:18:55 +01:00 committed by GitHub
commit e0af2e9738
88 changed files with 1480 additions and 290 deletions

View File

@ -4773,7 +4773,7 @@ Result:
## toUTCTimestamp
Convert DateTime/DateTime64 type value from other time zone to UTC timezone timestamp
Convert DateTime/DateTime64 type value from other time zone to UTC timezone timestamp. This function is mainly included for compatibility with Apache Spark and similar frameworks.
**Syntax**
@ -4799,14 +4799,14 @@ SELECT toUTCTimestamp(toDateTime('2023-03-16'), 'Asia/Shanghai');
Result:
``` text
┌─toUTCTimestamp(toDateTime('2023-03-16'),'Asia/Shanghai')┐
┌─toUTCTimestamp(toDateTime('2023-03-16'), 'Asia/Shanghai')┐
│ 2023-03-15 16:00:00 │
└─────────────────────────────────────────────────────────┘
```
## fromUTCTimestamp
Convert DateTime/DateTime64 type value from UTC timezone to other time zone timestamp
Convert DateTime/DateTime64 type value from UTC timezone to other time zone timestamp. This function is mainly included for compatibility with Apache Spark and similar frameworks.
**Syntax**
@ -4832,7 +4832,7 @@ SELECT fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00', 3), 'Asia/Shanghai')
Result:
``` text
┌─fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00',3),'Asia/Shanghai')─┐
┌─fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00',3), 'Asia/Shanghai')─┐
│ 2023-03-16 18:00:00.000 │
└─────────────────────────────────────────────────────────────────────────┘
```
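
A hedged editor's aside: since both functions exist mainly for Spark compatibility, converting a value to UTC and back with the same time zone is expected to return the original value. A minimal round-trip sketch, reusing the time zone from the examples above:

``` sql
SELECT
    toUTCTimestamp(toDateTime('2023-03-16 10:00:00'), 'Asia/Shanghai') AS in_utc,
    fromUTCTimestamp(toUTCTimestamp(toDateTime('2023-03-16 10:00:00'), 'Asia/Shanghai'), 'Asia/Shanghai') AS round_trip;
```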

View File

@ -279,7 +279,7 @@ For columns with a new or updated `MATERIALIZED` value expression, all existing
For columns with a new or updated `DEFAULT` value expression, the behavior depends on the ClickHouse version:
- In ClickHouse < v24.2, all existing rows are rewritten.
- ClickHouse >= v24.2 distinguishes if a row value in a column with `DEFAULT` value expression was explicitly specified when it was inserted, or not, i.e. calculated from the `DEFAULT` value expression. If the value was explicitly specified, ClickHouse keeps it as is. If the value was was calculated, ClickHouse changes it to the new or updated `MATERIALIZED` value expression.
- ClickHouse >= v24.2 distinguishes if a row value in a column with `DEFAULT` value expression was explicitly specified when it was inserted, or not, i.e. calculated from the `DEFAULT` value expression. If the value was explicitly specified, ClickHouse keeps it as is. If the value was calculated, ClickHouse changes it to the new or updated `MATERIALIZED` value expression.
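
A hedged sketch of the >= v24.2 behavior described in the list above; the table, column, and values are made up for illustration:

``` sql
CREATE TABLE t (id UInt64, v String DEFAULT 'old') ENGINE = MergeTree ORDER BY id;
INSERT INTO t (id, v) VALUES (1, 'explicit'); -- value specified explicitly: kept as is after the ALTER
INSERT INTO t (id) VALUES (2);                -- value calculated from DEFAULT: follows the new expression
ALTER TABLE t MODIFY COLUMN v String DEFAULT 'new';
SELECT id, v FROM t ORDER BY id;              -- expected per the description above: (1, 'explicit'), (2, 'new')
```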
Syntax:

View File

@ -48,9 +48,15 @@ ASTPtr JoinNode::toASTTableJoin() const
auto join_expression_ast = children[join_expression_child_index]->toAST();
if (is_using_join_expression)
join_ast->using_expression_list = std::move(join_expression_ast);
{
join_ast->using_expression_list = join_expression_ast;
join_ast->children.push_back(join_ast->using_expression_list);
}
else
join_ast->on_expression = std::move(join_expression_ast);
{
join_ast->on_expression = join_expression_ast;
join_ast->children.push_back(join_ast->on_expression);
}
}
return join_ast;

View File

@ -3,7 +3,6 @@
#include <memory>
#include <Common/Exception.h>
#include "Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.h"
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
@ -16,39 +15,39 @@
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Passes/AggregateFunctionOfGroupByKeysPass.h>
#include <Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.h>
#include <Analyzer/Passes/ArrayExistsToHasPass.h>
#include <Analyzer/Passes/AutoFinalOnQueryPass.h>
#include <Analyzer/Passes/ComparisonTupleEliminationPass.h>
#include <Analyzer/Passes/ConvertOrLikeChainPass.h>
#include <Analyzer/Passes/ConvertQueryToCNFPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/CrossToInnerJoinPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
#include <Analyzer/Passes/FuseFunctionsPass.h>
#include <Analyzer/Passes/GroupingFunctionsResolvePass.h>
#include <Analyzer/Passes/IfChainToMultiIfPass.h>
#include <Analyzer/Passes/IfConstantConditionPass.h>
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/LogicalExpressionOptimizerPass.h>
#include <Analyzer/Passes/MultiIfToIfPass.h>
#include <Analyzer/Passes/NormalizeCountVariantsPass.h>
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
#include <Analyzer/Passes/OptimizeGroupByFunctionKeysPass.h>
#include <Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
#include <Analyzer/Passes/OrderByLimitByDuplicateEliminationPass.h>
#include <Analyzer/Passes/OrderByTupleEliminationPass.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Passes/RemoveUnusedProjectionColumnsPass.h>
#include <Analyzer/Passes/RewriteSumFunctionWithSumAndCountPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
#include <Analyzer/Passes/RewriteAggregateFunctionWithIfPass.h>
#include <Analyzer/Passes/SumIfToCountIfPass.h>
#include <Analyzer/Passes/MultiIfToIfPass.h>
#include <Analyzer/Passes/IfConstantConditionPass.h>
#include <Analyzer/Passes/IfChainToMultiIfPass.h>
#include <Analyzer/Passes/OrderByTupleEliminationPass.h>
#include <Analyzer/Passes/NormalizeCountVariantsPass.h>
#include <Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.h>
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
#include <Analyzer/Passes/OrderByLimitByDuplicateEliminationPass.h>
#include <Analyzer/Passes/FuseFunctionsPass.h>
#include <Analyzer/Passes/OptimizeGroupByFunctionKeysPass.h>
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/ConvertOrLikeChainPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
#include <Analyzer/Passes/GroupingFunctionsResolvePass.h>
#include <Analyzer/Passes/AutoFinalOnQueryPass.h>
#include <Analyzer/Passes/ArrayExistsToHasPass.h>
#include <Analyzer/Passes/ComparisonTupleEliminationPass.h>
#include <Analyzer/Passes/LogicalExpressionOptimizerPass.h>
#include <Analyzer/Passes/CrossToInnerJoinPass.h>
#include <Analyzer/Passes/RewriteSumFunctionWithSumAndCountPass.h>
#include <Analyzer/Passes/ShardNumColumnToFunctionPass.h>
#include <Analyzer/Passes/ConvertQueryToCNFPass.h>
#include <Analyzer/Passes/AggregateFunctionOfGroupByKeysPass.h>
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
#include <Analyzer/Passes/SumIfToCountIfPass.h>
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Utils.h>
namespace DB
{

View File

@ -1,3 +1,4 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Common/FieldVisitorToString.h>
#include <DataTypes/DataTypesNumber.h>
@ -676,6 +677,8 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
"tuple"});
}
}
logProcessorProfile(context, io.pipeline.getProcessors());
}
scalars_cache.emplace(node_with_hash, scalar_block);

View File

@ -2895,6 +2895,9 @@ Possible values:
**See Also**
- [ORDER BY Clause](../../sql-reference/statements/select/order-by.md/#optimize_read_in_order)
)", 0) \
DECLARE(Bool, read_in_order_use_virtual_row, false, R"(
Use a virtual row while reading in order of the primary key or a monotonic function of it. It is useful when searching over multiple parts, as only the relevant ones are touched.
)", 0) \
DECLARE(Bool, optimize_read_in_window_order, true, R"(
Enable ORDER BY optimization in window clause for reading data in corresponding order in MergeTree tables.
@ -4875,6 +4878,9 @@ Limit on size of a single batch of file segments that a read buffer can request
)", 0) \
DECLARE(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, R"(
Wait time to lock cache for space reservation in filesystem cache
)", 0) \
DECLARE(Bool, filesystem_cache_prefer_bigger_buffer_size, true, R"(
Prefer bigger buffer size if filesystem cache is enabled to avoid writing small file segments which deteriorate cache performance. On the other hand, enabling this setting might increase memory usage.
)", 0) \
DECLARE(UInt64, temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds, (10 * 60 * 1000), R"(
Wait time to lock cache for space reservation for temporary data in filesystem cache
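
A hedged usage sketch for the two settings declared above (`read_in_order_use_virtual_row`, `filesystem_cache_prefer_bigger_buffer_size`); the table and ORDER BY column are placeholders:

``` sql
SET read_in_order_use_virtual_row = 1;              -- opt in to the virtual-row read-in-order optimization
SET filesystem_cache_prefer_bigger_buffer_size = 1; -- the default: prefer bigger buffers when the cache is enabled
SELECT *
FROM events_local      -- hypothetical MergeTree table ordered by its primary key
ORDER BY event_time
LIMIT 10;
```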

View File

@ -78,6 +78,8 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."},
{"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
{"merge_tree_use_v1_object_and_dynamic_serialization", true, false, "Add new serialization V2 version for JSON and Dynamic types"}
{"filesystem_cache_prefer_bigger_buffer_size", true, true, "New setting"},
{"read_in_order_use_virtual_row", false, false, "Use virtual row while reading in order of primary key or its monotonic function fashion. It is useful when searching over multiple parts as only relevant ones are touched."},
}
},
{"24.10",

View File

@ -642,7 +642,10 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
};
/// Avoid cache fragmentation by choosing bigger buffer size.
bool prefer_bigger_buffer_size = object_storage->supportsCache() && read_settings.enable_filesystem_cache;
bool prefer_bigger_buffer_size = read_settings.filesystem_cache_prefer_bigger_buffer_size
&& object_storage->supportsCache()
&& read_settings.enable_filesystem_cache;
size_t buffer_size = prefer_bigger_buffer_size
? std::max<size_t>(settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE)
: settings.remote_fs_buffer_size;

View File

@ -4410,7 +4410,7 @@ private:
variant_column = IColumn::mutate(column);
/// Otherwise we should filter column.
else
variant_column = column->filter(filter, variant_size_hint)->assumeMutable();
variant_column = IColumn::mutate(column->filter(filter, variant_size_hint));
assert_cast<ColumnLowCardinality &>(*variant_column).nestedRemoveNullable();
return createVariantFromDescriptorsAndOneNonEmptyVariant(variant_types, std::move(discriminators), std::move(variant_column), variant_discr);

View File

@ -61,6 +61,7 @@ struct ReadSettings
bool filesystem_cache_allow_background_download = true;
bool filesystem_cache_allow_background_download_for_metadata_files_in_packed_storage = true;
bool filesystem_cache_allow_background_download_during_fetch = true;
bool filesystem_cache_prefer_bigger_buffer_size = true;
bool use_page_cache_for_disks_without_file_cache = false;
bool read_from_page_cache_if_exists_otherwise_bypass_cache = false;

View File

@ -196,6 +196,7 @@ namespace Setting
extern const SettingsUInt64 filesystem_cache_segments_batch_size;
extern const SettingsBool filesystem_cache_enable_background_download_for_metadata_files_in_packed_storage;
extern const SettingsBool filesystem_cache_enable_background_download_during_fetch;
extern const SettingsBool filesystem_cache_prefer_bigger_buffer_size;
extern const SettingsBool http_make_head_request;
extern const SettingsUInt64 http_max_fields;
extern const SettingsUInt64 http_max_field_name_size;
@ -5751,6 +5752,7 @@ ReadSettings Context::getReadSettings() const
res.filesystem_cache_allow_background_download_for_metadata_files_in_packed_storage
= settings_ref[Setting::filesystem_cache_enable_background_download_for_metadata_files_in_packed_storage];
res.filesystem_cache_allow_background_download_during_fetch = settings_ref[Setting::filesystem_cache_enable_background_download_during_fetch];
res.filesystem_cache_prefer_bigger_buffer_size = settings_ref[Setting::filesystem_cache_prefer_bigger_buffer_size];
res.filesystem_cache_max_download_size = settings_ref[Setting::filesystem_cache_max_download_size];
res.skip_download_if_exceeds_query_cache = settings_ref[Setting::skip_download_if_exceeds_query_cache];

View File

@ -5,9 +5,11 @@
#include <Core/Settings.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTExpressionList.h>
@ -19,9 +21,8 @@
#include <Parsers/ASTWithElement.h>
#include <Parsers/queryToString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Common/ProfileEvents.h>
#include <Common/FieldVisitorToString.h>
#include <IO/WriteBufferFromString.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
@ -67,6 +68,18 @@ bool ExecuteScalarSubqueriesMatcher::needChildVisit(ASTPtr & node, const ASTPtr
return false;
}
if (auto * tables = node->as<ASTTablesInSelectQueryElement>())
{
/// Contrary to what's said in the code block above, ARRAY JOIN needs to resolve the subquery if possible
/// and assign an alias for 02367_optimize_trivial_count_with_array_join to pass. Otherwise it will fail in
/// ArrayJoinedColumnsVisitor (`No alias for non-trivial value in ARRAY JOIN: _a`)
/// This looks 100% like incomplete code working on top of a bug, but this code has already been made obsolete
/// by the new analyzer, so it's an inconvenience we can live with until we deprecate it.
if (child == tables->array_join)
return true;
return false;
}
return true;
}
@ -246,6 +259,8 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
if (tmp_block.rows() != 0)
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row");
logProcessorProfile(data.getContext(), io.pipeline.getProcessors());
}
block = materializeBlock(block);

View File

@ -1,21 +1,22 @@
#include <chrono>
#include <variant>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/Set.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <IO/Operators.h>
#include <Common/logger_useful.h>
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <IO/Operators.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/Set.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Core/Block.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/SizeLimits.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -239,6 +240,8 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
if (!set_and_key->set->isCreated())
return nullptr;
logProcessorProfile(context, pipeline.getProcessors());
return set_and_key->set;
}

View File

@ -1,5 +1,6 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
@ -8,16 +9,19 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <base/getFQDNOrHostName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/DateLUTImpl.h>
#include <Common/logger_useful.h>
#include <array>
namespace DB
{
namespace Setting
{
extern const SettingsBool log_processors_profiles;
}
ColumnsDescription ProcessorProfileLogElement::getColumnsDescription()
{
return ColumnsDescription
@ -81,5 +85,57 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(output_bytes);
}
void logProcessorProfile(ContextPtr context, const Processors & processors)
{
const Settings & settings = context->getSettingsRef();
if (settings[Setting::log_processors_profiles])
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
const auto time_now = std::chrono::system_clock::now();
processor_elem.event_time = timeInSeconds(time_now);
processor_elem.event_time_microseconds = timeInMicroseconds(time_now);
processor_elem.initial_query_id = context->getInitialQueryId();
processor_elem.query_id = context->getCurrentQueryId();
auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast<std::uintptr_t>(&proc); };
for (const auto & processor : processors)
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_step_name = processor->getPlanStepName();
processor_elem.plan_step_description = processor->getPlanStepDescription();
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = static_cast<UInt64>(processor->getElapsedNs() / 1000U);
processor_elem.input_wait_elapsed_us = static_cast<UInt64>(processor->getInputWaitElapsedNs() / 1000U);
processor_elem.output_wait_elapsed_us = static_cast<UInt64>(processor->getOutputWaitElapsedNs() / 1000U);
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
}
}
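
A hedged sketch of how the new `logProcessorProfile` output can be inspected: with `log_processors_profiles` enabled, processors of scalar subqueries (and of sets built from subqueries) are now logged to `system.processors_profile_log` alongside the main pipeline. Column names are taken from the element fields above:

``` sql
SET log_processors_profiles = 1;
SELECT (SELECT max(number) FROM numbers(1000000)); -- any query containing a scalar subquery
SYSTEM FLUSH LOGS;
SELECT processor_name, elapsed_us, input_rows, output_rows
FROM system.processors_profile_log
ORDER BY event_time_microseconds DESC
LIMIT 10;
```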

View File

@ -50,4 +50,5 @@ public:
using SystemLog<ProcessorProfileLogElement>::SystemLog;
};
void logProcessorProfile(ContextPtr context, const Processors & processors);
}

View File

@ -161,7 +161,13 @@ void QueryNormalizer::visit(ASTTablesInSelectQueryElement & node, const ASTPtr &
{
auto & join = node.table_join->as<ASTTableJoin &>();
if (join.on_expression)
{
ASTPtr original_on_expression = join.on_expression;
visit(join.on_expression, data);
if (join.on_expression != original_on_expression)
join.children = { join.on_expression };
}
}
}

View File

@ -6,6 +6,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void RewriteArrayExistsFunctionMatcher::visit(ASTPtr & ast, Data & data)
{
if (auto * func = ast->as<ASTFunction>())
@ -20,21 +26,21 @@ void RewriteArrayExistsFunctionMatcher::visit(ASTPtr & ast, Data & data)
if (join->using_expression_list)
{
auto * it = std::find(join->children.begin(), join->children.end(), join->using_expression_list);
if (it == join->children.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find join->using_expression_list in '{}'", join->formatForLogging());
visit(join->using_expression_list, data);
if (it && *it != join->using_expression_list)
*it = join->using_expression_list;
*it = join->using_expression_list;
}
if (join->on_expression)
{
auto * it = std::find(join->children.begin(), join->children.end(), join->on_expression);
if (it == join->children.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find join->on_expression in '{}'", join->formatForLogging());
visit(join->on_expression, data);
if (it && *it != join->on_expression)
*it = join->on_expression;
*it = join->on_expression;
}
}
}

View File

@ -117,7 +117,6 @@ namespace Setting
extern const SettingsOverflowMode join_overflow_mode;
extern const SettingsString log_comment;
extern const SettingsBool log_formatted_queries;
extern const SettingsBool log_processors_profiles;
extern const SettingsBool log_profile_events;
extern const SettingsUInt64 log_queries_cut_to_length;
extern const SettingsBool log_queries;
@ -551,53 +550,8 @@ void logQueryFinish(
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (settings[Setting::log_processors_profiles])
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = elem.event_time;
processor_elem.event_time_microseconds = elem.event_time_microseconds;
processor_elem.initial_query_id = elem.client_info.initial_query_id;
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast<std::uintptr_t>(&proc); };
for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_step_name = processor->getPlanStepName();
processor_elem.plan_step_description = processor->getPlanStepDescription();
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = static_cast<UInt64>(processor->getElapsedNs() / 1000U);
processor_elem.input_wait_elapsed_us = static_cast<UInt64>(processor->getInputWaitElapsedNs() / 1000U);
processor_elem.output_wait_elapsed_us = static_cast<UInt64>(processor->getOutputWaitElapsedNs() / 1000U);
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
logProcessorProfile(context, query_pipeline.getProcessors());
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, std::make_shared<QueryStatusInfo>(info));
}

View File

@ -1534,15 +1534,23 @@ static ColumnWithTypeAndName readColumnWithDateData(
for (size_t i = 0; i < orc_int_column->numElements; ++i)
{
Int32 days_num = static_cast<Int32>(orc_int_column->data[i]);
if (check_date_range && (days_num > DATE_LUT_MAX_EXTEND_DAY_NUM || days_num < -DAYNUM_OFFSET_EPOCH))
throw Exception(
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
"Input value {} of a column \"{}\" exceeds the range of type Date32",
days_num,
column_name);
if (!orc_int_column->hasNulls || orc_int_column->notNull[i])
{
Int32 days_num = static_cast<Int32>(orc_int_column->data[i]);
if (check_date_range && (days_num > DATE_LUT_MAX_EXTEND_DAY_NUM || days_num < -DAYNUM_OFFSET_EPOCH))
throw Exception(
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
"Input value {} of a column \"{}\" exceeds the range of type Date32",
days_num,
column_name);
column_data.push_back(days_num);
column_data.push_back(days_num);
}
else
{
/// ORC library doesn't guarantee that orc_int_column->data[i] is initialized to zero when orc_int_column->notNull[i] is false since https://github.com/ClickHouse/ClickHouse/pull/69473
column_data.push_back(0);
}
}
return {std::move(internal_column), internal_type, column_name};
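
A hedged sketch of the case fixed above: reading an ORC file whose date column contains NULLs, where the underlying values may be uninitialized; the file name and structure are made up:

``` sql
SELECT d
FROM file('dates.orc', 'ORC', 'd Nullable(Date32)')
LIMIT 10;
```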

View File

@ -1,5 +1,5 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB

View File

@ -1,5 +1,5 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{

View File

@ -1,29 +0,0 @@
#pragma once
#include <Processors/Chunk.h>
namespace DB
{
/// To carry part level if chunk is produced by a merge tree source
class MergeTreePartLevelInfo : public ChunkInfoCloneable<MergeTreePartLevelInfo>
{
public:
MergeTreePartLevelInfo() = delete;
explicit MergeTreePartLevelInfo(ssize_t part_level)
: origin_merge_tree_part_level(part_level)
{ }
MergeTreePartLevelInfo(const MergeTreePartLevelInfo & other) = default;
size_t origin_merge_tree_part_level = 0;
};
inline size_t getPartLevelFromChunk(const Chunk & chunk)
{
const auto part_level_info = chunk.getChunkInfos().get<MergeTreePartLevelInfo>();
if (part_level_info)
return part_level_info->origin_merge_tree_part_level;
return 0;
}
}

View File

@ -0,0 +1,86 @@
#pragma once
#include <Processors/Chunk.h>
#include <Core/Block.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// To carry part level and virtual row if chunk is produced by a merge tree source
class MergeTreeReadInfo : public ChunkInfoCloneable<MergeTreeReadInfo>
{
public:
MergeTreeReadInfo() = delete;
explicit MergeTreeReadInfo(size_t part_level) :
origin_merge_tree_part_level(part_level) {}
explicit MergeTreeReadInfo(size_t part_level, const Block & pk_block_, ExpressionActionsPtr virtual_row_conversions_) :
origin_merge_tree_part_level(part_level), pk_block(pk_block_), virtual_row_conversions(std::move(virtual_row_conversions_)) {}
MergeTreeReadInfo(const MergeTreeReadInfo & other) = default;
size_t origin_merge_tree_part_level = 0;
/// If this is a virtual row, the block should not be empty.
Block pk_block;
ExpressionActionsPtr virtual_row_conversions;
};
inline size_t getPartLevelFromChunk(const Chunk & chunk)
{
const auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
if (read_info)
return read_info->origin_merge_tree_part_level;
return 0;
}
inline bool isVirtualRow(const Chunk & chunk)
{
const auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
if (read_info)
return read_info->pk_block.columns() > 0;
return false;
}
inline void setVirtualRow(Chunk & chunk, const Block & header, bool apply_virtual_row_conversions)
{
auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
chassert(read_info);
Block & pk_block = read_info->pk_block;
// std::cerr << apply_virtual_row_conversions << std::endl;
// std::cerr << read_info->virtual_row_conversions->dumpActions() << std::endl;
if (apply_virtual_row_conversions)
read_info->virtual_row_conversions->execute(pk_block);
// std::cerr << "++++" << pk_block.dumpStructure() << std::endl;
Columns ordered_columns;
ordered_columns.reserve(pk_block.columns());
for (size_t i = 0; i < header.columns(); ++i)
{
const ColumnWithTypeAndName & col = header.getByPosition(i);
if (const auto * pk_col = pk_block.findByName(col.name))
{
if (!col.type->equals(*pk_col->type))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Virtual row has different type for {}. Expected {}, got {}",
col.name, col.dumpStructure(), pk_col->dumpStructure());
ordered_columns.push_back(pk_col->column);
}
else
ordered_columns.push_back(col.type->createColumnConstWithDefaultValue(1));
}
chunk.setColumns(ordered_columns, 1);
}
}

View File

@ -1,3 +1,4 @@
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBuffer.h>
@ -16,12 +17,14 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
bool use_average_block_sizes,
bool apply_virtual_row_conversions_)
: header(std::move(header_))
, merged_data(use_average_block_sizes, max_block_size_, max_block_size_bytes_)
, description(description_)
, limit(limit_)
, out_row_sources_buf(out_row_sources_buf_)
, apply_virtual_row_conversions(apply_virtual_row_conversions_)
, current_inputs(num_inputs)
, sorting_queue_strategy(sorting_queue_strategy_)
, cursors(num_inputs)
@ -49,6 +52,15 @@ void MergingSortedAlgorithm::addInput()
void MergingSortedAlgorithm::initialize(Inputs inputs)
{
for (auto & input : inputs)
{
if (!isVirtualRow(input.chunk))
continue;
setVirtualRow(input.chunk, header, apply_virtual_row_conversions);
input.skip_last_row = true;
}
removeConstAndSparse(inputs);
merged_data.initialize(header, inputs);
current_inputs = std::move(inputs);

View File

@ -22,7 +22,8 @@ public:
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
bool use_average_block_sizes = false,
bool apply_virtual_row_conversions_ = true);
void addInput();
@ -47,6 +48,8 @@ private:
/// If it is not nullptr then it should be populated during execution
WriteBuffer * out_row_sources_buf = nullptr;
bool apply_virtual_row_conversions;
/// Chunks currently being merged.
Inputs current_inputs;

View File

@ -1,3 +1,4 @@
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <Processors/Merges/IMergingTransform.h>
namespace DB
@ -101,11 +102,16 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
/// setNotNeeded after reading first chunk, because in optimistic case
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any chunks anymore;
auto chunk = input.pull(limit_hint != 0);
if ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end)
/// If virtual row exists, let it pass through, so don't read more chunks.
auto chunk = input.pull(true);
bool virtual_row = isVirtualRow(chunk);
if (limit_hint == 0 && !virtual_row)
input.setNeeded();
if (!chunk.hasRows())
if (!virtual_row && ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end))
input.setNeeded();
if (!virtual_row && !chunk.hasRows())
{
if (!input.isFinished())
{

View File

@ -22,6 +22,7 @@ MergingSortedTransform::MergingSortedTransform(
bool always_read_till_end_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool apply_virtual_row_conversions,
bool have_all_inputs_)
: IMergingTransform(
num_inputs,
@ -38,7 +39,8 @@ MergingSortedTransform::MergingSortedTransform(
sorting_queue_strategy,
limit_,
out_row_sources_buf_,
use_average_block_sizes)
use_average_block_sizes,
apply_virtual_row_conversions)
{
}

View File

@ -22,6 +22,7 @@ public:
bool always_read_till_end_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool apply_virtual_row_conversions = true,
bool have_all_inputs_ = true);
String getName() const override { return "MergingSortedTransform"; }

View File

@ -1,4 +1,5 @@
#include <Processors/QueryPlan/BufferChunksTransform.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{
@ -48,14 +49,27 @@ IProcessor::Status BufferChunksTransform::prepare()
}
else if (input.hasData())
{
auto chunk = pullChunk();
bool virtual_row;
auto chunk = pullChunk(virtual_row);
output.push(std::move(chunk));
if (virtual_row)
{
input.setNotNeeded();
return Status::PortFull;
}
}
}
if (input.hasData() && (num_buffered_rows < max_rows_to_buffer || num_buffered_bytes < max_bytes_to_buffer))
{
auto chunk = pullChunk();
bool virtual_row;
auto chunk = pullChunk(virtual_row);
if (virtual_row)
{
output.push(std::move(chunk));
input.setNotNeeded();
return Status::PortFull;
}
num_buffered_rows += chunk.getNumRows();
num_buffered_bytes += chunk.bytes();
chunks.push(std::move(chunk));
@ -71,10 +85,12 @@ IProcessor::Status BufferChunksTransform::prepare()
return Status::NeedData;
}
Chunk BufferChunksTransform::pullChunk()
Chunk BufferChunksTransform::pullChunk(bool & virtual_row)
{
auto chunk = input.pull();
num_processed_rows += chunk.getNumRows();
virtual_row = isVirtualRow(chunk);
if (!virtual_row)
num_processed_rows += chunk.getNumRows();
if (limit && num_processed_rows >= limit)
input.close();

View File

@ -24,7 +24,7 @@ public:
String getName() const override { return "BufferChunks"; }
private:
Chunk pullChunk();
Chunk pullChunk(bool & virtual_row);
InputPort & input;
OutputPort & output;

View File

@ -211,6 +211,8 @@ MatchedTrees::Matches matchTrees(const ActionsDAG::NodeRawConstPtrs & inner_dag,
MatchedTrees::Monotonicity monotonicity;
monotonicity.direction *= info.is_positive ? 1 : -1;
monotonicity.strict = info.is_strict;
monotonicity.child_match = &child_match;
monotonicity.child_node = monotonic_child;
if (child_match.monotonicity)
{

View File

@ -22,12 +22,16 @@ namespace DB
/// DAG for PK does not contain aliases and ambiguous nodes.
struct MatchedTrees
{
struct Match;
/// Monotonicity is calculated for monotonic functions chain.
/// Chain is not strict if there is any non-strict monotonic function.
struct Monotonicity
{
int direction = 1;
bool strict = true;
const Match * child_match = nullptr;
const ActionsDAG::Node * child_node = nullptr;
};
struct Match

View File

@ -124,7 +124,7 @@ SortingProperty applyOrder(QueryPlan::Node * parent, SortingProperty * propertie
auto common_prefix = commonPrefix(properties->sort_description, sorting_step->getSortDescription());
if (!common_prefix.empty())
/// Buffering is useful for reading from MergeTree, and it is applied in optimizeReadInOrder only.
sorting_step->convertToFinishSorting(common_prefix, /*use_buffering*/ false);
sorting_step->convertToFinishSorting(common_prefix, /*use_buffering*/ false, false);
}
auto scope = sorting_step->hasPartitions() ? SortingProperty::SortScope::Stream : SortingProperty::SortScope::Global;

View File

@ -324,12 +324,43 @@ void enrichFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
}
}
const ActionsDAG::Node * addMonotonicChain(ActionsDAG & dag, const ActionsDAG::Node * node, const MatchedTrees::Match * match, const std::string & input_name)
{
if (!match->monotonicity)
return &dag.addInput(input_name, node->result_type);
if (node->type == ActionsDAG::ActionType::ALIAS)
return &dag.addAlias(*addMonotonicChain(dag, node->children.front(), match, input_name), node->result_name);
ActionsDAG::NodeRawConstPtrs args;
args.reserve(node->children.size());
for (const auto * child : node->children)
{
if (child == match->monotonicity->child_node)
args.push_back(addMonotonicChain(dag, match->monotonicity->child_node, match->monotonicity->child_match, input_name));
else
args.push_back(&dag.addColumn({child->column, child->result_type, child->result_name}));
}
return &dag.addFunction(node->function_base, std::move(args), {});
}
struct SortingInputOrder
{
InputOrderInfoPtr input_order{};
/// This is needed for virtual row optimization.
/// Convert the PK values to ORDER BY key.
/// If empty, the optimization cannot be applied.
std::optional<ActionsDAG> virtual_row_conversion{};
};
/// For the case when the order of keys is important (ORDER BY keys).
InputOrderInfoPtr buildInputOrderFromSortDescription(
SortingInputOrder buildInputOrderFromSortDescription(
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
const SortDescription & description,
const KeyDescription & sorting_key,
const Names & pk_column_names,
size_t limit)
{
//std::cerr << "------- buildInputOrderInfo " << std::endl;
@ -369,6 +400,18 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
size_t next_description_column = 0;
size_t next_sort_key = 0;
bool can_optimize_virtual_row = true;
struct MatchInfo
{
const ActionsDAG::Node * source = nullptr;
const ActionsDAG::Node * fixed_column = nullptr;
const MatchedTrees::Match * monotonic = nullptr;
};
std::vector<MatchInfo> match_infos;
match_infos.reserve(description.size());
while (next_description_column < description.size() && next_sort_key < sorting_key.column_names.size())
{
const auto & sorting_key_column = sorting_key.column_names[next_sort_key];
@ -410,6 +453,7 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
//std::cerr << "====== (no dag) Found direct match" << std::endl;
match_infos.push_back({.source = sort_column_node});
++next_description_column;
++next_sort_key;
}
@ -438,24 +482,46 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
{
current_direction *= match.monotonicity->direction;
strict_monotonic = match.monotonicity->strict;
match_infos.push_back({.source = sort_node, .monotonic = &match});
}
else
match_infos.push_back({.source = sort_node});
++next_description_column;
++next_sort_key;
}
else if (fixed_key_columns.contains(sort_column_node))
{
if (next_sort_key == 0)
{
// Disable virtual row optimization.
// For example, when pk is (a,b), a = 1, order by b, virtual row should be
// disabled in the following case:
// 1st part (0, 100), (1, 2), (1, 3), (1, 4)
// 2nd part (0, 100), (1, 2), (1, 3), (1, 4).
can_optimize_virtual_row = false;
}
//std::cerr << "+++++++++ Found fixed key by match" << std::endl;
++next_sort_key;
}
else
{
//std::cerr << "====== Check for fixed const : " << bool(sort_node->column) << " fixed : " << fixed_columns.contains(sort_node) << std::endl;
bool is_fixed_column = sort_node->column || fixed_columns.contains(sort_node);
if (!is_fixed_column)
break;
if (!sort_node->column)
/// Virtual row for fixed column from order by is not supported now.
/// TODO: we can do it for the simple case,
/// But it's better to remove fixed columns from ORDER BY completely, e.g:
/// WHERE x = 42 ORDER BY x, y => WHERE x = 42 ORDER BY y
can_optimize_virtual_row = false;
match_infos.push_back({.source = sort_node, .fixed_column = sort_node});
order_key_prefix_descr.push_back(sort_column_description);
++next_description_column;
}
@ -477,9 +543,46 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
}
if (read_direction == 0 || order_key_prefix_descr.empty())
return nullptr;
return {};
return std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit);
/// If the prefix description is used, we can't restore the full description from PK value.
/// TODO: partial sort description can be used as well. Implement support later.
if (order_key_prefix_descr.size() < description.size() || pk_column_names.size() < next_sort_key)
can_optimize_virtual_row = false;
auto order_info = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit);
std::optional<ActionsDAG> virtual_row_conversion;
if (can_optimize_virtual_row)
{
ActionsDAG virtual_row_dag;
virtual_row_dag.getOutputs().reserve(match_infos.size());
size_t next_pk_name = 0;
for (const auto & info : match_infos)
{
const ActionsDAG::Node * output;
if (info.fixed_column)
output = &virtual_row_dag.addColumn({info.fixed_column->column, info.fixed_column->result_type, info.fixed_column->result_name});
else
{
if (info.monotonic)
output = addMonotonicChain(virtual_row_dag, info.source, info.monotonic, pk_column_names[next_pk_name]);
else
{
output = &virtual_row_dag.addInput(pk_column_names[next_pk_name], info.source->result_type);
if (pk_column_names[next_pk_name] != info.source->result_name)
output = &virtual_row_dag.addAlias(*output, info.source->result_name);
}
++next_pk_name;
}
virtual_row_dag.getOutputs().push_back(output);
}
virtual_row_conversion = std::move(virtual_row_dag);
}
return {std::move(order_info), std::move(virtual_row_conversion)};
}
/// We may need a few different sort descriptions here.
@ -689,7 +792,7 @@ InputOrder buildInputOrderFromUnorderedKeys(
return { std::move(input_order), std::move(sort_description) }; // std::move(group_by_sort_description) };
}
InputOrderInfoPtr buildInputOrderFromSortDescription(
SortingInputOrder buildInputOrderFromSortDescription(
const ReadFromMergeTree * reading,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
@ -697,15 +800,17 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
size_t limit)
{
const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
const auto & pk_column_names = reading->getStorageMetadata()->getPrimaryKey().column_names;
return buildInputOrderFromSortDescription(
fixed_columns,
dag, description,
sorting_key,
pk_column_names,
limit);
}
InputOrderInfoPtr buildInputOrderFromSortDescription(
SortingInputOrder buildInputOrderFromSortDescription(
ReadFromMerge * merge,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
@ -714,28 +819,31 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
{
const auto & tables = merge->getSelectedTables();
InputOrderInfoPtr order_info;
SortingInputOrder order_info;
for (const auto & table : tables)
{
auto storage = std::get<StoragePtr>(table);
const auto & sorting_key = storage->getInMemoryMetadataPtr()->getSortingKey();
auto metadata = storage->getInMemoryMetadataPtr();
const auto & sorting_key = metadata->getSortingKey();
// const auto & pk_column_names = metadata->getPrimaryKey().column_names;
if (sorting_key.column_names.empty())
return nullptr;
return {};
auto table_order_info = buildInputOrderFromSortDescription(
fixed_columns,
dag, description,
sorting_key,
{},
limit);
if (!table_order_info)
return nullptr;
if (!table_order_info.input_order)
return {};
if (!order_info)
order_info = table_order_info;
else if (*order_info != *table_order_info)
return nullptr;
if (!order_info.input_order)
order_info = std::move(table_order_info);
else if (*order_info.input_order != *table_order_info.input_order)
return {};
}
return order_info;
@ -791,7 +899,7 @@ InputOrder buildInputOrderFromUnorderedKeys(
return order_info;
}
InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & node)
InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, bool & apply_virtual_row, QueryPlan::Node & node)
{
QueryPlan::Node * reading_node = findReadingStep(node, /*allow_existing_order=*/ false);
if (!reading_node)
@ -815,14 +923,21 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
dag, description,
limit);
if (order_info)
if (order_info.input_order)
{
bool can_read = reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
apply_virtual_row = order_info.virtual_row_conversion != std::nullopt;
bool can_read = reading->requestReadingInOrder(
order_info.input_order->used_prefix_of_sorting_key_size,
order_info.input_order->direction,
order_info.input_order->limit,
std::move(order_info.virtual_row_conversion));
if (!can_read)
return nullptr;
}
return order_info;
return order_info.input_order;
}
if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
{
@ -832,14 +947,14 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
dag, description,
limit);
if (order_info)
if (order_info.input_order)
{
bool can_read = merge->requestReadingInOrder(order_info);
bool can_read = merge->requestReadingInOrder(order_info.input_order);
if (!can_read)
return nullptr;
}
return order_info;
return order_info.input_order;
}
return nullptr;
@ -873,7 +988,8 @@ InputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPlan::Node &
bool can_read = reading->requestReadingInOrder(
order_info.input_order->used_prefix_of_sorting_key_size,
order_info.input_order->direction,
order_info.input_order->limit);
order_info.input_order->limit,
{});
if (!can_read)
return {};
}
@ -962,7 +1078,7 @@ InputOrder buildInputOrderInfo(DistinctStep & distinct, QueryPlan::Node & node)
if (!reading->requestReadingInOrder(
order_info.input_order->used_prefix_of_sorting_key_size,
order_info.input_order->direction,
order_info.input_order->limit))
order_info.input_order->limit, {}))
return {};
return order_info;
@ -1014,6 +1130,8 @@ void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
if (sorting->getType() != SortingStep::Type::Full)
return;
bool apply_virtual_row = false;
if (typeid_cast<UnionStep *>(node.children.front()->step.get()))
{
auto & union_node = node.children.front();
@ -1036,7 +1154,7 @@ void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
for (auto * child : union_node->children)
{
infos.push_back(buildInputOrderInfo(*sorting, *child));
infos.push_back(buildInputOrderInfo(*sorting, apply_virtual_row, *child));
if (infos.back())
{
@ -1088,13 +1206,13 @@ void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
}
}
sorting->convertToFinishSorting(*max_sort_descr, use_buffering);
sorting->convertToFinishSorting(*max_sort_descr, use_buffering, false);
}
else if (auto order_info = buildInputOrderInfo(*sorting, *node.children.front()))
else if (auto order_info = buildInputOrderInfo(*sorting, apply_virtual_row, *node.children.front()))
{
/// Use buffering only if have filter or don't have limit.
bool use_buffering = order_info->limit == 0;
sorting->convertToFinishSorting(order_info->sort_description_for_merging, use_buffering);
sorting->convertToFinishSorting(order_info->sort_description_for_merging, use_buffering, apply_virtual_row);
}
}
@ -1233,10 +1351,10 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
if (order_info)
{
bool can_read = read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
bool can_read = read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit, {});
if (!can_read)
return 0;
sorting->convertToFinishSorting(order_info->sort_description_for_merging, false);
sorting->convertToFinishSorting(order_info->sort_description_for_merging, false, false);
}
return 0;

View File

@ -27,6 +27,7 @@
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/SelectByIndicesTransform.h>
#include <Processors/Transforms/VirtualRowTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeIndexLegacyVectorSimilarity.h>
@ -176,6 +177,7 @@ namespace Setting
extern const SettingsBool use_skip_indexes_if_final;
extern const SettingsBool use_uncompressed_cache;
extern const SettingsUInt64 merge_tree_min_read_task_size;
extern const SettingsBool read_in_order_use_virtual_row;
}
namespace MergeTreeSetting
@ -680,7 +682,34 @@ Pipe ReadFromMergeTree::readInOrder(
if (set_total_rows_approx)
source->addTotalRowsApprox(total_rows);
pipes.emplace_back(std::move(source));
Pipe pipe(source);
if (virtual_row_conversion && (read_type == ReadType::InOrder))
{
const auto & index = part_with_ranges.data_part->getIndex();
const auto & primary_key = storage_snapshot->metadata->primary_key;
size_t mark_range_begin = part_with_ranges.ranges.front().begin;
ColumnsWithTypeAndName pk_columns;
size_t num_columns = virtual_row_conversion->getRequiredColumnsWithTypes().size();
pk_columns.reserve(num_columns);
for (size_t j = 0; j < num_columns; ++j)
{
auto column = primary_key.data_types[j]->createColumn()->cloneEmpty();
column->insert((*(*index)[j])[mark_range_begin]);
pk_columns.push_back({std::move(column), primary_key.data_types[j], primary_key.column_names[j]});
}
Block pk_block(std::move(pk_columns));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<VirtualRowTransform>(header, pk_block, virtual_row_conversion);
});
}
pipes.emplace_back(std::move(pipe));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@ -1140,7 +1169,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (pipe.numOutputPorts() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);
pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch,
0, false, nullptr, false, /*apply_virtual_row_conversions*/ false);
pipe.addTransform(std::move(transform));
}
@ -1799,7 +1829,7 @@ void ReadFromMergeTree::updateSortDescription()
enable_vertical_final);
}
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t read_limit)
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t read_limit, std::optional<ActionsDAG> virtual_row_conversion_)
{
/// if direction is not set, use current one
if (!direction)
@ -1822,6 +1852,10 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
/// Let prefer in-order optimization over vertical FINAL for now
enable_vertical_final = false;
/// Disable virtual row for FINAL.
if (virtual_row_conversion_ && !isQueryWithFinal() && context->getSettingsRef()[Setting::read_in_order_use_virtual_row])
virtual_row_conversion = std::make_shared<ExpressionActions>(std::move(*virtual_row_conversion_));
updateSortDescription();
return true;
@ -2238,6 +2272,12 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
expression->describeActions(format_settings.out, prefix);
}
}
if (virtual_row_conversion)
{
format_settings.out << prefix << "Virtual row conversions" << '\n';
virtual_row_conversion->describeActions(format_settings.out, prefix);
}
}
void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
@ -2277,6 +2317,9 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
map.add("Prewhere info", std::move(prewhere_info_map));
}
if (virtual_row_conversion)
map.add("Virtual row conversions", virtual_row_conversion->toTree());
}
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
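
A hedged editor's aside: when the virtual-row optimization is applied, an `EXPLAIN` with actions enabled is expected to print the "Virtual row conversions" block emitted by `describeActions` above; the table and key are placeholders:

``` sql
SET read_in_order_use_virtual_row = 1;
EXPLAIN actions = 1
SELECT * FROM t ORDER BY id LIMIT 3;
```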

View File

@ -187,7 +187,7 @@ public:
StorageMetadataPtr getStorageMetadata() const { return storage_snapshot->metadata; }
/// Returns `false` if requested reading cannot be performed.
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit, std::optional<ActionsDAG> virtual_row_conversion_);
bool readsInOrder() const;
const InputOrderInfoPtr & getInputOrder() const { return query_info.input_order_info; }
const SortDescription & getSortDescription() const override { return result_sort_description; }
@ -281,6 +281,9 @@ private:
std::optional<MergeTreeReadTaskCallback> read_task_callback;
bool enable_vertical_final = false;
bool enable_remove_parts_from_snapshot_optimization = true;
ExpressionActionsPtr virtual_row_conversion;
std::optional<size_t> number_of_current_replica;
};

View File

@ -145,11 +145,12 @@ void SortingStep::updateLimit(size_t limit_)
}
}
void SortingStep::convertToFinishSorting(SortDescription prefix_description_, bool use_buffering_)
void SortingStep::convertToFinishSorting(SortDescription prefix_description_, bool use_buffering_, bool apply_virtual_row_conversions_)
{
type = Type::FinishSorting;
prefix_description = std::move(prefix_description_);
use_buffering = use_buffering_;
apply_virtual_row_conversions = apply_virtual_row_conversions_;
}
void SortingStep::scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline)
@ -253,7 +254,10 @@ void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescr
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);
always_read_till_end,
nullptr,
false,
apply_virtual_row_conversions);
pipeline.addTransform(std::move(transform));
}

View File

@ -84,7 +84,7 @@ public:
bool isSortingForMergeJoin() const { return is_sorting_for_merge_join; }
void convertToFinishSorting(SortDescription prefix_description, bool use_buffering_);
void convertToFinishSorting(SortDescription prefix_description, bool use_buffering_, bool apply_virtual_row_conversions_);
Type getType() const { return type; }
const Settings & getSettings() const { return sort_settings; }
@ -134,6 +134,7 @@ private:
UInt64 limit;
bool always_read_till_end = false;
bool use_buffering = false;
bool apply_virtual_row_conversions = false;
Settings sort_settings;
};

View File

@ -187,6 +187,7 @@ void MergeSortingTransform::consume(Chunk chunk)
{
bool have_all_inputs = false;
bool use_average_block_sizes = false;
bool apply_virtual_row = false;
external_merging_sorted = std::make_shared<MergingSortedTransform>(
header_without_constants,
@ -199,6 +200,7 @@ void MergeSortingTransform::consume(Chunk chunk)
/*always_read_till_end_=*/ false,
nullptr,
use_average_block_sizes,
apply_virtual_row,
have_all_inputs);
processors.emplace_back(external_merging_sorted);

View File

@ -0,0 +1,111 @@
#include <Processors/Transforms/VirtualRowTransform.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
VirtualRowTransform::VirtualRowTransform(const Block & header_, const Block & pk_block_, ExpressionActionsPtr virtual_row_conversions_)
: IProcessor({header_}, {header_})
, input(inputs.front()), output(outputs.front())
, pk_block(pk_block_)
, virtual_row_conversions(std::move(virtual_row_conversions_))
{
}
VirtualRowTransform::Status VirtualRowTransform::prepare()
{
/// Check can output.
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
/// Output if has data.
if (generated)
{
output.push(std::move(current_chunk));
generated = false;
return Status::PortFull;
}
if (can_generate)
return Status::Ready;
/// Check can input.
if (!has_input)
{
if (input.isFinished())
{
output.finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
/// Set input port NotNeeded after chunk was pulled.
current_chunk = input.pull(true);
has_input = true;
}
/// Now transform.
return Status::Ready;
}
void VirtualRowTransform::work()
{
if (can_generate)
{
if (generated)
throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it already was generated");
generated = true;
can_generate = false;
if (!is_first)
{
if (current_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in VirtualRowTransform");
return;
}
is_first = false;
Columns empty_columns;
const auto & header = getOutputs().front().getHeader();
empty_columns.reserve(header.columns());
for (size_t i = 0; i < header.columns(); ++i)
{
const ColumnWithTypeAndName & type_and_name = header.getByPosition(i);
empty_columns.push_back(type_and_name.type->createColumn()->cloneEmpty());
}
current_chunk.setColumns(empty_columns, 0);
current_chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(0, pk_block, virtual_row_conversions));
}
else
{
if (!has_input)
throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it wasn't read");
has_input = false;
can_generate = true;
}
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{
/// Virtual row is useful for read-in-order optimization when multiple parts exist.
class VirtualRowTransform : public IProcessor
{
public:
explicit VirtualRowTransform(const Block & header_, const Block & pk_block_, ExpressionActionsPtr virtual_row_conversions_);
String getName() const override { return "VirtualRowTransform"; }
Status prepare() override;
void work() override;
private:
InputPort & input;
OutputPort & output;
Chunk current_chunk;
bool has_input = false;
bool generated = false;
bool can_generate = true;
bool is_first = true;
Block pk_block;
ExpressionActionsPtr virtual_row_conversions;
};
}

View File

@ -10,8 +10,8 @@ namespace DB
/**
* Base cluster for Storages used in table functions like s3Cluster and hdfsCluster
* Needed for code simplification around parallel_distributed_insert_select
* Base cluster for Storages used in table functions like s3Cluster and hdfsCluster.
* Necessary for code simplification around parallel_distributed_insert_select.
*/
class IStorageCluster : public IStorage
{

View File

@ -6,7 +6,7 @@
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeArray.h>
#include <Processors/Chunk.h>
@ -203,7 +203,7 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
auto chunk = Chunk(ordered_columns, res.row_count);
if (add_part_level)
chunk.getChunkInfos().add(std::make_shared<MergeTreePartLevelInfo>(task->getInfo().data_part->info.level));
chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(task->getInfo().data_part->info.level));
return ChunkAndProgress{
.chunk = std::move(chunk),

View File

@ -14,9 +14,10 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Common/logger_useful.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <Storages/MergeTree/checkDataPart.h>
namespace DB
{
@ -271,7 +272,7 @@ try
auto result = Chunk(std::move(res_columns), rows_read);
if (add_part_level)
result.getChunkInfos().add(std::make_shared<MergeTreePartLevelInfo>(data_part->info.level));
result.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(data_part->info.level));
return result;
}
}

View File

@ -29,8 +29,8 @@ namespace MergeTreeSetting
namespace ErrorCodes
{
extern const int REPLICA_IS_ALREADY_ACTIVE;
extern const int REPLICA_STATUS_CHANGED;
extern const int LOGICAL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}
namespace FailPoints
@ -217,26 +217,10 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
}
else
{
/// Table was created before 20.4 and was never altered,
/// let's initialize replica metadata version from global metadata version.
const String & zookeeper_path = storage.zookeeper_path, & replica_path = storage.replica_path;
Coordination::Stat table_metadata_version_stat;
zookeeper->get(zookeeper_path + "/metadata", &table_metadata_version_stat);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/metadata", table_metadata_version_stat.version));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(table_metadata_version_stat.version), zkutil::CreateMode::Persistent));
Coordination::Responses res;
auto code = zookeeper->tryMulti(ops, res);
if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::REPLICA_STATUS_CHANGED, "Failed to initialize metadata_version "
"because table was concurrently altered, will retry");
zkutil::KeeperMultiException::check(code, ops, res);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"It seems you have upgraded from a version earlier than 20.4 straight to one later than 24.10. "
"ClickHouse does not support upgrades that span more than a year. "
"Please update gradually (through intermediate versions).");
}
storage.queue.removeCurrentPartsFromMutations();

View File

@ -619,8 +619,17 @@ bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::Mutabl
String block_id = deduplicate ? fmt::format("{}_{}", part->info.partition_id, part->checksums.getTotalChecksumHex()) : "";
bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num).second;
int error = 0;
/// Set a special error code if the block is a duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
/// And remove the attaching_ prefix
if (deduplicate && deduplicated)
{
error = ErrorCodes::INSERT_WAS_DEDUPLICATED;
if (!endsWith(part->getDataPartStorage().getRelativePath(), "detached/attaching_" + part->name + "/"))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected relative path for a deduplicated part: {}", part->getDataPartStorage().getRelativePath());
fs::path new_relative_path = fs::path("detached") / part->getNewName(part->info);
part->renameTo(new_relative_path, false);
}
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()), ExecutionStatus(error));
return deduplicated;
}
@ -880,8 +889,9 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
}
}
/// Save the current temporary path in case we need to revert the change to retry (ZK connection loss)
/// Save the current temporary path and name in case we need to revert the change to retry (ZK connection loss) or in case the part is deduplicated.
const String temporary_part_relative_path = part->getDataPartStorage().getPartDirectory();
const String initial_part_name = part->name;
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
/// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
@ -1050,16 +1060,6 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
zkutil::KeeperMultiException::check(multi_code, ops, responses);
}
transaction.rollback();
if (!Coordination::isUserError(multi_code))
throw Exception(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
block_number,
toString(block_id),
multi_code);
auto failed_op_idx = zkutil::getFailedOpIndex(multi_code, responses);
String failed_op_path = ops[failed_op_idx]->getPath();
@ -1069,6 +1069,11 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
LOG_INFO(log, "Block with ID {} already exists (it was just appeared) for part {}. Ignore it.",
toString(block_id), part->name);
transaction.rollbackPartsToTemporaryState();
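/// Revert the part to the temporary name and path saved above, so the deduplicated
/// insert leaves no renamed part behind.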
part->is_temp = true;
part->setName(initial_part_name);
part->renameTo(temporary_part_relative_path, false);
if constexpr (async_insert)
{
retry_context.conflict_block_ids = std::vector<String>({failed_op_path});
@ -1080,6 +1085,16 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
return CommitRetryContext::DUPLICATED_PART;
}
transaction.rollback();
if (!Coordination::isUserError(multi_code))
throw Exception(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
block_number,
toString(block_id),
multi_code);
if (multi_code == Coordination::Error::ZNONODE && failed_op_idx == block_unlock_op_idx)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED,
"Insert query (for block {}) was canceled by concurrent ALTER PARTITION or TRUNCATE",

View File

@ -517,7 +517,7 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
bool prefer_bigger_buffer_size = impl->isCached();
bool prefer_bigger_buffer_size = read_settings.filesystem_cache_prefer_bigger_buffer_size && impl->isCached();
size_t buffer_size = prefer_bigger_buffer_size
? std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE)
: read_settings.remote_fs_buffer_size;

View File

@ -1563,7 +1563,7 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
auto request_read_in_order = [order_info_](ReadFromMergeTree & read_from_merge_tree)
{
return read_from_merge_tree.requestReadingInOrder(
order_info_->used_prefix_of_sorting_key_size, order_info_->direction, order_info_->limit);
order_info_->used_prefix_of_sorting_key_size, order_info_->direction, order_info_->limit, {});
};
bool ok = true;

View File

@ -245,6 +245,7 @@ namespace
table_join->strictness = JoinStrictness::Semi;
table_join->on_expression = makeASTFunction("equals", makeASTColumn(data_table_id, TimeSeriesColumnNames::ID), makeASTColumn(tags_table_id, TimeSeriesColumnNames::ID));
table_join->children.push_back(table_join->on_expression);
table->table_join = table_join;
auto table_exp = std::make_shared<ASTTableExpression>();

View File

@ -51,7 +51,7 @@ protected:
"corresponding table function",
getName());
/// Evaluate only first argument, everything else will be done Base class
/// Evaluate only first argument, everything else will be done by the Base class
args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context);
/// Cluster name is always the first

View File

@ -34,8 +34,9 @@ from typing import List, Optional
import __main__
from ci_buddy import CIBuddy
from ci_config import Labels
from env_helper import TEMP_PATH
from env_helper import IS_CI, TEMP_PATH
from get_robot_token import get_best_robot_token
from git_helper import GIT_PREFIX, git_runner, is_shallow
from github_helper import GitHub, PullRequest, PullRequests, Repository
@ -97,7 +98,7 @@ close it.
self.pr = pr
self.repo = repo
self.cherrypick_branch = f"cherrypick/{name}/{pr.merge_commit_sha}"
self.cherrypick_branch = f"cherrypick/{name}/{pr.number}"
self.backport_branch = f"backport/{name}/{pr.number}"
self.cherrypick_pr = None # type: Optional[PullRequest]
self.backport_pr = None # type: Optional[PullRequest]
@ -653,6 +654,14 @@ def main():
bp.process_backports()
if bp.error is not None:
logging.error("Finished successfully, but errors occurred!")
if IS_CI:
ci_buddy = CIBuddy()
ci_buddy.post_job_error(
f"The cherry-pick finished with errors: {bp.error}",
with_instance_info=True,
with_wf_link=True,
critical=True,
)
raise bp.error

View File

@ -1268,6 +1268,7 @@ def main() -> int:
s3,
pr_info.number,
pr_info.sha,
pr_info.head_ref,
job_report.test_results,
job_report.additional_files,
job_report.check_name or _get_ext_check_name(args.job_name),
@ -1335,6 +1336,7 @@ def main() -> int:
s3,
pr_info.number,
pr_info.sha,
pr_info.head_ref,
job_report.test_results,
job_report.additional_files,
job_report.check_name or _get_ext_check_name(args.job_name),

View File

@ -3,14 +3,13 @@ import json
import os
from typing import Dict, List, Union
import boto3
import requests
from botocore.exceptions import ClientError
from ci_config import CI
from ci_utils import WithIter
from commit_status_helper import get_commit_filtered_statuses, get_repo
from get_robot_token import get_best_robot_token
from get_robot_token import get_best_robot_token, get_parameter_from_ssm
from github_helper import GitHub
from pr_info import PRInfo
@ -89,15 +88,9 @@ class CIBuddy:
def _get_webhooks():
name = "ci_buddy_web_hooks"
session = boto3.Session(region_name="us-east-1") # Replace with your region
ssm_client = session.client("ssm")
json_string = None
try:
response = ssm_client.get_parameter(
Name=name,
WithDecryption=True, # Set to True if the parameter is a SecureString
)
json_string = response["Parameter"]["Value"]
json_string = get_parameter_from_ssm(name, decrypt=True)
except ClientError as e:
print(f"An error occurred: {e}")

View File

@ -9,6 +9,7 @@ from typing import Any, Dict, List, Optional
import requests
from env_helper import GITHUB_REPOSITORY
from get_robot_token import get_parameter_from_ssm
from pr_info import PRInfo
from report import TestResults
@ -211,17 +212,13 @@ def prepare_tests_results_for_clickhouse(
report_url: str,
check_name: str,
) -> List[dict]:
pull_request_url = "https://github.com/ClickHouse/ClickHouse/commits/master"
base_ref = "master"
head_ref = "master"
base_repo = pr_info.repo_full_name
head_repo = pr_info.repo_full_name
base_ref = pr_info.base_ref
base_repo = pr_info.base_name
head_ref = pr_info.head_ref
head_repo = pr_info.head_name
pull_request_url = f"https://github.com/{GITHUB_REPOSITORY}/commits/{head_ref}"
if pr_info.number != 0:
pull_request_url = pr_info.pr_html_url
base_ref = pr_info.base_ref
base_repo = pr_info.base_name
head_ref = pr_info.head_ref
head_repo = pr_info.head_name
common_properties = {
"pull_request_number": pr_info.number,

View File

@ -315,7 +315,13 @@ def create_ci_report(pr_info: PRInfo, statuses: CommitStatuses) -> str:
)
)
return upload_results(
S3Helper(), pr_info.number, pr_info.sha, test_results, [], CI.StatusNames.CI
S3Helper(),
pr_info.number,
pr_info.sha,
pr_info.head_ref,
test_results,
[],
CI.StatusNames.CI,
)

View File

@ -250,7 +250,9 @@ def main():
s3_helper = S3Helper()
pr_info = PRInfo()
url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME)
url = upload_results(
s3_helper, pr_info.number, pr_info.sha, pr_info.head_ref, test_results, [], NAME
)
print(f"::notice ::Report url: {url}")

View File

@ -183,7 +183,9 @@ def main():
pr_info = PRInfo()
s3_helper = S3Helper()
url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME)
url = upload_results(
s3_helper, pr_info.number, pr_info.sha, pr_info.head_ref, test_results, [], NAME
)
print(f"::notice ::Report url: {url}")

View File

@ -132,6 +132,12 @@ class PRInfo:
ref = github_event.get("ref", "refs/heads/master")
if ref and ref.startswith("refs/heads/"):
ref = ref[11:]
# Default values
self.base_ref = "" # type: str
self.base_name = "" # type: str
self.head_ref = "" # type: str
self.head_name = "" # type: str
self.number = 0 # type: int
# workflow completed event, used for PRs only
if "action" in github_event and github_event["action"] == "completed":
@ -146,7 +152,7 @@ class PRInfo:
if "pull_request" in github_event: # pull request and other similar events
self.event_type = EventType.PULL_REQUEST
self.number = github_event["pull_request"]["number"] # type: int
self.number = github_event["pull_request"]["number"]
if pr_event_from_api:
try:
response = get_gh_api(
@ -172,17 +178,13 @@ class PRInfo:
self.pr_html_url = f"{repo_prefix}/pull/{self.number}"
# master or backport/xx.x/xxxxx - where the PR will be merged
self.base_ref = github_event["pull_request"]["base"]["ref"] # type: str
self.base_ref = github_event["pull_request"]["base"]["ref"]
# ClickHouse/ClickHouse
self.base_name = github_event["pull_request"]["base"]["repo"][
"full_name"
] # type: str
self.base_name = github_event["pull_request"]["base"]["repo"]["full_name"]
# any_branch-name - the name of the working branch
self.head_ref = github_event["pull_request"]["head"]["ref"] # type: str
self.head_ref = github_event["pull_request"]["head"]["ref"]
# UserName/ClickHouse or ClickHouse/ClickHouse
self.head_name = github_event["pull_request"]["head"]["repo"][
"full_name"
] # type: str
self.head_name = github_event["pull_request"]["head"]["repo"]["full_name"]
self.body = github_event["pull_request"]["body"]
self.labels = {
label["name"] for label in github_event["pull_request"]["labels"]

View File

@ -64,6 +64,7 @@ def upload_results(
s3_client: S3Helper,
pr_number: int,
commit_sha: str,
branch_name: str,
test_results: TestResults,
additional_files: Union[Sequence[Path], Sequence[str]],
check_name: str,
@ -80,8 +81,7 @@ def upload_results(
process_logs(s3_client, additional_files, s3_path_prefix, test_results)
)
branch_url = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}/commits/master"
branch_name = "master"
branch_url = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}/commits/{branch_name}"
if pr_number != 0:
branch_name = f"PR #{pr_number}"
branch_url = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}/pull/{pr_number}"

View File

@ -5,7 +5,8 @@ source /repo/tests/docker_scripts/utils.lib
function attach_gdb_to_clickhouse()
{
IS_ASAN=$(clickhouse-client --query "SELECT count() FROM system.build_options WHERE name = 'CXX_FLAGS' AND position('sanitize=address' IN value)")
# Use retries to avoid failures due to fault injections
IS_ASAN=$(run_with_retry 5 clickhouse-client --query "SELECT count() FROM system.build_options WHERE name = 'CXX_FLAGS' AND position('sanitize=address' IN value)")
if [[ "$IS_ASAN" = "1" ]];
then
echo "ASAN build detected. Not using gdb since it disables LeakSanitizer detections"

View File

@ -744,11 +744,13 @@ class ClickHouseCluster:
# available when with_prometheus == True
self.with_prometheus = False
self.prometheus_writer_host = "prometheus_writer"
self.prometheus_writer_ip = None
self.prometheus_writer_port = 9090
self.prometheus_writer_logs_dir = p.abspath(
p.join(self.instances_dir, "prometheus_writer/logs")
)
self.prometheus_reader_host = "prometheus_reader"
self.prometheus_reader_ip = None
self.prometheus_reader_port = 9091
self.prometheus_reader_logs_dir = p.abspath(
p.join(self.instances_dir, "prometheus_reader/logs")
@ -2728,6 +2730,16 @@ class ClickHouseCluster:
raise Exception("Can't wait LDAP to start")
def wait_prometheus_to_start(self):
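# Resolve the container IPs of both Prometheus instances and wait until their HTTP APIs respond.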
self.prometheus_reader_ip = self.get_instance_ip(self.prometheus_reader_host)
self.prometheus_writer_ip = self.get_instance_ip(self.prometheus_writer_host)
self.wait_for_url(
f"http://{self.prometheus_reader_ip}:{self.prometheus_reader_port}/api/v1/query?query=time()"
)
self.wait_for_url(
f"http://{self.prometheus_writer_ip}:{self.prometheus_writer_port}/api/v1/query?query=time()"
)
def start(self):
pytest_xdist_logging_to_separate_files.setup()
logging.info("Running tests in {}".format(self.base_path))
@ -3083,12 +3095,23 @@ class ClickHouseCluster:
f"http://{self.jdbc_bridge_ip}:{self.jdbc_bridge_port}/ping"
)
if self.with_prometheus:
if self.with_prometheus and self.base_prometheus_cmd:
os.makedirs(self.prometheus_writer_logs_dir)
os.chmod(self.prometheus_writer_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
os.makedirs(self.prometheus_reader_logs_dir)
os.chmod(self.prometheus_reader_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
prometheus_start_cmd = self.base_prometheus_cmd + common_opts
logging.info(
"Trying to create Prometheus instances by command %s",
" ".join(map(str, prometheus_start_cmd)),
)
run_and_check(prometheus_start_cmd)
self.up_called = True
logging.info("Trying to connect to Prometheus...")
self.wait_prometheus_to_start()
clickhouse_start_cmd = self.base_cmd + ["up", "-d", "--no-recreate"]
logging.debug(
(

View File

@ -0,0 +1,88 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
with_zookeeper=True,
macros={"replica": "node1"},
stay_alive=True,
)
database_name = "dedup_attach"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def q(query):
return ch1.query(database=database_name, sql=query)
def test_deduplicated_attached_part_renamed_after_attach(started_cluster):
ch1.query(f"CREATE DATABASE {database_name}")
q(
"CREATE TABLE dedup (id UInt32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/dedup_attach/dedup/s1', 'r1') ORDER BY id;"
)
q("INSERT INTO dedup VALUES (1),(2),(3);")
table_data_path = q(
"SELECT data_paths FROM system.tables WHERE database=currentDatabase() AND table='dedup'"
).strip("'[]\n")
ch1.exec_in_container(
[
"bash",
"-c",
f"cp -r {table_data_path}/all_0_0_0 {table_data_path}/detached/all_0_0_0",
]
)
# Part is attached as all_1_1_0
q("ALTER TABLE dedup ATTACH PART 'all_0_0_0'")
assert 2 == int(
q(
f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'"
).strip()
)
ch1.exec_in_container(
[
"bash",
"-c",
f"cp -r {table_data_path}/all_1_1_0 {table_data_path}/detached/all_1_1_0",
]
)
# Part is deduplicated and not attached
q("ALTER TABLE dedup ATTACH PART 'all_1_1_0'")
assert 2 == int(
q(
f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'"
).strip()
)
assert 1 == int(
q(
f"SELECT count() FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'"
).strip()
)
# Check that it is not 'attaching_all_1_1_0'
assert (
"all_1_1_0"
== q(
f"SELECT name FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'"
).strip()
)
q("DROP TABLE dedup")
q("SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/dedup_attach/dedup/s1'")
ch1.query(f"DROP DATABASE {database_name}")

View File

@ -20,7 +20,7 @@ node = cluster.add_instance(
def execute_query_on_prometheus_writer(query, timestamp):
return execute_query_impl(
cluster.get_instance_ip(cluster.prometheus_writer_host),
cluster.prometheus_writer_ip,
cluster.prometheus_writer_port,
"/api/v1/query",
query,
@ -30,7 +30,7 @@ def execute_query_on_prometheus_writer(query, timestamp):
def execute_query_on_prometheus_reader(query, timestamp):
return execute_query_impl(
cluster.get_instance_ip(cluster.prometheus_reader_host),
cluster.prometheus_reader_ip,
cluster.prometheus_reader_port,
"/api/v1/query",
query,

View File

@ -86,11 +86,17 @@
ReadType: InOrder
Parts: 1
Granules: 3
Virtual row conversions
Actions: INPUT :: 0 -> x UInt32 : 0
Positions: 0
-----------------
ReadFromMergeTree (default.test_index)
ReadType: InReverseOrder
Parts: 1
Granules: 3
Virtual row conversions
Actions: INPUT :: 0 -> x UInt32 : 0
Positions: 0
ReadFromMergeTree (default.idx)
Indexes:
PrimaryKey
@ -174,11 +180,19 @@
ReadType: InOrder
Parts: 1
Granules: 3
Virtual row conversions
Actions: INPUT : 0 -> x UInt32 : 0
ALIAS x :: 0 -> __table1.x UInt32 : 1
Positions: 1
-----------------
ReadFromMergeTree (default.test_index)
ReadType: InReverseOrder
Parts: 1
Granules: 3
Virtual row conversions
Actions: INPUT : 0 -> x UInt32 : 0
ALIAS x :: 0 -> __table1.x UInt32 : 1
Positions: 1
ReadFromMergeTree (default.idx)
Indexes:
PrimaryKey

View File

@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
for i in $(seq 0 1)
do
CH_CLIENT="$CLICKHOUSE_CLIENT --optimize_move_to_prewhere=1 --convert_query_to_cnf=0 --optimize_read_in_order=1 --enable_analyzer=$i"
CH_CLIENT="$CLICKHOUSE_CLIENT --optimize_move_to_prewhere=1 --convert_query_to_cnf=0 --optimize_read_in_order=1 --read_in_order_use_virtual_row=1 --enable_analyzer=$i"
$CH_CLIENT -q "drop table if exists test_index"
$CH_CLIENT -q "drop table if exists idx"

View File

@ -172,7 +172,8 @@ ExpressionTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
2020-10-10 00:00:00 0.01
2020-10-10 00:00:00 0.01
2020-10-10 00:00:00 0.01
@ -186,7 +187,8 @@ ExpressionTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
2020-10-10 00:00:00 0.01
2020-10-10 00:00:00 0.01
2020-10-10 00:00:00 0.01

View File

@ -2,6 +2,7 @@ SET max_threads=0;
SET optimize_read_in_order=1;
SET optimize_trivial_insert_select = 1;
SET read_in_order_two_level_merge_threshold=100;
SET read_in_order_use_virtual_row = 1;
DROP TABLE IF EXISTS t_read_in_order;

View File

@ -3,3 +3,10 @@
3 2023-03-16 12:22:33 2023-03-16 10:22:33.000 2023-03-16 03:22:33 2023-03-16 19:22:33.123
2024-02-24 10:22:33 2024-02-24 12:22:33
2024-10-24 09:22:33 2024-10-24 13:22:33
2024-10-24 16:22:33 2024-10-24 06:22:33
leap year: 2024-02-29 16:22:33 2024-02-29 06:22:33
non-leap year: 2023-03-01 16:22:33 2023-03-01 06:22:33
leap year: 2024-02-29 04:22:33 2024-02-29 19:22:33
non-leap year: 2023-03-01 04:22:33 2023-02-28 19:22:33
timezone with half-hour offset: 2024-02-29 00:52:33 2024-02-29 21:52:33
jump over a year: 2024-01-01 04:01:01 2023-12-31 20:01:01

View File

@ -15,4 +15,13 @@ $CLICKHOUSE_CLIENT -q "select x, to_utc_timestamp(toDateTime('2023-03-16 11:22:3
# timestamp convert between DST timezone and UTC
$CLICKHOUSE_CLIENT -q "select to_utc_timestamp(toDateTime('2024-02-24 11:22:33'), 'Europe/Madrid'), from_utc_timestamp(toDateTime('2024-02-24 11:22:33'), 'Europe/Madrid')"
$CLICKHOUSE_CLIENT -q "select to_utc_timestamp(toDateTime('2024-10-24 11:22:33'), 'Europe/Madrid'), from_utc_timestamp(toDateTime('2024-10-24 11:22:33'), 'Europe/Madrid')"
$CLICKHOUSE_CLIENT -q "drop table test_tbl"
$CLICKHOUSE_CLIENT -q "select to_utc_timestamp(toDateTime('2024-10-24 11:22:33'), 'EST'), from_utc_timestamp(toDateTime('2024-10-24 11:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'leap year:', to_utc_timestamp(toDateTime('2024-02-29 11:22:33'), 'EST'), from_utc_timestamp(toDateTime('2024-02-29 11:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'non-leap year:', to_utc_timestamp(toDateTime('2023-02-29 11:22:33'), 'EST'), from_utc_timestamp(toDateTime('2023-02-29 11:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'leap year:', to_utc_timestamp(toDateTime('2024-02-28 23:22:33'), 'EST'), from_utc_timestamp(toDateTime('2024-03-01 00:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'non-leap year:', to_utc_timestamp(toDateTime('2023-02-28 23:22:33'), 'EST'), from_utc_timestamp(toDateTime('2023-03-01 00:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'timezone with half-hour offset:', to_utc_timestamp(toDateTime('2024-02-29 11:22:33'), 'Australia/Adelaide'), from_utc_timestamp(toDateTime('2024-02-29 11:22:33'), 'Australia/Adelaide')"
$CLICKHOUSE_CLIENT -q "select 'jump over a year:', to_utc_timestamp(toDateTime('2023-12-31 23:01:01'), 'EST'), from_utc_timestamp(toDateTime('2024-01-01 01:01:01'), 'EST')"
$CLICKHOUSE_CLIENT -q "drop table test_tbl"

View File

@ -0,0 +1,42 @@
0
1
2
3
16384
========
16385
16386
16387
16388
24576
========
0
1
2
3
16384
========
16385
16386
16387
16388
24576
========
1 2
1 2
1 3
1 3
1 4
1 4
1 2
1 2
1 3
1 3
1 4
1 4
========
1 3
1 2
1 1
-- test distinct ----
0

View File

@ -0,0 +1,218 @@
SET read_in_order_use_virtual_row = 1;
DROP TABLE IF EXISTS t;
CREATE TABLE t
(
`x` UInt64,
`y` UInt64,
`z` UInt64,
`k` UInt64
)
ENGINE = MergeTree
ORDER BY (x, y, z)
SETTINGS index_granularity = 8192,
index_granularity_bytes = 10485760;
SYSTEM STOP MERGES t;
INSERT INTO t SELECT
number,
number,
number,
number
FROM numbers(8192 * 3);
INSERT INTO t SELECT
number + (8192 * 3),
number + (8192 * 3),
number + (8192 * 3),
number
FROM numbers(8192 * 3);
-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
-- both chunks come from the same part.
SELECT x
FROM t
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'preliminary merge, no filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'preliminary merge, no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
limit 1;
SELECT '========';
-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
-- all chunks come from the same part.
SELECT k
FROM t
WHERE k > 8192 * 2
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'preliminary merge with filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'preliminary merge with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
SELECT '========';
-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
-- both chunks come from the same part.
SELECT x
FROM t
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'no preliminary merge, no filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'no preliminary merge, no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
SELECT '========';
-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
-- all chunks come from the same part.
SELECT k
FROM t
WHERE k > 8192 * 2
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'no preliminary merge, with filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'no preliminary merge, with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
DROP TABLE t;
SELECT '========';
-- from 02149_read_in_order_fixed_prefix
DROP TABLE IF EXISTS fixed_prefix;
CREATE TABLE fixed_prefix(a UInt32, b UInt32)
ENGINE = MergeTree ORDER BY (a, b)
SETTINGS index_granularity = 3;
SYSTEM STOP MERGES fixed_prefix;
INSERT INTO fixed_prefix VALUES (0, 100), (1, 2), (1, 3), (1, 4), (2, 5);
INSERT INTO fixed_prefix VALUES (0, 100), (1, 2), (1, 3), (1, 4), (2, 5);
SELECT a, b
FROM fixed_prefix
WHERE a = 1
ORDER BY b
SETTINGS max_threads = 1,
optimize_read_in_order = 1,
read_in_order_two_level_merge_threshold = 0; --force preliminary merge
SELECT a, b
FROM fixed_prefix
WHERE a = 1
ORDER BY b
SETTINGS max_threads = 1,
optimize_read_in_order = 1,
read_in_order_two_level_merge_threshold = 5; --avoid preliminary merge
DROP TABLE fixed_prefix;
SELECT '========';
DROP TABLE IF EXISTS function_pk;
CREATE TABLE function_pk
(
`A` Int64,
`B` Int64
)
ENGINE = MergeTree ORDER BY (A, -B)
SETTINGS index_granularity = 1;
SYSTEM STOP MERGES function_pk;
INSERT INTO function_pk values(1,1);
INSERT INTO function_pk values(1,3);
INSERT INTO function_pk values(1,2);
SELECT *
FROM function_pk
ORDER BY (A,-B) ASC
limit 3
SETTINGS max_threads = 1,
optimize_read_in_order = 1,
read_in_order_two_level_merge_threshold = 5; --avoid preliminary merge
DROP TABLE function_pk;
-- modified from 02317_distinct_in_order_optimization
SELECT '-- test distinct ----';
DROP TABLE IF EXISTS distinct_in_order SYNC;
CREATE TABLE distinct_in_order
(
`a` int,
`b` int,
`c` int
)
ENGINE = MergeTree
ORDER BY (a, b)
SETTINGS index_granularity = 8192,
index_granularity_bytes = '10Mi';
SYSTEM STOP MERGES distinct_in_order;
INSERT INTO distinct_in_order SELECT
number % number,
number % 5,
number % 10
FROM numbers(1, 1000000);
SELECT DISTINCT a
FROM distinct_in_order
ORDER BY a ASC
SETTINGS read_in_order_two_level_merge_threshold = 0,
optimize_read_in_order = 1,
max_threads = 2;
DROP TABLE distinct_in_order;

View File

@ -0,0 +1,25 @@
(Expression)
ExpressionTransform
(Sorting)
MergingSortedTransform 4 → 1
(Expression)
ExpressionTransform × 4
(ReadFromMergeTree)
ExpressionTransform × 5
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
ExpressionTransform
VirtualRowTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1

View File

@ -0,0 +1,26 @@
-- Tags: no-random-merge-tree-settings, no-object-storage
SET optimize_read_in_order = 1, merge_tree_min_rows_for_concurrent_read = 1000, read_in_order_use_virtual_row = 1;
DROP TABLE IF EXISTS tab;
CREATE TABLE tab
(
`t` DateTime
)
ENGINE = MergeTree
ORDER BY t
SETTINGS index_granularity = 1;
SYSTEM STOP MERGES tab;
INSERT INTO tab SELECT toDateTime('2024-01-10') + number FROM numbers(10000);
INSERT INTO tab SELECT toDateTime('2024-01-30') + number FROM numbers(10000);
INSERT INTO tab SELECT toDateTime('2024-01-20') + number FROM numbers(10000);
EXPLAIN PIPELINE
SELECT *
FROM tab
ORDER BY t ASC
SETTINGS read_in_order_two_level_merge_threshold = 0, max_threads = 4, read_in_order_use_buffering = 0
FORMAT tsv;

View File

@ -0,0 +1,21 @@
-- Tags: no-parallel
-- modified from test_01155_ordinary, to test special optimization path for virtual row
DROP DATABASE IF EXISTS test_03031;
CREATE DATABASE test_03031;
USE test_03031;
SET read_in_order_use_virtual_row = 1;
CREATE TABLE src (s String) ENGINE = MergeTree() ORDER BY s;
INSERT INTO src(s) VALUES ('before moving tables');
CREATE TABLE dist (s String) ENGINE = Distributed(test_shard_localhost, test_03031, src);
SET enable_analyzer=0;
SELECT _table FROM merge('test_03031', '') ORDER BY _table, s;
DROP TABLE src;
DROP TABLE dist;
DROP DATABASE test_03031;

View File

@ -41,3 +41,13 @@
[1,2,3,4,5,10,20]
-------
[1,2,3]
-------
[10,-2,1] ['hello','hi'] [3,2,1,NULL]
-------
-------
[1]
-------
[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,220,221,222,223,224,225,226,227,228,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,249,250,251,252,253,254,255,256]
199999
-------
[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19]

View File

@ -35,4 +35,23 @@ SELECT arraySort(arrayUnion([NULL, NULL, NULL, 1], [1, NULL, NULL], [1, 2, 3, NU
select '-------';
SELECT arraySort(arrayUnion([1, 1, 1, 2, 3], [2, 2, 4], [5, 10, 20]));
select '-------';
SELECT arraySort(arrayUnion([1, 2], [1, 3], [])),
SELECT arraySort(arrayUnion([1, 2], [1, 3], []));
select '-------';
-- example from docs
SELECT
arrayUnion([-2, 1], [10, 1], [-2], []) as num_example,
arrayUnion(['hi'], [], ['hello', 'hi']) as str_example,
arrayUnion([1, 3, NULL], [2, 3, NULL]) as null_example;
select '-------';
--mix of types
SELECT arrayUnion([1], [-2], [1.1, 'hi'], [NULL, 'hello', []]); -- {serverError NO_COMMON_TYPE}
select '-------';
SELECT arrayUnion([1]);
SELECT arrayUnion(); -- {serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH}
select '-------';
--bigger arrays
SELECT arraySort(arrayUnion(range(1, 256), range(2, 257)));
SELECT length(arrayUnion(range(1, 100000), range(9999, 200000)));
select '-------';
--bigger number of arguments
SELECT arraySort(arrayUnion([1, 2], [1, 3], [1, 4], [1, 5], [1, 6], [1, 7], [1, 8], [1, 9], [1, 10], [1, 11], [1, 12], [1, 13], [1, 14], [1, 15], [1, 16], [1, 17], [1, 18], [1, 19]));

View File

@ -1,6 +1,6 @@
quantileExactWeightedInterpolated
0 0 0 Decimal(38, 8)
-25.5 -8.49999999 -5.1 Decimal(38, 8)
0 0 0 25 2024-02-20 Decimal(38, 8)
-25.5 -8.49999999 -5.1 12.25 2024-01-25 Decimal(38, 8)
0 0 0
10 3.33333333 2
20 6.66666666 4
@ -10,11 +10,14 @@ quantileExactWeightedInterpolated
[-50,-40,-30,-20,-10,0,10,20,30,40,50]
[-16.66666666,-13.33333333,-10,-6.66666666,-3.33333333,0,3.33333333,6.66666666,10,13.33333333,16.66666666]
[-10,-8,-6,-4,-2,0,2,4,6,8,10]
[0,5,10,15,20,25,30,35,40,45,50]
['2024-01-01','2024-01-11','2024-01-21','2024-01-31','2024-02-10','2024-02-20','2024-03-01','2024-03-11','2024-03-21','2024-03-31','2024-04-10']
quantileExactWeightedInterpolatedState
[10000.6,20000.2,29999.8,39999.4]
Test with filter that returns no rows
0 0 0
0 0 0 nan 1970-01-01
0 0 0 nan 1970-01-01
Test with dynamic weights
21 7 4.2
21 7 4.2 35.5 2024-03-12
Test with all weights set to 0
0 0 0
0 0 0 nan 1970-01-01

View File

@ -5,16 +5,28 @@ CREATE TABLE decimal
a Decimal32(4),
b Decimal64(8),
c Decimal128(8),
f Float64,
d Date,
w UInt64
) ENGINE = Memory;
INSERT INTO decimal (a, b, c, w)
SELECT toDecimal32(number - 50, 4), toDecimal64(number - 50, 8) / 3, toDecimal128(number - 50, 8) / 5, number
INSERT INTO decimal (a, b, c, f, d, w)
SELECT toDecimal32(number - 50, 4), toDecimal64(number - 50, 8) / 3, toDecimal128(number - 50, 8) / 5, number/2, addDays(toDate('2024-01-01'), number), number
FROM system.numbers LIMIT 101;
SELECT 'quantileExactWeightedInterpolated';
SELECT medianExactWeightedInterpolated(a, 1), medianExactWeightedInterpolated(b, 2), medianExactWeightedInterpolated(c, 3) as x, toTypeName(x) FROM decimal;
SELECT quantileExactWeightedInterpolated(a, 1), quantileExactWeightedInterpolated(b, 2), quantileExactWeightedInterpolated(c, 3) as x, toTypeName(x) FROM decimal WHERE a < 0;
SELECT medianExactWeightedInterpolated(a, 1),
medianExactWeightedInterpolated(b, 2),
medianExactWeightedInterpolated(c, 3) as x,
medianExactWeightedInterpolated(f, 4),
medianExactWeightedInterpolated(d, 5),
toTypeName(x) FROM decimal;
SELECT quantileExactWeightedInterpolated(a, 1),
quantileExactWeightedInterpolated(b, 2),
quantileExactWeightedInterpolated(c, 3) as x,
quantileExactWeightedInterpolated(f, 4),
quantileExactWeightedInterpolated(d, 5),
toTypeName(x) FROM decimal WHERE a < 0;
SELECT quantileExactWeightedInterpolated(0.0)(a, 1), quantileExactWeightedInterpolated(0.0)(b, 2), quantileExactWeightedInterpolated(0.0)(c, 3) FROM decimal WHERE a >= 0;
SELECT quantileExactWeightedInterpolated(0.2)(a, 1), quantileExactWeightedInterpolated(0.2)(b, 2), quantileExactWeightedInterpolated(0.2)(c, 3) FROM decimal WHERE a >= 0;
SELECT quantileExactWeightedInterpolated(0.4)(a, 1), quantileExactWeightedInterpolated(0.4)(b, 2), quantileExactWeightedInterpolated(0.4)(c, 3) FROM decimal WHERE a >= 0;
@ -24,6 +36,8 @@ SELECT quantileExactWeightedInterpolated(1.0)(a, 1), quantileExactWeightedInterp
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(a, 1) FROM decimal;
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(b, 2) FROM decimal;
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(c, 3) FROM decimal;
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(f, 4) FROM decimal;
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(d, 5) FROM decimal;
SELECT 'quantileExactWeightedInterpolatedState';
SELECT quantilesExactWeightedInterpolatedMerge(0.2, 0.4, 0.6, 0.8)(x)
@ -34,12 +48,29 @@ FROM
);
SELECT 'Test with filter that returns no rows';
SELECT medianExactWeightedInterpolated(a, 1), medianExactWeightedInterpolated(b, 2), medianExactWeightedInterpolated(c, 3) FROM decimal WHERE a > 1000;
SELECT medianExactWeightedInterpolated(a, 1),
medianExactWeightedInterpolated(b, 2),
medianExactWeightedInterpolated(c, 3),
medianExactWeightedInterpolated(f, 4),
medianExactWeightedInterpolated(d, 5) FROM decimal WHERE a > 1000;
SELECT quantileExactWeightedInterpolated(a, 1),
quantileExactWeightedInterpolated(b, 2),
quantileExactWeightedInterpolated(c, 3),
quantileExactWeightedInterpolated(f, 4),
quantileExactWeightedInterpolated(d, 5) FROM decimal WHERE d < toDate('2024-01-01');
SELECT 'Test with dynamic weights';
SELECT medianExactWeightedInterpolated(a, w), medianExactWeightedInterpolated(b, w), medianExactWeightedInterpolated(c, w) FROM decimal;
SELECT medianExactWeightedInterpolated(a, w),
medianExactWeightedInterpolated(b, w),
medianExactWeightedInterpolated(c, w),
medianExactWeightedInterpolated(f, w),
medianExactWeightedInterpolated(d, w) FROM decimal;
SELECT 'Test with all weights set to 0';
SELECT medianExactWeightedInterpolated(a, 0), medianExactWeightedInterpolated(b, 0), medianExactWeightedInterpolated(c, 0) FROM decimal;
SELECT medianExactWeightedInterpolated(a, 0),
medianExactWeightedInterpolated(b, 0),
medianExactWeightedInterpolated(c, 0),
medianExactWeightedInterpolated(f, 0),
medianExactWeightedInterpolated(d, 0) FROM decimal;
DROP TABLE IF EXISTS decimal;

View File

@ -0,0 +1,12 @@
number Nullable(Int64)
date_field Nullable(Date32)
\N
1970-01-02
\N
1970-01-04
\N
1970-01-06
\N
1970-01-08
\N
1970-01-10

View File

@ -0,0 +1,14 @@
-- Tags: no-fasttest, no-parallel
SET session_timezone = 'UTC';
SET engine_file_truncate_on_insert = 1;
insert into function file('03259.orc', 'ORC')
select
number,
if (number % 2 = 0, null, toDate32(number)) as date_field
from numbers(10);
desc file('03259.orc', 'ORC');
select date_field from file('03259.orc', 'ORC') order by number;

View File

@ -0,0 +1,7 @@
SET allow_suspicious_low_cardinality_types = 1, allow_experimental_dynamic_type = 1;
DROP TABLE IF EXISTS t0;
CREATE TABLE t0 (c0 LowCardinality(Nullable(Int))) ENGINE = Memory();
INSERT INTO TABLE t0 (c0) VALUES (NULL);
SELECT c0::Dynamic FROM t0;
SELECT c0 FROM t0;
DROP TABLE t0;

View File

@ -0,0 +1,10 @@
-- https://github.com/ClickHouse/ClickHouse/issues/71382
DROP TABLE IF EXISTS rewrite;
CREATE TABLE rewrite (c0 Int) ENGINE = Memory();
SELECT 1
FROM rewrite
INNER JOIN rewrite AS y ON (
SELECT 1
)
INNER JOIN rewrite AS z ON 1
SETTINGS optimize_rewrite_array_exists_to_has=1;

View File

@ -0,0 +1,2 @@
1
1

View File

@ -0,0 +1,96 @@
#!/usr/bin/env bash
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "
CREATE TABLE t
(
a UInt32,
b UInt32
)
ENGINE = MergeTree
ORDER BY (a, b);
INSERT INTO t SELECT number, number FROM numbers(1000);
"
query_id="03270_processors_profile_log_3_$RANDOM"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "
SET log_processors_profiles = 1;
WITH
t0 AS
(
SELECT *
FROM numbers(1000)
),
t1 AS
(
SELECT number * 3 AS b
FROM t0
)
SELECT b * 3
FROM t
WHERE a IN (t1)
FORMAT Null;
"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "
SYSTEM FLUSH LOGS;
SELECT sum(elapsed_us) > 0
FROM system.processors_profile_log
WHERE event_date >= yesterday() AND query_id = '$query_id' AND name = 'CreatingSetsTransform';
"
#####################################################################
$CLICKHOUSE_CLIENT -q "
CREATE TABLE t1
(
st FixedString(54)
)
ENGINE = MergeTree
ORDER BY tuple();
INSERT INTO t1 VALUES
('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRTUVWXYZ'),
('\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'),
('IIIIIIIIII\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0');
"
query_id="03270_processors_profile_log_3_$RANDOM"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "
SET log_processors_profiles = 1;
SET max_threads=2; -- no merging when max_threads=1
WITH
(
SELECT groupConcat(',')(st)
FROM t1
ORDER BY ALL
) AS a,
(
SELECT groupConcat(',')(CAST(st, 'String'))
FROM t1
ORDER BY ALL
) AS b
SELECT a = b
FORMAT Null;
"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "
SYSTEM FLUSH LOGS;
SELECT sum(elapsed_us) > 0
FROM system.processors_profile_log
WHERE event_date >= yesterday() AND query_id = '$query_id' AND name = 'MergingSortedTransform';
"