From 1d15d72211f19273793dea91d1c750333f7bf366 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 27 Jun 2022 18:42:26 +0000 Subject: [PATCH 01/34] Support NULLs in ROLLUP --- src/Core/Settings.h | 2 + src/DataTypes/IDataType.h | 6 ++ src/Interpreters/ExpressionAnalyzer.cpp | 40 +++++++++++++- src/Interpreters/ExpressionAnalyzer.h | 3 + src/Interpreters/InterpreterSelectQuery.cpp | 1 + src/Processors/QueryPlan/AggregatingStep.cpp | 55 +++++++++++++++---- src/Processors/QueryPlan/AggregatingStep.h | 3 + src/Processors/QueryPlan/RollupStep.cpp | 2 +- src/Processors/Transforms/RollupTransform.cpp | 31 ++++++++--- src/Processors/Transforms/RollupTransform.h | 7 ++- .../02343_group_by_use_nulls.reference | 21 +++++++ .../0_stateless/02343_group_by_use_nulls.sql | 4 ++ 12 files changed, 152 insertions(+), 23 deletions(-) create mode 100644 tests/queries/0_stateless/02343_group_by_use_nulls.reference create mode 100644 tests/queries/0_stateless/02343_group_by_use_nulls.sql diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 756e6eb651e..5972ca4a9a3 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -129,6 +129,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(UInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is consumed. 0 means - same as 'max_threads'.", 0) \ M(Bool, enable_positional_arguments, false, "Enable positional arguments in ORDER BY, GROUP BY and LIMIT BY", 0) \ \ + M(Bool, group_by_use_nulls, false, "Treat columns mentioned in ROLLUP, CUBE or GROUPING SETS as Nullable", 0) \ + \ M(UInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. 
The lag of the replicas is not controlled.", 0) \ M(UInt64, parallel_replicas_count, 0, "", 0) \ M(UInt64, parallel_replica_offset, 0, "", 0) \ diff --git a/src/DataTypes/IDataType.h b/src/DataTypes/IDataType.h index 420ef61a13f..08c8fd74f3e 100644 --- a/src/DataTypes/IDataType.h +++ b/src/DataTypes/IDataType.h @@ -532,6 +532,12 @@ inline bool isBool(const DataTypePtr & data_type) return data_type->getName() == "Bool"; } +inline bool isAggregateFunction(const DataTypePtr & data_type) +{ + WhichDataType which(data_type); + return which.isAggregateFunction(); +} + template constexpr bool IsDataTypeDecimal = false; template constexpr bool IsDataTypeNumber = false; template constexpr bool IsDataTypeDateOrDateTime = false; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index cfe1167c36c..07b2a1ce1f9 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -41,8 +41,12 @@ #include +#include "Common/logger_useful.h" #include #include +#include "Columns/ColumnNullable.h" +#include "Core/ColumnsWithTypeAndName.h" +#include "DataTypes/IDataType.h" #include #include #include @@ -64,6 +68,7 @@ #include #include #include +#include namespace DB { @@ -393,7 +398,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions) } } - NameAndTypePair key{column_name, node->result_type}; + NameAndTypePair key{column_name, makeNullable(node->result_type)}; grouping_set_list.push_back(key); @@ -447,7 +452,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions) } } - NameAndTypePair key{column_name, node->result_type}; + NameAndTypePair key = select_query->group_by_with_rollup || select_query->group_by_with_cube ? NameAndTypePair{ column_name, makeNullable(node->result_type) } : NameAndTypePair{column_name, node->result_type}; /// Aggregation keys are uniqued. 
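/// With ROLLUP and CUBE the key type is wrapped in Nullable so that the subtotal rows
/// produced by the modifier can carry NULL in the key columns instead of the key type's
/// default value. A sketch grounded in the test below: GROUP BY ROLLUP(number, number % 2)
/// over numbers(10) yields the grand-total row (NULL, NULL, 45) rather than (0, 0, 45).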
if (!unique_keys.contains(key.name)) @@ -1418,6 +1423,28 @@ void SelectQueryExpressionAnalyzer::appendExpressionsAfterWindowFunctions(Expres } } +void SelectQueryExpressionAnalyzer::appendGroupByModifiers(ActionsDAGPtr & before_aggregation, ExpressionActionsChain & chain, bool /* only_types */) +{ + const auto * select_query = getAggregatingQuery(); + + if (!select_query->groupBy() || !(select_query->group_by_with_rollup || select_query->group_by_with_cube)) + return; + + auto source_columns = before_aggregation->getResultColumns(); + ColumnsWithTypeAndName result_columns; + + for (const auto & source_column : source_columns) + { + if (isAggregateFunction(source_column.type)) + result_columns.push_back(source_column); + else + result_columns.emplace_back(makeNullable(source_column.type), source_column.name); + } + ExpressionActionsChain::Step & step = chain.lastStep(before_aggregation->getNamesAndTypesList()); + + step.actions() = ActionsDAG::makeConvertingActions(source_columns, result_columns, ActionsDAG::MatchColumnsMode::Position); +} + bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_types) { const auto * select_query = getAggregatingQuery(); @@ -1597,6 +1624,8 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActio ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); + LOG_DEBUG(&Poco::Logger::get("SelectQueryExpressionAnalyzer"), "Before output: {}", step.actions()->getNamesAndTypesList().toString()); + NamesWithAliases result_columns; ASTs asts = select_query->select()->children; @@ -1638,7 +1667,11 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActio } auto actions = chain.getLastActions(); + LOG_DEBUG(&Poco::Logger::get("SelectQueryExpressionAnalyzer"), "Before projection: {}", actions->getNamesAndTypesList().toString()); + actions->project(result_columns); + LOG_DEBUG(&Poco::Logger::get("SelectQueryExpressionAnalyzer"), "After projection: {}", actions->getNamesAndTypesList().toString()); + return actions; } @@ -1862,6 +1895,9 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage); before_aggregation = chain.getLastActions(); + before_aggregation_with_nullable = chain.getLastActions(); + query_analyzer.appendGroupByModifiers(before_aggregation, chain, only_types); + finalize_chain(chain); if (query_analyzer.appendHaving(chain, only_types || !second_stage)) diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index 6c27d8c6760..7a183e865c0 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -245,6 +245,7 @@ struct ExpressionAnalysisResult JoinPtr join; ActionsDAGPtr before_where; ActionsDAGPtr before_aggregation; + ActionsDAGPtr before_aggregation_with_nullable; ActionsDAGPtr before_having; String having_column_name; bool remove_having_filter = false; @@ -410,6 +411,8 @@ private: void appendExpressionsAfterWindowFunctions(ExpressionActionsChain & chain, bool only_types); + void appendGroupByModifiers(ActionsDAGPtr & before_aggregation, ExpressionActionsChain & chain, bool only_types); + /// After aggregation: bool appendHaving(ExpressionActionsChain & chain, bool only_types); /// appendSelect diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index ec7c3878b06..feae7ac6a21 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ 
b/src/Interpreters/InterpreterSelectQuery.cpp @@ -582,6 +582,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( /// Calculate structure of the result. result_header = getSampleBlockImpl(); + LOG_DEBUG(&Poco::Logger::get("InterpreterSelectQuery"), "Result header: {}", result_header.dumpStructure()); }; analyze(shouldMoveToPrewhere()); diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 17a0498fb7e..a0f5fce908b 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -11,10 +11,13 @@ #include #include #include +#include #include #include #include #include +#include "Core/ColumnNumbers.h" +#include "DataTypes/IDataType.h" namespace DB { @@ -46,22 +49,44 @@ Block appendGroupingSetColumn(Block header) return res; } +Block generateOutputHeader(const Block & input_header) +{ + auto header = appendGroupingSetColumn(input_header); + for (size_t i = 1; i < header.columns(); ++i) + { + auto & column = header.getByPosition(i); + + if (!isAggregateFunction(column.type)) + { + column.type = makeNullable(column.type); + column.column = makeNullable(column.column); + } + } + return header; +} + +Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys) +{ + auto header = appendGroupingSetColumn(input_header); + for (auto key : keys) + { + auto & column = header.getByPosition(key + 1); + + if (!isAggregateFunction(column.type)) + { + column.type = makeNullable(column.type); + column.column = makeNullable(column.column); + } + } + return header; +} + static Block appendGroupingColumn(Block block, const GroupingSetsParamsList & params) { if (params.empty()) return block; - Block res; - - size_t rows = block.rows(); - auto column = ColumnUInt64::create(rows); - - res.insert({ColumnPtr(std::move(column)), std::make_shared(), "__grouping_set"}); - - for (auto & col : block) - res.insert(std::move(col)); - - return res; + return generateOutputHeader(block); } AggregatingStep::AggregatingStep( @@ -249,7 +274,13 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B index.push_back(node); } else - index.push_back(dag->getIndex()[header.getPositionByName(col.name)]); + { + const auto * column_node = dag->getIndex()[header.getPositionByName(col.name)]; + // index.push_back(dag->getIndex()[header.getPositionByName(col.name)]); + + const auto * node = &dag->addFunction(FunctionFactory::instance().get("toNullable", nullptr), { column_node }, col.name); + index.push_back(node); + } } dag->getIndex().swap(index); diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index 4dd3d956350..3d024a99063 100644 --- a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -3,6 +3,7 @@ #include #include #include +#include "Core/ColumnNumbers.h" namespace DB { @@ -26,6 +27,8 @@ struct GroupingSetsParams using GroupingSetsParamsList = std::vector; Block appendGroupingSetColumn(Block header); +Block generateOutputHeader(const Block & input_header); +Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys); /// Aggregation. See AggregatingTransform. 
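/// The helpers above build the modified output header: appendGroupingSetColumn prepends the
/// __grouping_set column, and generateOutputHeader additionally wraps every non-aggregate key
/// column in Nullable (the ColumnNumbers overload touches only the listed key positions,
/// shifted by one for __grouping_set). A hypothetical header (number UInt64, sum(number))
/// therefore becomes (__grouping_set UInt64, number Nullable(UInt64), sum(number)).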
class AggregatingStep : public ITransformingStep diff --git a/src/Processors/QueryPlan/RollupStep.cpp b/src/Processors/QueryPlan/RollupStep.cpp index 3b061f9c246..5109a5ce169 100644 --- a/src/Processors/QueryPlan/RollupStep.cpp +++ b/src/Processors/QueryPlan/RollupStep.cpp @@ -23,7 +23,7 @@ static ITransformingStep::Traits getTraits() } RollupStep::RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_) - : ITransformingStep(input_stream_, appendGroupingSetColumn(params_->getHeader()), getTraits()) + : ITransformingStep(input_stream_, generateOutputHeader(params_->getHeader(), params_->params.keys), getTraits()) , params(std::move(params_)) , keys_size(params->params.keys_size) { diff --git a/src/Processors/Transforms/RollupTransform.cpp b/src/Processors/Transforms/RollupTransform.cpp index b69a691323c..6ac5ae35fa2 100644 --- a/src/Processors/Transforms/RollupTransform.cpp +++ b/src/Processors/Transforms/RollupTransform.cpp @@ -1,16 +1,24 @@ #include #include #include +#include +#include "Common/logger_useful.h" +#include "Columns/ColumnNullable.h" namespace DB { RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_) - : IAccumulatingTransform(std::move(header), appendGroupingSetColumn(params_->getHeader())) + : IAccumulatingTransform(std::move(header), generateOutputHeader(params_->getHeader(), params_->params.keys)) , params(std::move(params_)) , keys(params->params.keys) , aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates)) { + auto output_aggregator_params = params->params; + intermediate_header = getOutputPort().getHeader(); + intermediate_header.erase(0); + output_aggregator_params.src_header = intermediate_header; + output_aggregator = std::make_unique(output_aggregator_params); } void RollupTransform::consume(Chunk chunk) @@ -18,13 +26,14 @@ void RollupTransform::consume(Chunk chunk) consumed_chunks.emplace_back(std::move(chunk)); } -Chunk RollupTransform::merge(Chunks && chunks, bool final) +Chunk RollupTransform::merge(Chunks && chunks, bool is_input, bool final) { BlocksList rollup_blocks; + auto header = is_input ? getInputPort().getHeader() : intermediate_header; for (auto & chunk : chunks) - rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); + rollup_blocks.emplace_back(header.cloneWithColumns(chunk.detachColumns())); - auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final); + auto rollup_block = is_input ? 
params->aggregator.mergeBlocks(rollup_blocks, final) : output_aggregator->mergeBlocks(rollup_blocks, final);
 auto num_rows = rollup_block.rows();
 return Chunk(rollup_block.getColumns(), num_rows);
}
@@ -42,9 +51,16 @@ Chunk RollupTransform::generate()
 if (!consumed_chunks.empty())
 {
 if (consumed_chunks.size() > 1)
- rollup_chunk = merge(std::move(consumed_chunks), false);
+ rollup_chunk = merge(std::move(consumed_chunks), true, false);
 else
 rollup_chunk = std::move(consumed_chunks.front());
+
+ size_t rows = rollup_chunk.getNumRows();
+ auto columns = rollup_chunk.getColumns();
+ for (auto key : keys)
+ columns[key] = makeNullable(columns[key]);
+ rollup_chunk = Chunk{ columns, rows };
+ LOG_DEBUG(&Poco::Logger::get("RollupTransform"), "Chunk source: {}", rollup_chunk.dumpStructure());
 consumed_chunks.clear();
 last_removed_key = keys.size();
@@ -59,11 +75,12 @@ Chunk RollupTransform::generate()
 auto num_rows = gen_chunk.getNumRows();
 auto columns = gen_chunk.getColumns();
- columns[key] = getColumnWithDefaults(getInputPort().getHeader(), key, num_rows);
+ columns[key] = getColumnWithDefaults(intermediate_header, key, num_rows);
 Chunks chunks;
 chunks.emplace_back(std::move(columns), num_rows);
- rollup_chunk = merge(std::move(chunks), false);
+ rollup_chunk = merge(std::move(chunks), false, false);
+ LOG_DEBUG(&Poco::Logger::get("RollupTransform"), "Chunk generated: {}", rollup_chunk.dumpStructure());
 }
 finalizeChunk(gen_chunk, aggregates_mask);
diff --git a/src/Processors/Transforms/RollupTransform.h b/src/Processors/Transforms/RollupTransform.h
index 8fd27e3e6a2..8b66c85e0b5 100644
--- a/src/Processors/Transforms/RollupTransform.h
+++ b/src/Processors/Transforms/RollupTransform.h
@@ -1,4 +1,5 @@
 #pragma once
+#include
 #include
 #include
 #include
@@ -23,12 +24,16 @@ private:
 const ColumnNumbers keys;
 const ColumnsMask aggregates_mask;
+ std::unique_ptr output_aggregator;
+
+ Block intermediate_header;
+
 Chunks consumed_chunks;
 Chunk rollup_chunk;
 size_t last_removed_key = 0;
 size_t set_counter = 0;
- Chunk merge(Chunks && chunks, bool final);
+ Chunk merge(Chunks && chunks, bool is_input, bool final);
};
}
diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.reference b/tests/queries/0_stateless/02343_group_by_use_nulls.reference
new file mode 100644
index 00000000000..0d7fa8f3a3b
--- /dev/null
+++ b/tests/queries/0_stateless/02343_group_by_use_nulls.reference
@@ -0,0 +1,21 @@
+0 0 0
+0 \N 0
+1 1 1
+1 \N 1
+2 0 2
+2 \N 2
+3 1 3
+3 \N 3
+4 0 4
+4 \N 4
+5 1 5
+5 \N 5
+6 0 6
+6 \N 6
+7 1 7
+7 \N 7
+8 0 8
+8 \N 8
+9 1 9
+9 \N 9
+\N \N 45
diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.sql b/tests/queries/0_stateless/02343_group_by_use_nulls.sql
new file mode 100644
index 00000000000..1107ae79244
--- /dev/null
+++ b/tests/queries/0_stateless/02343_group_by_use_nulls.sql
@@ -0,0 +1,4 @@
+SELECT number, number % 2, sum(number) AS val
+FROM numbers(10)
+GROUP BY ROLLUP(number, number % 2)
+ORDER BY (number, number % 2, val);

From 98e9bc84d5a95259112ecff815228c62ce08c9da Mon Sep 17 00:00:00 2001
From: Dmitry Novik
Date: Thu, 30 Jun 2022 10:13:58 +0000
Subject: [PATCH 02/34] Refactor ROLLUP and CUBE

---
 src/Interpreters/InterpreterSelectQuery.cpp | 4 +-
 src/Processors/QueryPlan/AggregatingStep.cpp | 21 +++--
 src/Processors/QueryPlan/AggregatingStep.h | 2 +-
 src/Processors/QueryPlan/CubeStep.cpp | 2 +-
 src/Processors/QueryPlan/RollupStep.cpp | 2 +-
 .../Transforms/AggregatingTransform.h | 4 +-
 src/Processors/Transforms/CubeTransform.cpp | 41 ++------
src/Processors/Transforms/CubeTransform.h | 10 +-- src/Processors/Transforms/RollupTransform.cpp | 88 ++++++++++++------- src/Processors/Transforms/RollupTransform.h | 40 ++++++--- .../02343_group_by_use_nulls.reference | 88 +++++++++++++++++++ .../0_stateless/02343_group_by_use_nulls.sql | 22 ++++- 12 files changed, 221 insertions(+), 103 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index feae7ac6a21..a29684288cf 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1618,7 +1618,7 @@ static void executeMergeAggregatedImpl( Aggregator::Params params(header_before_merge, keys, aggregates, overflow_row, settings.max_threads); - auto transform_params = std::make_shared(params, final); + auto transform_params = std::make_shared(params, final, false); auto merging_aggregated = std::make_unique( query_plan.getCurrentDataStream(), @@ -2363,7 +2363,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific keys.push_back(header_before_transform.getPositionByName(key.name)); auto params = getAggregatorParams(query_ptr, *query_analyzer, *context, header_before_transform, keys, query_analyzer->aggregates(), false, settings, 0, 0); - auto transform_params = std::make_shared(std::move(params), true); + auto transform_params = std::make_shared(std::move(params), true, settings.group_by_use_nulls); QueryPlanStepPtr step; if (modificator == Modificator::ROLLUP) diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index a0f5fce908b..a038b5cf302 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -65,17 +65,20 @@ Block generateOutputHeader(const Block & input_header) return header; } -Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys) +Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys, bool use_nulls) { auto header = appendGroupingSetColumn(input_header); - for (auto key : keys) + if (use_nulls) { - auto & column = header.getByPosition(key + 1); - - if (!isAggregateFunction(column.type)) + for (auto key : keys) { - column.type = makeNullable(column.type); - column.column = makeNullable(column.column); + auto & column = header.getByPosition(key + 1); + + if (!isAggregateFunction(column.type)) + { + column.type = makeNullable(column.type); + column.column = makeNullable(column.column); + } } } return header; @@ -144,7 +147,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B * 1. Parallel aggregation is done, and the results should be merged in parallel. * 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way. 
*/ - auto transform_params = std::make_shared(std::move(params), final); + auto transform_params = std::make_shared(std::move(params), final, false); if (!grouping_sets_params.empty()) { @@ -194,7 +197,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B transform_params->params.intermediate_header, transform_params->params.stats_collecting_params }; - auto transform_params_for_set = std::make_shared(std::move(params_for_set), final); + auto transform_params_for_set = std::make_shared(std::move(params_for_set), final, false); if (streams > 1) { diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index 3d024a99063..c0be99403e9 100644 --- a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -28,7 +28,7 @@ using GroupingSetsParamsList = std::vector; Block appendGroupingSetColumn(Block header); Block generateOutputHeader(const Block & input_header); -Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys); +Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys, bool use_nulls); /// Aggregation. See AggregatingTransform. class AggregatingStep : public ITransformingStep diff --git a/src/Processors/QueryPlan/CubeStep.cpp b/src/Processors/QueryPlan/CubeStep.cpp index 91c85a08412..9ea0d1c9995 100644 --- a/src/Processors/QueryPlan/CubeStep.cpp +++ b/src/Processors/QueryPlan/CubeStep.cpp @@ -25,7 +25,7 @@ static ITransformingStep::Traits getTraits() } CubeStep::CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_) - : ITransformingStep(input_stream_, appendGroupingSetColumn(params_->getHeader()), getTraits()) + : ITransformingStep(input_stream_, generateOutputHeader(params_->getHeader(), params_->params.keys, params_->use_nulls), getTraits()) , keys_size(params_->params.keys_size) , params(std::move(params_)) { diff --git a/src/Processors/QueryPlan/RollupStep.cpp b/src/Processors/QueryPlan/RollupStep.cpp index 5109a5ce169..130525bfacb 100644 --- a/src/Processors/QueryPlan/RollupStep.cpp +++ b/src/Processors/QueryPlan/RollupStep.cpp @@ -23,7 +23,7 @@ static ITransformingStep::Traits getTraits() } RollupStep::RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_) - : ITransformingStep(input_stream_, generateOutputHeader(params_->getHeader(), params_->params.keys), getTraits()) + : ITransformingStep(input_stream_, generateOutputHeader(params_->getHeader(), params_->params.keys, params_->use_nulls), getTraits()) , params(std::move(params_)) , keys_size(params->params.keys_size) { diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h index bfc3904e5d8..5438bc32ed9 100644 --- a/src/Processors/Transforms/AggregatingTransform.h +++ b/src/Processors/Transforms/AggregatingTransform.h @@ -35,12 +35,14 @@ struct AggregatingTransformParams Aggregator & aggregator; bool final; bool only_merge = false; + bool use_nulls = false; - AggregatingTransformParams(const Aggregator::Params & params_, bool final_) + AggregatingTransformParams(const Aggregator::Params & params_, bool final_, bool use_nulls_) : params(params_) , aggregator_list_ptr(std::make_shared()) , aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params)) , final(final_) + , use_nulls(use_nulls_) { } diff --git a/src/Processors/Transforms/CubeTransform.cpp b/src/Processors/Transforms/CubeTransform.cpp index 83ed346dabe..40b096e88f4 100644 --- 
a/src/Processors/Transforms/CubeTransform.cpp +++ b/src/Processors/Transforms/CubeTransform.cpp @@ -1,6 +1,7 @@ #include #include #include +#include "Processors/Transforms/RollupTransform.h" namespace DB { @@ -10,57 +11,31 @@ namespace ErrorCodes } CubeTransform::CubeTransform(Block header, AggregatingTransformParamsPtr params_) - : IAccumulatingTransform(std::move(header), appendGroupingSetColumn(params_->getHeader())) - , params(std::move(params_)) - , keys(params->params.keys) + : GroupByModifierTransform(std::move(header), params_) , aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates)) { if (keys.size() >= 8 * sizeof(mask)) throw Exception("Too many keys are used for CubeTransform.", ErrorCodes::LOGICAL_ERROR); } -Chunk CubeTransform::merge(Chunks && chunks, bool final) -{ - BlocksList rollup_blocks; - for (auto & chunk : chunks) - rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); - - auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final); - auto num_rows = rollup_block.rows(); - return Chunk(rollup_block.getColumns(), num_rows); -} - -void CubeTransform::consume(Chunk chunk) -{ - consumed_chunks.emplace_back(std::move(chunk)); -} - -MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n); - Chunk CubeTransform::generate() { if (!consumed_chunks.empty()) { - if (consumed_chunks.size() > 1) - cube_chunk = merge(std::move(consumed_chunks), false); - else - cube_chunk = std::move(consumed_chunks.front()); + mergeConsumed(); - consumed_chunks.clear(); - - auto num_rows = cube_chunk.getNumRows(); + auto num_rows = current_chunk.getNumRows(); mask = (static_cast(1) << keys.size()) - 1; - current_columns = cube_chunk.getColumns(); + current_columns = current_chunk.getColumns(); current_zero_columns.clear(); current_zero_columns.reserve(keys.size()); - auto const & input_header = getInputPort().getHeader(); for (auto key : keys) - current_zero_columns.emplace_back(getColumnWithDefaults(input_header, key, num_rows)); + current_zero_columns.emplace_back(getColumnWithDefaults(key, num_rows)); } - auto gen_chunk = std::move(cube_chunk); + auto gen_chunk = std::move(current_chunk); if (mask) { @@ -75,7 +50,7 @@ Chunk CubeTransform::generate() Chunks chunks; chunks.emplace_back(std::move(columns), current_columns.front()->size()); - cube_chunk = merge(std::move(chunks), false); + current_chunk = merge(std::move(chunks), !params->use_nulls, false); } finalizeChunk(gen_chunk, aggregates_mask); diff --git a/src/Processors/Transforms/CubeTransform.h b/src/Processors/Transforms/CubeTransform.h index 4575a01935d..ac51a28bd0e 100644 --- a/src/Processors/Transforms/CubeTransform.h +++ b/src/Processors/Transforms/CubeTransform.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include #include @@ -9,30 +10,23 @@ namespace DB /// Takes blocks after grouping, with non-finalized aggregate functions. /// Calculates all subsets of columns and aggregates over them. 
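/// The subsets are enumerated with a UInt64 bitmask: for keys (a, b) the masks 11, 10, 01 and
/// 00 select which key columns keep their values and which are filled with defaults (NULL for
/// the Nullable keys produced under group_by_use_nulls), which is why the constructor rejects
/// key counts of 8 * sizeof(mask) or more.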
-class CubeTransform : public IAccumulatingTransform +class CubeTransform : public GroupByModifierTransform { public: CubeTransform(Block header, AggregatingTransformParamsPtr params); String getName() const override { return "CubeTransform"; } protected: - void consume(Chunk chunk) override; Chunk generate() override; private: - AggregatingTransformParamsPtr params; - const ColumnNumbers keys; const ColumnsMask aggregates_mask; - Chunks consumed_chunks; - Chunk cube_chunk; Columns current_columns; Columns current_zero_columns; UInt64 mask = 0; UInt64 grouping_set = 0; - - Chunk merge(Chunks && chunks, bool final); }; } diff --git a/src/Processors/Transforms/RollupTransform.cpp b/src/Processors/Transforms/RollupTransform.cpp index 6ac5ae35fa2..2fbde7abf0e 100644 --- a/src/Processors/Transforms/RollupTransform.cpp +++ b/src/Processors/Transforms/RollupTransform.cpp @@ -8,36 +8,71 @@ namespace DB { -RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_) - : IAccumulatingTransform(std::move(header), generateOutputHeader(params_->getHeader(), params_->params.keys)) +GroupByModifierTransform::GroupByModifierTransform(Block header, AggregatingTransformParamsPtr params_) + : IAccumulatingTransform(std::move(header), generateOutputHeader(params_->getHeader(), params_->params.keys, params_->use_nulls)) , params(std::move(params_)) , keys(params->params.keys) - , aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates)) { - auto output_aggregator_params = params->params; intermediate_header = getOutputPort().getHeader(); - intermediate_header.erase(0); - output_aggregator_params.src_header = intermediate_header; - output_aggregator = std::make_unique(output_aggregator_params); + if (params->use_nulls) + { + auto output_aggregator_params = params->params; + intermediate_header.erase(0); + output_aggregator_params.src_header = intermediate_header; + output_aggregator = std::make_unique(output_aggregator_params); + } } -void RollupTransform::consume(Chunk chunk) +void GroupByModifierTransform::consume(Chunk chunk) { consumed_chunks.emplace_back(std::move(chunk)); } -Chunk RollupTransform::merge(Chunks && chunks, bool is_input, bool final) +void GroupByModifierTransform::mergeConsumed() { - BlocksList rollup_blocks; - auto header = is_input ? getInputPort().getHeader() : intermediate_header; - for (auto & chunk : chunks) - rollup_blocks.emplace_back(header.cloneWithColumns(chunk.detachColumns())); + if (consumed_chunks.size() > 1) + current_chunk = merge(std::move(consumed_chunks), true, false); + else + current_chunk = std::move(consumed_chunks.front()); - auto rollup_block = is_input ? params->aggregator.mergeBlocks(rollup_blocks, final) : output_aggregator->mergeBlocks(rollup_blocks, final); - auto num_rows = rollup_block.rows(); - return Chunk(rollup_block.getColumns(), num_rows); + size_t rows = current_chunk.getNumRows(); + auto columns = current_chunk.getColumns(); + if (params->use_nulls) + { + for (auto key : keys) + columns[key] = makeNullable(columns[key]); + } + current_chunk = Chunk{ columns, rows }; + + consumed_chunks.clear(); } +Chunk GroupByModifierTransform::merge(Chunks && chunks, bool is_input, bool final) +{ + auto header = is_input ? getInputPort().getHeader() : intermediate_header; + + BlocksList blocks; + for (auto & chunk : chunks) + blocks.emplace_back(header.cloneWithColumns(chunk.detachColumns())); + + auto current_block = is_input ? 
params->aggregator.mergeBlocks(blocks, final) : output_aggregator->mergeBlocks(blocks, final); + auto num_rows = current_block.rows(); + return Chunk(current_block.getColumns(), num_rows); +} + +MutableColumnPtr GroupByModifierTransform::getColumnWithDefaults(size_t key, size_t n) const +{ + auto const & col = intermediate_header.getByPosition(key); + auto result_column = col.column->cloneEmpty(); + col.type->insertManyDefaultsInto(*result_column, n); + return result_column; +} + +RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_) + : GroupByModifierTransform(std::move(header), params_) + , aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates)) +{} + MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n) { auto const & col = header.getByPosition(key); @@ -50,23 +85,11 @@ Chunk RollupTransform::generate() { if (!consumed_chunks.empty()) { - if (consumed_chunks.size() > 1) - rollup_chunk = merge(std::move(consumed_chunks), true, false); - else - rollup_chunk = std::move(consumed_chunks.front()); - - size_t rows = rollup_chunk.getNumRows(); - auto columns = rollup_chunk.getColumns(); - for (auto key : keys) - columns[key] = makeNullable(columns[key]); - rollup_chunk = Chunk{ columns, rows }; - LOG_DEBUG(&Poco::Logger::get("RollupTransform"), "Chunk source: {}", rollup_chunk.dumpStructure()); - - consumed_chunks.clear(); + mergeConsumed(); last_removed_key = keys.size(); } - auto gen_chunk = std::move(rollup_chunk); + auto gen_chunk = std::move(current_chunk); if (last_removed_key) { @@ -75,12 +98,11 @@ Chunk RollupTransform::generate() auto num_rows = gen_chunk.getNumRows(); auto columns = gen_chunk.getColumns(); - columns[key] = getColumnWithDefaults(intermediate_header, key, num_rows); + columns[key] = getColumnWithDefaults(key, num_rows); Chunks chunks; chunks.emplace_back(std::move(columns), num_rows); - rollup_chunk = merge(std::move(chunks), false, false); - LOG_DEBUG(&Poco::Logger::get("RollupTransform"), "Chunk generated: {}", rollup_chunk.dumpStructure()); + current_chunk = merge(std::move(chunks), !params->use_nulls, false); } finalizeChunk(gen_chunk, aggregates_mask); diff --git a/src/Processors/Transforms/RollupTransform.h b/src/Processors/Transforms/RollupTransform.h index 8b66c85e0b5..cbed7705628 100644 --- a/src/Processors/Transforms/RollupTransform.h +++ b/src/Processors/Transforms/RollupTransform.h @@ -7,33 +7,47 @@ namespace DB { -/// Takes blocks after grouping, with non-finalized aggregate functions. -/// Calculates subtotals and grand totals values for a set of columns. 
-class RollupTransform : public IAccumulatingTransform +struct GroupByModifierTransform : public IAccumulatingTransform { -public: - RollupTransform(Block header, AggregatingTransformParamsPtr params); - String getName() const override { return "RollupTransform"; } + GroupByModifierTransform(Block header, AggregatingTransformParamsPtr params_); protected: void consume(Chunk chunk) override; - Chunk generate() override; -private: + void mergeConsumed(); + + Chunk merge(Chunks && chunks, bool is_input, bool final); + + MutableColumnPtr getColumnWithDefaults(size_t key, size_t n) const; + AggregatingTransformParamsPtr params; - const ColumnNumbers keys; - const ColumnsMask aggregates_mask; + + const ColumnNumbers & keys; std::unique_ptr output_aggregator; Block intermediate_header; Chunks consumed_chunks; - Chunk rollup_chunk; + Chunk current_chunk; +}; + +/// Takes blocks after grouping, with non-finalized aggregate functions. +/// Calculates subtotals and grand totals values for a set of columns. +class RollupTransform : public GroupByModifierTransform +{ +public: + RollupTransform(Block header, AggregatingTransformParamsPtr params); + String getName() const override { return "RollupTransform"; } + +protected: + Chunk generate() override; + +private: + const ColumnsMask aggregates_mask; + size_t last_removed_key = 0; size_t set_counter = 0; - - Chunk merge(Chunks && chunks, bool is_input, bool final); }; } diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.reference b/tests/queries/0_stateless/02343_group_by_use_nulls.reference index 0d7fa8f3a3b..769324efec3 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.reference +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.reference @@ -1,3 +1,9 @@ +-- { echoOn } +SELECT number, number % 2, sum(number) AS val +FROM numbers(10) +GROUP BY ROLLUP(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; 0 0 0 0 \N 0 1 1 1 @@ -19,3 +25,85 @@ 9 1 9 9 \N 9 \N \N 45 +SELECT number, number % 2, sum(number) AS val +FROM numbers(10) +GROUP BY ROLLUP(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=0; +0 0 0 +0 0 0 +0 0 45 +1 0 1 +1 1 1 +2 0 2 +2 0 2 +3 0 3 +3 1 3 +4 0 4 +4 0 4 +5 0 5 +5 1 5 +6 0 6 +6 0 6 +7 0 7 +7 1 7 +8 0 8 +8 0 8 +9 0 9 +9 1 9 +SELECT number, number % 2, sum(number) AS val +FROM numbers(10) +GROUP BY CUBE(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; +0 0 0 +0 \N 0 +1 1 1 +1 \N 1 +2 0 2 +2 \N 2 +3 1 3 +3 \N 3 +4 0 4 +4 \N 4 +5 1 5 +5 \N 5 +6 0 6 +6 \N 6 +7 1 7 +7 \N 7 +8 0 8 +8 \N 8 +9 1 9 +9 \N 9 +\N 0 20 +\N 1 25 +\N \N 45 +SELECT number, number % 2, sum(number) AS val +FROM numbers(10) +GROUP BY CUBE(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=0; +0 0 0 +0 0 0 +0 0 20 +0 0 45 +0 1 25 +1 0 1 +1 1 1 +2 0 2 +2 0 2 +3 0 3 +3 1 3 +4 0 4 +4 0 4 +5 0 5 +5 1 5 +6 0 6 +6 0 6 +7 0 7 +7 1 7 +8 0 8 +8 0 8 +9 0 9 +9 1 9 diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.sql b/tests/queries/0_stateless/02343_group_by_use_nulls.sql index 1107ae79244..ac63e9feebc 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.sql +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.sql @@ -1,4 +1,24 @@ +-- { echoOn } SELECT number, number % 2, sum(number) AS val FROM numbers(10) GROUP BY ROLLUP(number, number % 2) -ORDER BY (number, number % 2, val); +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; + +SELECT number, number % 2, 
sum(number) AS val
+FROM numbers(10)
+GROUP BY ROLLUP(number, number % 2)
+ORDER BY (number, number % 2, val)
+SETTINGS group_by_use_nulls=0;
+
+SELECT number, number % 2, sum(number) AS val
+FROM numbers(10)
+GROUP BY CUBE(number, number % 2)
+ORDER BY (number, number % 2, val)
+SETTINGS group_by_use_nulls=1;
+
+SELECT number, number % 2, sum(number) AS val
+FROM numbers(10)
+GROUP BY CUBE(number, number % 2)
+ORDER BY (number, number % 2, val)
+SETTINGS group_by_use_nulls=0;

From 33f601ec0a82133b0c9cef734cff025169172e1f Mon Sep 17 00:00:00 2001
From: Dmitry Novik
Date: Thu, 30 Jun 2022 15:14:26 +0000
Subject: [PATCH 03/34] Support use_nulls for GROUPING SETS

---
 src/Interpreters/InterpreterSelectQuery.cpp | 1 +
 src/Processors/QueryPlan/AggregatingStep.cpp | 34 ++++++++-----
 src/Processors/QueryPlan/AggregatingStep.h | 3 +-
 .../02343_group_by_use_nulls.reference | 48 +++++++++++++++++++
 .../0_stateless/02343_group_by_use_nulls.sql | 26 ++++++++++
 5 files changed, 99 insertions(+), 13 deletions(-)

diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index a29684288cf..797fa700c9b 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -2295,6 +2295,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
 merge_threads,
 temporary_data_merge_threads,
 storage_has_evenly_distributed_read,
+ settings.group_by_use_nulls,
 std::move(group_by_info),
 std::move(group_by_sort_description));
 query_plan.addStep(std::move(aggregating_step));
diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp
index a038b5cf302..48fb3729940 100644
--- a/src/Processors/QueryPlan/AggregatingStep.cpp
+++ b/src/Processors/QueryPlan/AggregatingStep.cpp
@@ -16,6 +16,8 @@
 #include
 #include
 #include
+#include
+#include "Common/logger_useful.h"
 #include "Core/ColumnNumbers.h"
 #include "DataTypes/IDataType.h"

 namespace DB
 {
@@ -49,12 +51,11 @@ Block appendGroupingSetColumn(Block header)
 return res;
 }

-Block generateOutputHeader(const Block & input_header)
+static inline void convertToNullable(Block & header, const ColumnNumbers & keys)
 {
- auto header = appendGroupingSetColumn(input_header);
- for (size_t i = 1; i < header.columns(); ++i)
+ for (auto key : keys)
 {
- auto & column = header.getByPosition(i);
+ auto & column = header.getByPosition(key);

 if (!isAggregateFunction(column.type))
 {
@@ -62,7 +63,6 @@ Block generateOutputHeader(const Block & input_header)
 column.column = makeNullable(column.column);
 }
 }
- return header;
 }

 Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys, bool use_nulls)
@@ -84,12 +84,12 @@ Block generateOutputHeader(const Block & input_header, const ColumnNumbers & key
-static Block appendGroupingColumn(Block block, const GroupingSetsParamsList & params)
+static Block appendGroupingColumn(Block block, const ColumnNumbers & keys, const GroupingSetsParamsList & params, bool use_nulls)
 {
 if (params.empty())
 return block;

- return generateOutputHeader(block);
+ return generateOutputHeader(block, keys, use_nulls);
 }

 AggregatingStep::AggregatingStep(
@@ -102,9 +102,10 @@ AggregatingStep::AggregatingStep(
 size_t merge_threads_,
 size_t temporary_data_merge_threads_,
 bool storage_has_evenly_distributed_read_,
+ bool group_by_use_nulls_,
 InputOrderInfoPtr group_by_info_,
 SortDescription group_by_sort_description_)
- : ITransformingStep(input_stream_, appendGroupingColumn(params_.getHeader(final_), 
grouping_sets_params_), getTraits(), false) + : ITransformingStep(input_stream_, appendGroupingColumn(params_.getHeader(final_), params_.keys, grouping_sets_params_, group_by_use_nulls_), getTraits(), false) , params(std::move(params_)) , grouping_sets_params(std::move(grouping_sets_params_)) , final(std::move(final_)) @@ -113,6 +114,7 @@ AggregatingStep::AggregatingStep( , merge_threads(merge_threads_) , temporary_data_merge_threads(temporary_data_merge_threads_) , storage_has_evenly_distributed_read(storage_has_evenly_distributed_read_) + , group_by_use_nulls(group_by_use_nulls_) , group_by_info(std::move(group_by_info_)) , group_by_sort_description(std::move(group_by_sort_description_)) { @@ -243,6 +245,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B assert(ports.size() == grouping_sets_size); auto output_header = transform_params->getHeader(); + if (group_by_use_nulls) + convertToNullable(output_header, params.keys); for (size_t set_counter = 0; set_counter < grouping_sets_size; ++set_counter) { @@ -279,16 +283,22 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B else { const auto * column_node = dag->getIndex()[header.getPositionByName(col.name)]; - // index.push_back(dag->getIndex()[header.getPositionByName(col.name)]); - - const auto * node = &dag->addFunction(FunctionFactory::instance().get("toNullable", nullptr), { column_node }, col.name); - index.push_back(node); + if (isAggregateFunction(column_node->result_type) || !group_by_use_nulls) + { + index.push_back(column_node); + } + else + { + const auto * node = &dag->addFunction(FunctionFactory::instance().get("toNullable", nullptr), { column_node }, col.name); + index.push_back(node); + } } } dag->getIndex().swap(index); auto expression = std::make_shared(dag, settings.getActionsSettings()); auto transform = std::make_shared(header, expression); + LOG_DEBUG(&Poco::Logger::get("AggregatingStep"), "Header for GROUPING SET #{}: {}", set_counter, transform->getOutputPort().getHeader().dumpStructure()); connect(*ports[set_counter], transform->getInputPort()); processors.emplace_back(std::move(transform)); diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index c0be99403e9..c40994e018d 100644 --- a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -27,7 +27,6 @@ struct GroupingSetsParams using GroupingSetsParamsList = std::vector; Block appendGroupingSetColumn(Block header); -Block generateOutputHeader(const Block & input_header); Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys, bool use_nulls); /// Aggregation. See AggregatingTransform. 
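/// A usage sketch with hypothetical values: generateOutputHeader(header, {0, 1}, true)
/// prepends __grouping_set and turns the key columns at positions 0 and 1 into Nullable,
/// while use_nulls = false only adds __grouping_set and leaves the key types unchanged.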
@@ -44,6 +43,7 @@ public: size_t merge_threads_, size_t temporary_data_merge_threads_, bool storage_has_evenly_distributed_read_, + bool group_by_use_nulls_, InputOrderInfoPtr group_by_info_, SortDescription group_by_sort_description_); @@ -68,6 +68,7 @@ private: size_t temporary_data_merge_threads; bool storage_has_evenly_distributed_read; + bool group_by_use_nulls; InputOrderInfoPtr group_by_info; SortDescription group_by_sort_description; diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.reference b/tests/queries/0_stateless/02343_group_by_use_nulls.reference index 769324efec3..92d36c1a894 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.reference +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.reference @@ -107,3 +107,51 @@ SETTINGS group_by_use_nulls=0; 8 0 8 9 0 9 9 1 9 +SELECT + number, + number % 2, + sum(number) AS val +FROM numbers(10) +GROUP BY + GROUPING SETS ( + (number), + (number % 2) + ) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls = 1; +0 \N 0 +1 \N 1 +2 \N 2 +3 \N 3 +4 \N 4 +5 \N 5 +6 \N 6 +7 \N 7 +8 \N 8 +9 \N 9 +\N 0 20 +\N 1 25 +SELECT + number, + number % 2, + sum(number) AS val +FROM numbers(10) +GROUP BY + GROUPING SETS ( + (number), + (number % 2) + ) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls = 0; +0 0 0 +0 0 20 +0 1 25 +1 0 1 +2 0 2 +3 0 3 +4 0 4 +5 0 5 +6 0 6 +7 0 7 +8 0 8 +9 0 9 diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.sql b/tests/queries/0_stateless/02343_group_by_use_nulls.sql index ac63e9feebc..5256c6bda75 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.sql +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.sql @@ -22,3 +22,29 @@ FROM numbers(10) GROUP BY CUBE(number, number % 2) ORDER BY (number, number % 2, val) SETTINGS group_by_use_nulls=0; + +SELECT + number, + number % 2, + sum(number) AS val +FROM numbers(10) +GROUP BY + GROUPING SETS ( + (number), + (number % 2) + ) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls = 1; + +SELECT + number, + number % 2, + sum(number) AS val +FROM numbers(10) +GROUP BY + GROUPING SETS ( + (number), + (number % 2) + ) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls = 0; From e1794493812e8d6ed9da5f4c04ae0828b0ed12dc Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 4 Jul 2022 15:56:42 +0000 Subject: [PATCH 04/34] Fix some tests --- src/Interpreters/ExpressionAnalyzer.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index f982f066d3b..bab017f5a11 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -344,6 +344,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions) group_by_kind = GroupByKind::GROUPING_SETS; else group_by_kind = GroupByKind::ORDINARY; + bool use_nulls = group_by_kind != GroupByKind::ORDINARY && getContext()->getSettingsRef().group_by_use_nulls; /// For GROUPING SETS with multiple groups we always add virtual __grouping_set column /// With set number, which is used as an additional key at the stage of merging aggregating data. @@ -398,7 +399,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions) } } - NameAndTypePair key{column_name, makeNullable(node->result_type)}; + NameAndTypePair key{column_name, use_nulls ? 
makeNullable(node->result_type) : node->result_type };

 grouping_set_list.push_back(key);

@@ -452,7 +453,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
 }
 }

- NameAndTypePair key = select_query->group_by_with_rollup || select_query->group_by_with_cube ? NameAndTypePair{ column_name, makeNullable(node->result_type) } : NameAndTypePair{column_name, node->result_type};
+ NameAndTypePair key = NameAndTypePair{ column_name, use_nulls ? makeNullable(node->result_type) : node->result_type };

 /// Aggregation keys are uniqued.
 if (!unique_keys.contains(key.name))
@@ -1903,7 +1904,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
 before_aggregation = chain.getLastActions();

 before_aggregation_with_nullable = chain.getLastActions();
- query_analyzer.appendGroupByModifiers(before_aggregation, chain, only_types);
+ if (settings.group_by_use_nulls)
+ query_analyzer.appendGroupByModifiers(before_aggregation, chain, only_types);

 finalize_chain(chain);

From 864ab205824640430da00bda75d82193731a02c7 Mon Sep 17 00:00:00 2001
From: Dmitry Novik
Date: Mon, 4 Jul 2022 16:17:58 +0000
Subject: [PATCH 05/34] Use correct intermediate header for ROLLUP and CUBE

---
 src/Processors/Transforms/RollupTransform.cpp | 4 +++-
 ...02313_group_by_modifiers_with_non_default_types.reference} | 0
 ...ql => 02313_group_by_modifiers_with_non_default_types.sql} | 0
 3 files changed, 3 insertions(+), 1 deletion(-)
 rename tests/queries/0_stateless/{02313_group_by_modifiers_with_non-default_types.reference => 02313_group_by_modifiers_with_non_default_types.reference} (100%)
 rename tests/queries/0_stateless/{02313_group_by_modifiers_with_non-default_types.sql => 02313_group_by_modifiers_with_non_default_types.sql} (100%)

diff --git a/src/Processors/Transforms/RollupTransform.cpp b/src/Processors/Transforms/RollupTransform.cpp
index fc2159b0f44..0ab5900447f 100644
--- a/src/Processors/Transforms/RollupTransform.cpp
+++ b/src/Processors/Transforms/RollupTransform.cpp
@@ -18,12 +18,14 @@ GroupByModifierTransform::GroupByModifierTransform(Block header, AggregatingTran
 keys.emplace_back(input.getHeader().getPositionByName(key));

 intermediate_header = getOutputPort().getHeader();
+ intermediate_header.erase(0);
+
 if (use_nulls)
 {
 auto output_aggregator_params = params->params;
- intermediate_header.erase(0);
 output_aggregator = std::make_unique(intermediate_header, output_aggregator_params);
 }
+ LOG_DEBUG(&Poco::Logger::get("GroupByModifierTransform"), "Intermediate header: {}", intermediate_header.dumpStructure());
}
diff --git a/tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.reference b/tests/queries/0_stateless/02313_group_by_modifiers_with_non_default_types.reference
similarity index 100%
rename from tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.reference
rename to tests/queries/0_stateless/02313_group_by_modifiers_with_non_default_types.reference
diff --git a/tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.sql b/tests/queries/0_stateless/02313_group_by_modifiers_with_non_default_types.sql
similarity index 100%
rename from tests/queries/0_stateless/02313_group_by_modifiers_with_non-default_types.sql
rename to tests/queries/0_stateless/02313_group_by_modifiers_with_non_default_types.sql

From 614ce26ee797eb612da1c5b63926b4509796db23 Mon Sep 17 00:00:00 2001
From: Dmitry Novik
Date: Mon, 4 Jul 2022 18:09:48 +0000
Subject: [PATCH 06/34] Add group_by_use_nulls setting to stress 
tests

---
 docker/test/stress/stress | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/docker/test/stress/stress b/docker/test/stress/stress
index e195f81b551..9be3059795c 100755
--- a/docker/test/stress/stress
+++ b/docker/test/stress/stress
@@ -44,6 +44,9 @@ def get_options(i, backward_compatibility_check):
 if i == 13:
 client_options.append("memory_tracker_fault_probability=0.001")

+ if i % 17 == 1 and not backward_compatibility_check:
+ client_options.append("group_by_use_nulls=1")
+
 if client_options:
 options.append(" --client-option " + " ".join(client_options))

From aeb4f9c6be8523c83da22753d6e84e67b2839b50 Mon Sep 17 00:00:00 2001
From: lgbo-ustc
Date: Wed, 6 Jul 2022 16:56:17 +0800
Subject: [PATCH 07/34] Fix crash caused by IHiveFile being shared among threads

---
 src/Storages/Hive/HiveFile.cpp | 8 +++++++-
 src/Storages/Hive/HiveFile.h | 4 ++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/src/Storages/Hive/HiveFile.cpp b/src/Storages/Hive/HiveFile.cpp
index 57acbdd577b..92eee681ea1 100644
--- a/src/Storages/Hive/HiveFile.cpp
+++ b/src/Storages/Hive/HiveFile.cpp
@@ -79,8 +79,12 @@ Range createRangeFromParquetStatistics(std::shared_ptr
 IHiveFile::getRows()
 {
- if (!rows)
+ if (!has_init_rows)
+ {
+ std::lock_guard lock(mutex);
 rows = getRowsImpl();
+ has_init_rows = true;
+ }
 return rows;
 }

@@ -88,6 +92,7 @@ void IHiveFile::loadFileMinMaxIndex()
 {
 if (file_minmax_idx_loaded)
 return;
+ std::lock_guard lock(mutex);
 loadFileMinMaxIndexImpl();
 file_minmax_idx_loaded = true;
 }
@@ -96,6 +101,7 @@ void IHiveFile::loadSplitMinMaxIndexes()
 {
 if (split_minmax_idxes_loaded)
 return;
+ std::lock_guard lock(mutex);
 loadSplitMinMaxIndexesImpl();
 split_minmax_idxes_loaded = true;
 }
diff --git a/src/Storages/Hive/HiveFile.h b/src/Storages/Hive/HiveFile.h
index cbdf17bd5b5..a4bd345aa48 100644
--- a/src/Storages/Hive/HiveFile.h
+++ b/src/Storages/Hive/HiveFile.h
@@ -149,6 +149,7 @@ protected:
 String path;
 UInt64 last_modify_time;
 size_t size;
+ std::atomic has_init_rows = false;
 std::optional rows;

 NamesAndTypesList index_names_and_types;
@@ -162,6 +163,9 @@ protected:
 /// Skip splits for this file after applying minmax index (if any)
 std::unordered_set skip_splits;
 std::shared_ptr storage_settings;
+
+ /// IHiveFile may be shared among multiple threads, so updating its min/max indexes needs the protection of a lock.
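+ /// getRows(), loadFileMinMaxIndex() and loadSplitMinMaxIndexes() acquire this mutex
+ /// before calling their *Impl counterparts, serializing the lazy initialization.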
+ std::mutex mutex;
};

using HiveFilePtr = std::shared_ptr;

From cda081dc57ad3656e228c0465ac52bcf99239a11 Mon Sep 17 00:00:00 2001
From: lgbo-ustc
Date: Thu, 7 Jul 2022 09:41:52 +0800
Subject: [PATCH 08/34] Recheck flags under lock in IHiveFile lazy initialization

---
 src/Storages/Hive/HiveFile.cpp | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/Storages/Hive/HiveFile.cpp b/src/Storages/Hive/HiveFile.cpp
index 92eee681ea1..c4d4a28c88f 100644
--- a/src/Storages/Hive/HiveFile.cpp
+++ b/src/Storages/Hive/HiveFile.cpp
@@ -82,6 +82,8 @@ std::optional IHiveFile::getRows()
 if (!has_init_rows)
 {
 std::lock_guard lock(mutex);
+ if (has_init_rows)
+ return rows;
 rows = getRowsImpl();
 has_init_rows = true;
 }
@@ -93,6 +95,8 @@ void IHiveFile::loadFileMinMaxIndex()
 if (file_minmax_idx_loaded)
 return;
 std::lock_guard lock(mutex);
+ if (file_minmax_idx_loaded)
+ return;
 loadFileMinMaxIndexImpl();
 file_minmax_idx_loaded = true;
 }
@@ -102,6 +106,8 @@ void IHiveFile::loadSplitMinMaxIndexes()
 if (split_minmax_idxes_loaded)
 return;
 std::lock_guard lock(mutex);
+ if (split_minmax_idxes_loaded)
+ return;
 loadSplitMinMaxIndexesImpl();
 split_minmax_idxes_loaded = true;
 }

From 337f379bc3b7e677e004e6a06e3c42e67b91a1f1 Mon Sep 17 00:00:00 2001
From: lgbo-ustc
Date: Thu, 7 Jul 2022 09:45:49 +0800
Subject: [PATCH 09/34] Simplify double-checked locking in IHiveFile::getRows

---
 src/Storages/Hive/HiveFile.cpp | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/Storages/Hive/HiveFile.cpp b/src/Storages/Hive/HiveFile.cpp
index c4d4a28c88f..112798fea4e 100644
--- a/src/Storages/Hive/HiveFile.cpp
+++ b/src/Storages/Hive/HiveFile.cpp
@@ -82,10 +82,11 @@ std::optional IHiveFile::getRows()
 if (!has_init_rows)
 {
 std::lock_guard lock(mutex);
- if (has_init_rows)
- return rows;
- rows = getRowsImpl();
- has_init_rows = true;
+ if (!has_init_rows)
+ {
+ rows = getRowsImpl();
+ has_init_rows = true;
+ }
 }
 return rows;
 }

From 24243c51bef04a45fe25dc30c859a8f555181444 Mon Sep 17 00:00:00 2001
From: Dmitry Novik
Date: Thu, 7 Jul 2022 16:40:35 +0000
Subject: [PATCH 10/34] Fix distributed grouping sets with nulls

---
 src/Interpreters/InterpreterSelectQuery.cpp | 12 +-
 ...3_group_by_use_nulls_distributed.reference | 157 ++++++++++++++++++
 .../02343_group_by_use_nulls_distributed.sql | 51 ++++++
 3 files changed, 218 insertions(+), 2 deletions(-)
 create mode 100644 tests/queries/0_stateless/02343_group_by_use_nulls_distributed.reference
 create mode 100644 tests/queries/0_stateless/02343_group_by_use_nulls_distributed.sql

diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 551bd072586..a1dd0b7ad0e 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -786,8 +786,16 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
 if (analysis_result.use_grouping_set_key)
 res.insert({ nullptr, std::make_shared(), "__grouping_set" });

- for (const auto & key : query_analyzer->aggregationKeys())
- res.insert({nullptr, header.getByName(key.name).type, key.name});
+ if (context->getSettingsRef().group_by_use_nulls)
+ {
+ for (const auto & key : query_analyzer->aggregationKeys())
+ res.insert({nullptr, makeNullable(header.getByName(key.name).type), key.name});
+ }
+ else
+ {
+ for (const auto & key : query_analyzer->aggregationKeys())
+ res.insert({nullptr, header.getByName(key.name).type, key.name});
+ }

 for (const auto & aggregate : query_analyzer->aggregates())
 {
diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls_distributed.reference 
b/tests/queries/0_stateless/02343_group_by_use_nulls_distributed.reference new file mode 100644 index 00000000000..7a9263e883c --- /dev/null +++ b/tests/queries/0_stateless/02343_group_by_use_nulls_distributed.reference @@ -0,0 +1,157 @@ +-- { echoOn } +SELECT number, number % 2, sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY ROLLUP(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; +0 0 0 +0 \N 0 +1 1 2 +1 \N 2 +2 0 4 +2 \N 4 +3 1 6 +3 \N 6 +4 0 8 +4 \N 8 +5 1 10 +5 \N 10 +6 0 12 +6 \N 12 +7 1 14 +7 \N 14 +8 0 16 +8 \N 16 +9 1 18 +9 \N 18 +\N \N 90 +SELECT number, number % 2, sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY ROLLUP(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=0; +0 0 0 +0 0 0 +0 0 90 +1 0 2 +1 1 2 +2 0 4 +2 0 4 +3 0 6 +3 1 6 +4 0 8 +4 0 8 +5 0 10 +5 1 10 +6 0 12 +6 0 12 +7 0 14 +7 1 14 +8 0 16 +8 0 16 +9 0 18 +9 1 18 +SELECT number, number % 2, sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY CUBE(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; +0 0 0 +0 \N 0 +1 1 2 +1 \N 2 +2 0 4 +2 \N 4 +3 1 6 +3 \N 6 +4 0 8 +4 \N 8 +5 1 10 +5 \N 10 +6 0 12 +6 \N 12 +7 1 14 +7 \N 14 +8 0 16 +8 \N 16 +9 1 18 +9 \N 18 +\N 0 40 +\N 1 50 +\N \N 90 +SELECT number, number % 2, sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY CUBE(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=0; +0 0 0 +0 0 0 +0 0 40 +0 0 90 +0 1 50 +1 0 2 +1 1 2 +2 0 4 +2 0 4 +3 0 6 +3 1 6 +4 0 8 +4 0 8 +5 0 10 +5 1 10 +6 0 12 +6 0 12 +7 0 14 +7 1 14 +8 0 16 +8 0 16 +9 0 18 +9 1 18 +SELECT + number, + number % 2, + sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY + GROUPING SETS ( + (number), + (number % 2) + ) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls = 1; +0 \N 0 +1 \N 2 +2 \N 4 +3 \N 6 +4 \N 8 +5 \N 10 +6 \N 12 +7 \N 14 +8 \N 16 +9 \N 18 +\N 0 40 +\N 1 50 +SELECT + number, + number % 2, + sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY + GROUPING SETS ( + (number), + (number % 2) + ) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls = 0; +0 0 0 +0 0 40 +0 1 50 +1 0 2 +2 0 4 +3 0 6 +4 0 8 +5 0 10 +6 0 12 +7 0 14 +8 0 16 +9 0 18 diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls_distributed.sql b/tests/queries/0_stateless/02343_group_by_use_nulls_distributed.sql new file mode 100644 index 00000000000..15ac1127de7 --- /dev/null +++ b/tests/queries/0_stateless/02343_group_by_use_nulls_distributed.sql @@ -0,0 +1,51 @@ +-- { echoOn } +SELECT number, number % 2, sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY ROLLUP(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; + +SELECT number, number % 2, sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY ROLLUP(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=0; + +SELECT number, number % 2, sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY CUBE(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; + +SELECT number, number % 2, sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY CUBE(number, number % 2) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=0; + +SELECT + number, + number % 2, + sum(number) AS val +FROM remote('127.0.0.{2,3}', 
numbers(10)) +GROUP BY + GROUPING SETS ( + (number), + (number % 2) + ) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls = 1; + +SELECT + number, + number % 2, + sum(number) AS val +FROM remote('127.0.0.{2,3}', numbers(10)) +GROUP BY + GROUPING SETS ( + (number), + (number % 2) + ) +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls = 0; + From 1587385f7ad34f243cd8cd4e92490b71ce62d131 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Thu, 7 Jul 2022 18:53:20 +0000 Subject: [PATCH 11/34] Cleanup code --- src/Columns/ColumnNullable.cpp | 14 +++++++ src/Columns/ColumnNullable.h | 1 + src/DataTypes/DataTypeNullable.cpp | 7 ++++ src/DataTypes/DataTypeNullable.h | 1 + src/Interpreters/ExpressionAnalyzer.cpp | 21 +++-------- src/Interpreters/ExpressionAnalyzer.h | 1 - src/Interpreters/InterpreterSelectQuery.cpp | 3 +- src/Processors/QueryPlan/AggregatingStep.cpp | 37 ++++--------------- src/Processors/QueryPlan/AggregatingStep.h | 1 - src/Processors/Transforms/RollupTransform.cpp | 7 +--- 10 files changed, 39 insertions(+), 54 deletions(-) diff --git a/src/Columns/ColumnNullable.cpp b/src/Columns/ColumnNullable.cpp index 435e6bf1fbc..7677da991b9 100644 --- a/src/Columns/ColumnNullable.cpp +++ b/src/Columns/ColumnNullable.cpp @@ -785,4 +785,18 @@ ColumnPtr makeNullable(const ColumnPtr & column) return ColumnNullable::create(column, ColumnUInt8::create(column->size(), 0)); } +ColumnPtr makeNullableSafe(const ColumnPtr & column) +{ + if (isColumnNullable(*column)) + return column; + + if (isColumnConst(*column)) + return ColumnConst::create(makeNullableSafe(assert_cast(*column).getDataColumnPtr()), column->size()); + + if (column->canBeInsideNullable()) + return makeNullable(column); + + return column; +} + } diff --git a/src/Columns/ColumnNullable.h b/src/Columns/ColumnNullable.h index 60951dfcc2e..fb765db454d 100644 --- a/src/Columns/ColumnNullable.h +++ b/src/Columns/ColumnNullable.h @@ -221,5 +221,6 @@ private: }; ColumnPtr makeNullable(const ColumnPtr & column); +ColumnPtr makeNullableSafe(const ColumnPtr & column); } diff --git a/src/DataTypes/DataTypeNullable.cpp b/src/DataTypes/DataTypeNullable.cpp index b354b1278be..a14fb785b96 100644 --- a/src/DataTypes/DataTypeNullable.cpp +++ b/src/DataTypes/DataTypeNullable.cpp @@ -85,6 +85,13 @@ DataTypePtr makeNullable(const DataTypePtr & type) return std::make_shared(type); } +DataTypePtr makeNullableSafe(const DataTypePtr & type) +{ + if (type->canBeInsideNullable()) + return makeNullable(type); + return type; +} + DataTypePtr removeNullable(const DataTypePtr & type) { if (type->isNullable()) diff --git a/src/DataTypes/DataTypeNullable.h b/src/DataTypes/DataTypeNullable.h index c87e4f77008..379119b364c 100644 --- a/src/DataTypes/DataTypeNullable.h +++ b/src/DataTypes/DataTypeNullable.h @@ -51,6 +51,7 @@ private: DataTypePtr makeNullable(const DataTypePtr & type); +DataTypePtr makeNullableSafe(const DataTypePtr & type); DataTypePtr removeNullable(const DataTypePtr & type); } diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index bab017f5a11..177a919c0de 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -41,12 +41,11 @@ #include -#include "Common/logger_useful.h" #include #include -#include "Columns/ColumnNullable.h" -#include "Core/ColumnsWithTypeAndName.h" -#include "DataTypes/IDataType.h" +#include +#include +#include #include #include #include @@ -68,7 +67,6 @@ #include #include #include -#include namespace DB { @@ -399,7 
+397,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions) } } - NameAndTypePair key{column_name, use_nulls ? makeNullable(node->result_type) : node->result_type }; + NameAndTypePair key{column_name, use_nulls ? makeNullableSafe(node->result_type) : node->result_type }; grouping_set_list.push_back(key); @@ -453,7 +451,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions) } } - NameAndTypePair key = NameAndTypePair{ column_name, use_nulls ? makeNullable(node->result_type) : node->result_type }; + NameAndTypePair key = NameAndTypePair{ column_name, use_nulls ? makeNullableSafe(node->result_type) : node->result_type }; /// Aggregation keys are uniqued. if (!unique_keys.contains(key.name)) @@ -1446,7 +1444,7 @@ void SelectQueryExpressionAnalyzer::appendGroupByModifiers(ActionsDAGPtr & befor if (isAggregateFunction(source_column.type)) result_columns.push_back(source_column); else - result_columns.emplace_back(makeNullable(source_column.type), source_column.name); + result_columns.emplace_back(makeNullableSafe(source_column.type), source_column.name); } ExpressionActionsChain::Step & step = chain.lastStep(before_aggregation->getNamesAndTypesList()); @@ -1632,8 +1630,6 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActio ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); - LOG_DEBUG(&Poco::Logger::get("SelectQueryExpressionAnalyzer"), "Before output: {}", step.actions()->getNamesAndTypesList().toString()); - NamesWithAliases result_columns; ASTs asts = select_query->select()->children; @@ -1675,11 +1671,7 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActio } auto actions = chain.getLastActions(); - LOG_DEBUG(&Poco::Logger::get("SelectQueryExpressionAnalyzer"), "Before projection: {}", actions->getNamesAndTypesList().toString()); - actions->project(result_columns); - LOG_DEBUG(&Poco::Logger::get("SelectQueryExpressionAnalyzer"), "After projection: {}", actions->getNamesAndTypesList().toString()); - return actions; } @@ -1903,7 +1895,6 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage); before_aggregation = chain.getLastActions(); - before_aggregation_with_nullable = chain.getLastActions(); if (settings.group_by_use_nulls) query_analyzer.appendGroupByModifiers(before_aggregation, chain, only_types); diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index 01135cfdc9e..f288162845c 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -246,7 +246,6 @@ struct ExpressionAnalysisResult JoinPtr join; ActionsDAGPtr before_where; ActionsDAGPtr before_aggregation; - ActionsDAGPtr before_aggregation_with_nullable; ActionsDAGPtr before_having; String having_column_name; bool remove_having_filter = false; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index a1dd0b7ad0e..0c9a3033012 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -582,7 +582,6 @@ InterpreterSelectQuery::InterpreterSelectQuery( /// Calculate structure of the result. 
result_header = getSampleBlockImpl(); - LOG_DEBUG(&Poco::Logger::get("InterpreterSelectQuery"), "Result header: {}", result_header.dumpStructure()); }; analyze(shouldMoveToPrewhere()); @@ -789,7 +788,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl() if (context->getSettingsRef().group_by_use_nulls) { for (const auto & key : query_analyzer->aggregationKeys()) - res.insert({nullptr, makeNullable(header.getByName(key.name).type), key.name}); + res.insert({nullptr, makeNullableSafe(header.getByName(key.name).type), key.name}); } else { diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index b73ca093164..f4e3749bd70 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -16,10 +16,6 @@ #include #include #include -#include -#include "Common/logger_useful.h" -#include "Core/ColumnNumbers.h" -#include "DataTypes/IDataType.h" namespace DB { @@ -57,11 +53,8 @@ static inline void convertToNullable(Block & header, const Names & keys) { auto & column = header.getByName(key); - if (!isAggregateFunction(column.type)) - { - column.type = makeNullable(column.type); - column.column = makeNullable(column.column); - } + column.type = makeNullableSafe(column.type); + column.column = makeNullableSafe(column.column); } } @@ -69,18 +62,7 @@ Block generateOutputHeader(const Block & input_header, const Names & keys, bool { auto header = appendGroupingSetColumn(input_header); if (use_nulls) - { - for (const auto & key : keys) - { - auto & column = header.getByName(key); - - if (!isAggregateFunction(column.type)) - { - column.type = makeNullable(column.type); - column.column = makeNullable(column.column); - } - } - } + convertToNullable(header, keys); return header; } @@ -269,6 +251,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B const auto & missing_columns = grouping_sets_params[set_counter].missing_keys; + auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr); for (size_t i = 0; i < output_header.columns(); ++i) { auto & col = output_header.getByPosition(i); @@ -286,22 +269,16 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B else { const auto * column_node = dag->getIndex()[header.getPositionByName(col.name)]; - if (isAggregateFunction(column_node->result_type) || !group_by_use_nulls) - { - index.push_back(column_node); - } + if (group_by_use_nulls && column_node->result_type->canBeInsideNullable()) + index.push_back(&dag->addFunction(to_nullable_function, { column_node }, col.name)); else - { - const auto * node = &dag->addFunction(FunctionFactory::instance().get("toNullable", nullptr), { column_node }, col.name); - index.push_back(node); - } + index.push_back(column_node); } } dag->getIndex().swap(index); auto expression = std::make_shared(dag, settings.getActionsSettings()); auto transform = std::make_shared(header, expression); - LOG_DEBUG(&Poco::Logger::get("AggregatingStep"), "Header for GROUPING SET #{}: {}", set_counter, transform->getOutputPort().getHeader().dumpStructure()); connect(*ports[set_counter], transform->getInputPort()); processors.emplace_back(std::move(transform)); diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index fb838580e0f..71130b65adb 100644 --- a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -3,7 +3,6 @@ #include #include #include -#include "Core/ColumnNumbers.h" 
namespace DB { diff --git a/src/Processors/Transforms/RollupTransform.cpp b/src/Processors/Transforms/RollupTransform.cpp index 0ab5900447f..a5d67fb2f15 100644 --- a/src/Processors/Transforms/RollupTransform.cpp +++ b/src/Processors/Transforms/RollupTransform.cpp @@ -1,9 +1,7 @@ #include #include #include -#include -#include "Common/logger_useful.h" -#include "Columns/ColumnNullable.h" +#include namespace DB { @@ -25,7 +23,6 @@ GroupByModifierTransform::GroupByModifierTransform(Block header, AggregatingTran auto output_aggregator_params = params->params; output_aggregator = std::make_unique(intermediate_header, output_aggregator_params); } - LOG_DEBUG(&Poco::Logger::get("GroupByModifierTransform"), "Intermediate header: {}", intermediate_header.dumpStructure()); } void GroupByModifierTransform::consume(Chunk chunk) @@ -45,7 +42,7 @@ void GroupByModifierTransform::mergeConsumed() if (use_nulls) { for (auto key : keys) - columns[key] = makeNullable(columns[key]); + columns[key] = makeNullableSafe(columns[key]); } current_chunk = Chunk{ columns, rows }; From 0b18b4ed43f814ea046811ab42d25aa01c6c9dca Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Thu, 7 Jul 2022 18:58:46 +0000 Subject: [PATCH 12/34] Use group_by_use_nulls in tests more often --- docker/test/stress/stress | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/test/stress/stress b/docker/test/stress/stress index 76e1ff55c50..6d90b9d5437 100755 --- a/docker/test/stress/stress +++ b/docker/test/stress/stress @@ -46,7 +46,7 @@ def get_options(i, backward_compatibility_check): if i == 13: client_options.append("memory_tracker_fault_probability=0.001") - if i % 17 == 1 and not backward_compatibility_check: + if i % 2 == 1 and not backward_compatibility_check: client_options.append("group_by_use_nulls=1") if client_options: From e01b6cf8a5bccd30b42d1d5a1f9587422301a493 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Fri, 8 Jul 2022 09:25:45 +0800 Subject: [PATCH 13/34] update codes --- src/Storages/Hive/HiveFile.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Storages/Hive/HiveFile.cpp b/src/Storages/Hive/HiveFile.cpp index 112798fea4e..671cb11deaa 100644 --- a/src/Storages/Hive/HiveFile.cpp +++ b/src/Storages/Hive/HiveFile.cpp @@ -88,6 +88,7 @@ std::optional IHiveFile::getRows() has_init_rows = true; } } + return rows; } From 66074b3950e4df04f90570c81b0a71423d7743cf Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 11 Jul 2022 18:11:58 +0000 Subject: [PATCH 14/34] Fix distributed aggregation --- src/Interpreters/InterpreterSelectQuery.cpp | 2 +- .../02343_group_by_use_nulls.reference | 20 +++++++++++++++++++ .../0_stateless/02343_group_by_use_nulls.sql | 11 ++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 1fbc42df111..43f2b533605 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -786,7 +786,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl() if (analysis_result.use_grouping_set_key) res.insert({ nullptr, std::make_shared(), "__grouping_set" }); - if (context->getSettingsRef().group_by_use_nulls) + if (context->getSettingsRef().group_by_use_nulls && analysis_result.use_grouping_set_key) { for (const auto & key : query_analyzer->aggregationKeys()) res.insert({nullptr, makeNullableSafe(header.getByName(key.name).type), key.name}); diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.reference 
b/tests/queries/0_stateless/02343_group_by_use_nulls.reference index 92d36c1a894..ec893b91ce1 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.reference +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.reference @@ -155,3 +155,23 @@ SETTINGS group_by_use_nulls = 0; 7 0 7 8 0 8 9 0 9 +SELECT + CounterID AS k, + quantileBFloat16(0.5)(ResolutionWidth) +FROM remote('127.0.0.{1,2}', datasets, hits_v1) +GROUP BY k +ORDER BY + count() DESC, + CounterID ASC +LIMIT 10 +SETTINGS group_by_use_nulls = 1; +1704509 1384 +732797 1336 +598875 1384 +792887 1336 +3807842 1336 +25703952 1336 +716829 1384 +59183 1336 +33010362 1336 +800784 1336 diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.sql b/tests/queries/0_stateless/02343_group_by_use_nulls.sql index 5256c6bda75..455c53d10d6 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.sql +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.sql @@ -48,3 +48,14 @@ GROUP BY ) ORDER BY (number, number % 2, val) SETTINGS group_by_use_nulls = 0; + +SELECT + CounterID AS k, + quantileBFloat16(0.5)(ResolutionWidth) +FROM remote('127.0.0.{1,2}', datasets, hits_v1) +GROUP BY k +ORDER BY + count() DESC, + CounterID ASC +LIMIT 10 +SETTINGS group_by_use_nulls = 1; From 29cfe33ed7c8301442bd9bb8ef1a505514e6ec0a Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Mon, 11 Jul 2022 18:14:49 +0000 Subject: [PATCH 15/34] Small fix --- src/Interpreters/ExpressionAnalyzer.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 9353558926f..58dec59f703 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1513,10 +1513,10 @@ void SelectQueryExpressionAnalyzer::appendGroupByModifiers(ActionsDAGPtr & befor for (const auto & source_column : source_columns) { - if (isAggregateFunction(source_column.type)) - result_columns.push_back(source_column); - else + if (source_column.type->canBeInsideNullable()) result_columns.emplace_back(makeNullableSafe(source_column.type), source_column.name); + else + result_columns.push_back(source_column); } ExpressionActionsChain::Step & step = chain.lastStep(before_aggregation->getNamesAndTypesList()); From ffa5ef6293fd4668054b80005654eb96273255c3 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Tue, 12 Jul 2022 00:14:41 +0000 Subject: [PATCH 16/34] Update tests --- .../02343_group_by_use_nulls.reference | 20 ------------------- .../0_stateless/02343_group_by_use_nulls.sql | 11 ---------- .../00173_group_by_use_nulls.reference | 10 ++++++++++ .../1_stateful/00173_group_by_use_nulls.sql | 10 ++++++++++ 4 files changed, 20 insertions(+), 31 deletions(-) create mode 100644 tests/queries/1_stateful/00173_group_by_use_nulls.reference create mode 100644 tests/queries/1_stateful/00173_group_by_use_nulls.sql diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.reference b/tests/queries/0_stateless/02343_group_by_use_nulls.reference index ec893b91ce1..92d36c1a894 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.reference +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.reference @@ -155,23 +155,3 @@ SETTINGS group_by_use_nulls = 0; 7 0 7 8 0 8 9 0 9 -SELECT - CounterID AS k, - quantileBFloat16(0.5)(ResolutionWidth) -FROM remote('127.0.0.{1,2}', datasets, hits_v1) -GROUP BY k -ORDER BY - count() DESC, - CounterID ASC -LIMIT 10 -SETTINGS group_by_use_nulls = 1; -1704509 1384 -732797 1336 -598875 1384 -792887 1336 -3807842 
1336 -25703952 1336 -716829 1384 -59183 1336 -33010362 1336 -800784 1336 diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.sql b/tests/queries/0_stateless/02343_group_by_use_nulls.sql index 455c53d10d6..5256c6bda75 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.sql +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.sql @@ -48,14 +48,3 @@ GROUP BY ) ORDER BY (number, number % 2, val) SETTINGS group_by_use_nulls = 0; - -SELECT - CounterID AS k, - quantileBFloat16(0.5)(ResolutionWidth) -FROM remote('127.0.0.{1,2}', datasets, hits_v1) -GROUP BY k -ORDER BY - count() DESC, - CounterID ASC -LIMIT 10 -SETTINGS group_by_use_nulls = 1; diff --git a/tests/queries/1_stateful/00173_group_by_use_nulls.reference b/tests/queries/1_stateful/00173_group_by_use_nulls.reference new file mode 100644 index 00000000000..02723bf14dd --- /dev/null +++ b/tests/queries/1_stateful/00173_group_by_use_nulls.reference @@ -0,0 +1,10 @@ +1704509 1384 +732797 1336 +598875 1384 +792887 1336 +3807842 1336 +25703952 1336 +716829 1384 +59183 1336 +33010362 1336 +800784 1336 diff --git a/tests/queries/1_stateful/00173_group_by_use_nulls.sql b/tests/queries/1_stateful/00173_group_by_use_nulls.sql new file mode 100644 index 00000000000..7acacc4e579 --- /dev/null +++ b/tests/queries/1_stateful/00173_group_by_use_nulls.sql @@ -0,0 +1,10 @@ +SELECT + CounterID AS k, + quantileBFloat16(0.5)(ResolutionWidth) +FROM remote('127.0.0.{1,2}', test, hits) +GROUP BY k +ORDER BY + count() DESC, + CounterID ASC +LIMIT 10 +SETTINGS group_by_use_nulls = 1; From cfca3db8844dc0f3ccb34aa813c7aafb94fda21b Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Tue, 12 Jul 2022 12:15:43 +0000 Subject: [PATCH 17/34] Fix crash with totals --- src/Processors/QueryPlan/CubeStep.cpp | 17 +++++- src/Processors/QueryPlan/RollupStep.cpp | 4 +- .../02343_group_by_use_nulls.reference | 58 +++++++++++++++++++ .../0_stateless/02343_group_by_use_nulls.sql | 12 ++++ 4 files changed, 86 insertions(+), 5 deletions(-) diff --git a/src/Processors/QueryPlan/CubeStep.cpp b/src/Processors/QueryPlan/CubeStep.cpp index 5559b4e1707..1642b577efc 100644 --- a/src/Processors/QueryPlan/CubeStep.cpp +++ b/src/Processors/QueryPlan/CubeStep.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -36,16 +37,26 @@ CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_, output_stream->distinct_columns.insert(key); } -ProcessorPtr addGroupingSetForTotals(const Block & header, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number) +ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number) { auto dag = std::make_shared(header.getColumnsWithTypeAndName()); + auto & index = dag->getIndex(); + + auto to_nullable = FunctionFactory::instance().get("toNullable", nullptr); + for (const auto & key : keys) + { + const auto * node = dag->getIndex()[header.getPositionByName(key)]; + if (node->result_type->canBeInsideNullable()) + { + dag->addOrReplaceInIndex(dag->addFunction(to_nullable, { node }, node->result_name)); + } + } auto grouping_col = ColumnUInt64::create(1, grouping_set_number); const auto * grouping_node = &dag->addColumn( {ColumnPtr(std::move(grouping_col)), std::make_shared(), "__grouping_set"}); grouping_node = &dag->materializeNode(*grouping_node); - auto & index = dag->getIndex(); index.insert(index.begin(), grouping_node); auto expression = std::make_shared(dag, settings.getActionsSettings()); 
@@ -59,7 +70,7 @@ void CubeStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQue pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr { if (stream_type == QueryPipelineBuilder::StreamType::Totals) - return addGroupingSetForTotals(header, settings, (UInt64(1) << keys_size) - 1); + return addGroupingSetForTotals(header, params.keys, settings, (UInt64(1) << keys_size) - 1); auto transform_params = std::make_shared(header, std::move(params), final); return std::make_shared(header, std::move(transform_params), use_nulls); diff --git a/src/Processors/QueryPlan/RollupStep.cpp b/src/Processors/QueryPlan/RollupStep.cpp index 3d1e79d9556..293fd79e7c5 100644 --- a/src/Processors/QueryPlan/RollupStep.cpp +++ b/src/Processors/QueryPlan/RollupStep.cpp @@ -34,7 +34,7 @@ RollupStep::RollupStep(const DataStream & input_stream_, Aggregator::Params para output_stream->distinct_columns.insert(key); } -ProcessorPtr addGroupingSetForTotals(const Block & header, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number); +ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number); void RollupStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) { @@ -43,7 +43,7 @@ void RollupStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr { if (stream_type == QueryPipelineBuilder::StreamType::Totals) - return addGroupingSetForTotals(header, settings, keys_size); + return addGroupingSetForTotals(header, params.keys, settings, keys_size); auto transform_params = std::make_shared(header, std::move(params), true); return std::make_shared(header, std::move(transform_params), use_nulls); diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.reference b/tests/queries/0_stateless/02343_group_by_use_nulls.reference index 92d36c1a894..24b7bb5277c 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.reference +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.reference @@ -155,3 +155,61 @@ SETTINGS group_by_use_nulls = 0; 7 0 7 8 0 8 9 0 9 +SELECT number, number % 2, sum(number) AS val +FROM numbers(10) +GROUP BY ROLLUP(number, number % 2) WITH TOTALS +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; +0 0 0 +0 \N 0 +1 1 1 +1 \N 1 +2 0 2 +2 \N 2 +3 1 3 +3 \N 3 +4 0 4 +4 \N 4 +5 1 5 +5 \N 5 +6 0 6 +6 \N 6 +7 1 7 +7 \N 7 +8 0 8 +8 \N 8 +9 1 9 +9 \N 9 +\N \N 45 + +0 0 45 +SELECT number, number % 2, sum(number) AS val +FROM numbers(10) +GROUP BY CUBE(number, number % 2) WITH TOTALS +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; +0 0 0 +0 \N 0 +1 1 1 +1 \N 1 +2 0 2 +2 \N 2 +3 1 3 +3 \N 3 +4 0 4 +4 \N 4 +5 1 5 +5 \N 5 +6 0 6 +6 \N 6 +7 1 7 +7 \N 7 +8 0 8 +8 \N 8 +9 1 9 +9 \N 9 +\N 0 20 +\N 1 25 +\N \N 45 + +0 0 45 diff --git a/tests/queries/0_stateless/02343_group_by_use_nulls.sql b/tests/queries/0_stateless/02343_group_by_use_nulls.sql index 5256c6bda75..a14db824013 100644 --- a/tests/queries/0_stateless/02343_group_by_use_nulls.sql +++ b/tests/queries/0_stateless/02343_group_by_use_nulls.sql @@ -48,3 +48,15 @@ GROUP BY ) ORDER BY (number, number % 2, val) SETTINGS group_by_use_nulls = 0; + +SELECT number, number % 2, sum(number) AS val +FROM numbers(10) +GROUP BY ROLLUP(number, number % 2) WITH TOTALS +ORDER BY 
(number, number % 2, val) +SETTINGS group_by_use_nulls=1; + +SELECT number, number % 2, sum(number) AS val +FROM numbers(10) +GROUP BY CUBE(number, number % 2) WITH TOTALS +ORDER BY (number, number % 2, val) +SETTINGS group_by_use_nulls=1; From aabf5123d6ae97fa908d005498bf7806e127f8e3 Mon Sep 17 00:00:00 2001 From: Dmitry Novik Date: Tue, 12 Jul 2022 13:46:06 +0000 Subject: [PATCH 18/34] Fixup --- src/Processors/QueryPlan/CubeStep.cpp | 17 ++++++++++------- src/Processors/QueryPlan/RollupStep.cpp | 4 ++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/Processors/QueryPlan/CubeStep.cpp b/src/Processors/QueryPlan/CubeStep.cpp index 1642b577efc..52539dec75f 100644 --- a/src/Processors/QueryPlan/CubeStep.cpp +++ b/src/Processors/QueryPlan/CubeStep.cpp @@ -37,18 +37,21 @@ CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_, output_stream->distinct_columns.insert(key); } -ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number) +ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number) { auto dag = std::make_shared(header.getColumnsWithTypeAndName()); auto & index = dag->getIndex(); - auto to_nullable = FunctionFactory::instance().get("toNullable", nullptr); - for (const auto & key : keys) + if (use_nulls) { - const auto * node = dag->getIndex()[header.getPositionByName(key)]; - if (node->result_type->canBeInsideNullable()) + auto to_nullable = FunctionFactory::instance().get("toNullable", nullptr); + for (const auto & key : keys) { - dag->addOrReplaceInIndex(dag->addFunction(to_nullable, { node }, node->result_name)); + const auto * node = dag->getIndex()[header.getPositionByName(key)]; + if (node->result_type->canBeInsideNullable()) + { + dag->addOrReplaceInIndex(dag->addFunction(to_nullable, { node }, node->result_name)); + } } } @@ -70,7 +73,7 @@ void CubeStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQue pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr { if (stream_type == QueryPipelineBuilder::StreamType::Totals) - return addGroupingSetForTotals(header, params.keys, settings, (UInt64(1) << keys_size) - 1); + return addGroupingSetForTotals(header, params.keys, use_nulls, settings, (UInt64(1) << keys_size) - 1); auto transform_params = std::make_shared(header, std::move(params), final); return std::make_shared(header, std::move(transform_params), use_nulls); diff --git a/src/Processors/QueryPlan/RollupStep.cpp b/src/Processors/QueryPlan/RollupStep.cpp index 293fd79e7c5..3305f24602f 100644 --- a/src/Processors/QueryPlan/RollupStep.cpp +++ b/src/Processors/QueryPlan/RollupStep.cpp @@ -34,7 +34,7 @@ RollupStep::RollupStep(const DataStream & input_stream_, Aggregator::Params para output_stream->distinct_columns.insert(key); } -ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number); +ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number); void RollupStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) { @@ -43,7 +43,7 @@ void RollupStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ 
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr { if (stream_type == QueryPipelineBuilder::StreamType::Totals) - return addGroupingSetForTotals(header, params.keys, settings, keys_size); + return addGroupingSetForTotals(header, params.keys, use_nulls, settings, keys_size); auto transform_params = std::make_shared(header, std::move(params), true); return std::make_shared(header, std::move(transform_params), use_nulls); From 2367f40b70ad2aca8e30bfc33108380e60c1d9e1 Mon Sep 17 00:00:00 2001 From: avogar Date: Mon, 18 Jul 2022 15:36:33 +0000 Subject: [PATCH 19/34] Better exception messages in schema inference --- src/Formats/ReadSchemaUtils.cpp | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/Formats/ReadSchemaUtils.cpp b/src/Formats/ReadSchemaUtils.cpp index 11a91bd50dc..39812f5ba56 100644 --- a/src/Formats/ReadSchemaUtils.cpp +++ b/src/Formats/ReadSchemaUtils.cpp @@ -66,7 +66,7 @@ ColumnsDescription readSchemaFromFormat( } catch (const DB::Exception & e) { - throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, e.message()); + throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}. You can specify the structure manually", format_name, e.message()); } } else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name)) @@ -75,16 +75,27 @@ ColumnsDescription readSchemaFromFormat( SchemaReaderPtr schema_reader; size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference : context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference; size_t iterations = 0; - while ((buf = read_buffer_iterator())) + while (true) { + bool is_eof = false; + try + { + buf = read_buffer_iterator(); + is_eof = buf->eof(); + } + catch (...) + { + auto exception_message = getCurrentExceptionMessage(false); + throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file: {}. You can specify the structure manually", format_name, exception_message); + } ++iterations; - if (buf->eof()) + if (is_eof) { auto exception_message = fmt::format("Cannot extract table structure from {} format file, file is empty", format_name); if (!retry) - throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, exception_message); + throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "{}. You can specify the structure manually", exception_message); exception_messages += "\n" + exception_message; continue; @@ -118,14 +129,14 @@ ColumnsDescription readSchemaFromFormat( } if (!retry || !isRetryableSchemaInferenceError(getCurrentExceptionCode())) - throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, exception_message); + throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}. You can specify the structure manually", format_name, exception_message); exception_messages += "\n" + exception_message; } } if (names_and_types.empty()) - throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:{}", exception_messages); + throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. 
Errors:{}\nYou can specify the structure manually", exception_messages); /// If we have "INSERT SELECT" query then try to order /// columns as they are ordered in table schema for formats From d82f378a9d67d8e4782fb9aa7a222bdea6f3d1b7 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 18 Jul 2022 23:37:07 +0200 Subject: [PATCH 20/34] do not enqueue uneeded parts for check --- src/Storages/MergeTree/DataPartsExchange.cpp | 9 +++++---- src/Storages/MergeTree/MergeTreeData.cpp | 4 +++- src/Storages/MergeTree/MergeTreeData.h | 7 +------ .../MergeTreeInOrderSelectProcessor.cpp | 2 +- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 6 ++++++ .../MergeTree/ReplicatedMergeTreeQueue.h | 2 ++ src/Storages/StorageReplicatedMergeTree.cpp | 20 ++++++++++++++++--- 7 files changed, 35 insertions(+), 15 deletions(-) diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 9e18dbc6281..3609a65bc71 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -127,12 +127,13 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write { if (part && part->isProjectionPart()) { - data.reportBrokenPart(part->getParentPart()->name); + auto parent_part = part->getParentPart()->shared_from_this(); + data.reportBrokenPart(parent_part); } + else if (part) + data.reportBrokenPart(part); else - { - data.reportBrokenPart(part_name); - } + LOG_TRACE(log, "Part {} was not found, do not report it as broken", part_name); }; try diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 3b39100b3de..727ebc9c3cc 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6031,8 +6031,10 @@ void MergeTreeData::reportBrokenPart(MergeTreeData::DataPartPtr & data_part) con broken_part_callback(part->name); } } - else + else if (data_part && data_part->getState() == IMergeTreeDataPart::State::Active) broken_part_callback(data_part->name); + else + LOG_DEBUG(log, "Will not check potentially broken part {} because it's not active", data_part->getNameWithState()); } MergeTreeData::MatcherFn MergeTreeData::getPartitionMatcher(const ASTPtr & partition_ast, ContextPtr local_context) const diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 0b6e757ab49..26ac4d362ec 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -669,12 +669,7 @@ public: AlterLockHolder & table_lock_holder); /// Should be called if part data is suspected to be corrupted. - void reportBrokenPart(const String & name) const - { - broken_part_callback(name); - } - - /// Same as above but has the ability to check all other parts + /// Has the ability to check all other parts /// which reside on the same disk of the suspicious part. void reportBrokenPart(MergeTreeData::DataPartPtr & data_part) const; diff --git a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp index 280ce82cfce..655ca003deb 100644 --- a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp @@ -44,7 +44,7 @@ catch (...) { /// Suspicion of the broken part. A part is added to the queue for verification. 
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) - storage.reportBrokenPart(data_part->name); + storage.reportBrokenPart(data_part); throw; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 7967726edca..2c32d9f266c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2336,6 +2336,12 @@ bool ReplicatedMergeTreeMergePredicate::hasDropRange(const MergeTreePartInfo & n return queue.hasDropRange(new_drop_range_info); } +String ReplicatedMergeTreeMergePredicate::getCoveringVirtualPart(const String & part_name) const +{ + std::lock_guard lock(queue.state_mutex); + return queue.virtual_parts.getContainingPart(MergeTreePartInfo::fromPartName(part_name, queue.format_version)); +} + ReplicatedMergeTreeQueue::SubscriberHandler ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index a88d9182bbf..a830815f760 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -521,6 +521,8 @@ public: bool hasDropRange(const MergeTreePartInfo & new_drop_range_info) const; + String getCoveringVirtualPart(const String & part_name) const; + private: const ReplicatedMergeTreeQueue & queue; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 520b5534fe3..c2ed93ca074 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1837,8 +1837,8 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry) LOG_TRACE(log, "Executing DROP_RANGE {}", entry.new_part_name); auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version); getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range_info.partition_id, drop_range_info.max_block); - part_check_thread.cancelRemovedPartsCheck(drop_range_info); queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry); + part_check_thread.cancelRemovedPartsCheck(drop_range_info); /// Delete the parts contained in the range to be deleted. 
/// It's important that no old parts remain (after the merge), because otherwise, @@ -1906,8 +1906,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) if (replace) { getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block); - part_check_thread.cancelRemovedPartsCheck(drop_range); queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry); + part_check_thread.cancelRemovedPartsCheck(drop_range); } else { @@ -7953,12 +7953,26 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP while (true) { + auto pred = queue.getMergePredicate(zookeeper); + String covering_virtual = pred.getCoveringVirtualPart(lost_part_name); + if (covering_virtual.empty()) + { + LOG_WARNING(log, "Will not create empty part instead of lost {}, because there's no covering part in replication queue", lost_part_name); + return false; + } + if (pred.hasDropRange(MergeTreePartInfo::fromPartName(covering_virtual, format_version))) + { + LOG_WARNING(log, "Will not create empty part instead of lost {}, because it's covered by DROP_RANGE", lost_part_name); + return false; + } Coordination::Requests ops; Coordination::Stat replicas_stat; auto replicas_path = fs::path(zookeeper_path) / "replicas"; Strings replicas = zookeeper->getChildren(replicas_path, &replicas_stat); + ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/log", pred.getVersion())); + /// In rare cases new replica can appear during check ops.emplace_back(zkutil::makeCheckRequest(replicas_path, replicas_stat.version)); @@ -7988,7 +8002,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP } else if (code == Coordination::Error::ZBADVERSION) { - LOG_INFO(log, "Looks like new replica appearead while creating new empty part, will retry"); + LOG_INFO(log, "Looks like log was updated or new replica appeared while creating new empty part, will retry"); } else { From 3e7414b356a2c9b659da17ff966e96c819e2f464 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 19 Jul 2022 12:47:20 +0200 Subject: [PATCH 21/34] add comment --- src/Storages/StorageReplicatedMergeTree.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index c2ed93ca074..1bc4c26e40e 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7953,6 +7953,11 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP while (true) { + /// We should be careful when creating an empty part, because we are not sure that this part is still needed. + /// For example, it's possible that part (or partition) was dropped (or replaced) concurrently. + /// We can enqueue part for check from DataPartExchange or SelectProcessor + /// and it's hard to synchronize it with ReplicatedMergeTreeQueue and PartCheckThread... + /// But at least we can ignore parts that are definitely not needed according to virtual parts and drop ranges. 
auto pred = queue.getMergePredicate(zookeeper); String covering_virtual = pred.getCoveringVirtualPart(lost_part_name); if (covering_virtual.empty()) From a761da9f1baa1e0387c900926beed981a55a3a03 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 19 Jul 2022 13:58:59 +0200 Subject: [PATCH 22/34] add comment --- src/Storages/MergeTree/ReplicatedMergeTreeQueue.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index a830815f760..f4cae7152ef 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -519,8 +519,10 @@ public: /// The version of "log" node that is used to check that no new merges have appeared. int32_t getVersion() const { return merges_version; } + /// Returns true if there's a drop range covering new_drop_range_info bool hasDropRange(const MergeTreePartInfo & new_drop_range_info) const; + /// Returns virtual part covering part_name (if any) or empty string String getCoveringVirtualPart(const String & part_name) const; private: From 246614229fb2d8997db0b92271f05e75c36f74c3 Mon Sep 17 00:00:00 2001 From: Constantine Peresypkin Date: Wed, 6 Jul 2022 20:51:30 +0200 Subject: [PATCH 23/34] fix long wait for process exit in ShellCommand fixes #38889 --- src/Common/ShellCommand.cpp | 43 +------- src/Common/waitForPid.cpp | 192 ++++++++++++++++++++++++++++++++++++ src/Common/waitForPid.h | 12 +++ 3 files changed, 206 insertions(+), 41 deletions(-) create mode 100644 src/Common/waitForPid.cpp create mode 100644 src/Common/waitForPid.h diff --git a/src/Common/ShellCommand.cpp b/src/Common/ShellCommand.cpp index 86adeeaf7e5..0050288b1cf 100644 --- a/src/Common/ShellCommand.cpp +++ b/src/Common/ShellCommand.cpp @@ -1,9 +1,7 @@ #include #include -#include #include #include -#include #include #include @@ -13,6 +11,7 @@ #include #include #include +#include namespace @@ -94,53 +93,15 @@ ShellCommand::~ShellCommand() bool ShellCommand::tryWaitProcessWithTimeout(size_t timeout_in_seconds) { - int status = 0; - LOG_TRACE(getLogger(), "Try wait for shell command pid {} with timeout {}", pid, timeout_in_seconds); wait_called = true; - struct timespec interval {.tv_sec = 1, .tv_nsec = 0}; in.close(); out.close(); err.close(); - if (timeout_in_seconds == 0) - { - /// If there is no timeout before signal try to waitpid 1 time without block so we can avoid sending - /// signal if process is already normally terminated. 
- - int waitpid_res = waitpid(pid, &status, WNOHANG); - bool process_terminated_normally = (waitpid_res == pid); - return process_terminated_normally; - } - - /// If timeout is positive try waitpid without block in loop until - /// process is normally terminated or waitpid return error - - while (timeout_in_seconds != 0) - { - int waitpid_res = waitpid(pid, &status, WNOHANG); - bool process_terminated_normally = (waitpid_res == pid); - - if (process_terminated_normally) - { - return true; - } - else if (waitpid_res == 0) - { - --timeout_in_seconds; - nanosleep(&interval, nullptr); - - continue; - } - else if (waitpid_res == -1 && errno != EINTR) - { - return false; - } - } - - return false; + return waitForPid(pid, timeout_in_seconds); } void ShellCommand::logCommand(const char * filename, char * const argv[]) diff --git a/src/Common/waitForPid.cpp b/src/Common/waitForPid.cpp new file mode 100644 index 00000000000..38f43ae2f6a --- /dev/null +++ b/src/Common/waitForPid.cpp @@ -0,0 +1,192 @@ +#include +#include +#include +#include + +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wgnu-statement-expression" +#define HANDLE_EINTR(x) ({ \ + decltype(x) eintr_wrapper_result; \ + do { \ + eintr_wrapper_result = (x); \ + } while (eintr_wrapper_result == -1 && errno == EINTR); \ + eintr_wrapper_result; \ +}) + +#if defined(OS_LINUX) + +#include +#include + +#if !defined(__NR_pidfd_open) + #if defined(__x86_64__) + #define SYS_pidfd_open 434 + #elif defined(__aarch64__) + #define SYS_pidfd_open 434 + #elif defined(__ppc64__) + #define SYS_pidfd_open 434 + #elif defined(__riscv) + #define SYS_pidfd_open 434 + #else + #error "Unsupported architecture" + #endif +#else + #define SYS_pidfd_open __NR_pidfd_open +#endif + +namespace DB +{ + +static int syscall_pidfd_open(pid_t pid) +{ + // pidfd_open cannot be interrupted, no EINTR handling + return syscall(SYS_pidfd_open, pid, 0); +} + +static int dir_pidfd_open(pid_t pid) +{ + std::string path = "/proc/" + std::to_string(pid); + return HANDLE_EINTR(open(path.c_str(), O_DIRECTORY)); +} + +static bool supportsPidFdOpen() +{ + VersionNumber pidfd_open_minimal_version(5, 3, 0); + VersionNumber linux_version(Poco::Environment::osVersion()); + return linux_version >= pidfd_open_minimal_version; +} + +static int pidFdOpen(pid_t pid) +{ + // use pidfd_open or just plain old /proc/[pid] open for Linux + if (supportsPidFdOpen()) + { + return syscall_pidfd_open(pid); + } + else + { + return dir_pidfd_open(pid); + } +} + +static int pollPid(pid_t pid, int timeout_in_ms) +{ + struct pollfd pollfd; + + int pid_fd = pidFdOpen(pid); + if (pid_fd == -1) + { + return false; + } + pollfd.fd = pid_fd; + pollfd.events = POLLIN; + int ready = poll(&pollfd, 1, timeout_in_ms); + int save_errno = errno; + close(pid_fd); + errno = save_errno; + return ready; +} +#elif defined(OS_DARWIN) || defined(OS_FREEBSD) + +#include +#include + +namespace DB +{ + +static int pollPid(pid_t pid, int timeout_in_ms) +{ + int status = 0; + int kq = HANDLE_EINTR(kqueue()); + if (kq == -1) + { + return false; + } + struct kevent change = {.ident = NULL}; + EV_SET(&change, pid, EVFILT_PROC, EV_ADD, NOTE_EXIT, 0, NULL); + int result = HANDLE_EINTR(kevent(kq, &change, 1, NULL, 0, NULL)); + if (result == -1) + { + if (errno != ESRCH) + { + return false; + } + // check if pid already died while we called kevent() + if (waitpid(pid, &status, WNOHANG) == pid) + { + return true; + } + return false; + } + + struct kevent event = {.ident = NULL}; + struct 
timespec remaining_timespec = {.tv_sec = timeout_in_ms / 1000, .tv_nsec = (timeout_in_ms % 1000) * 1000000}; + int ready = kevent(kq, nullptr, 0, &event, 1, &remaining_timespec); + int save_errno = errno; + close(kq); + errno = save_errno; + return ready; +} +#else + #error "Unsupported OS type" +#endif + +bool waitForPid(pid_t pid, size_t timeout_in_seconds) +{ + int status = 0; + + Stopwatch watch; + + if (timeout_in_seconds == 0) + { + /// If there is no timeout before signal try to waitpid 1 time without block so we can avoid sending + /// signal if process is already normally terminated. + + int waitpid_res = waitpid(pid, &status, WNOHANG); + bool process_terminated_normally = (waitpid_res == pid); + return process_terminated_normally; + } + + /// If timeout is positive try waitpid without block in loop until + /// process is normally terminated or waitpid return error + + int timeout_in_ms = timeout_in_seconds * 1000; + while (timeout_in_ms > 0) + { + int waitpid_res = waitpid(pid, &status, WNOHANG); + bool process_terminated_normally = (waitpid_res == pid); + if (process_terminated_normally) + { + return true; + } + else if (waitpid_res == 0) + { + watch.restart(); + int ready = pollPid(pid, timeout_in_ms); + if (ready <= 0) + { + if (errno == EINTR || errno == EAGAIN) + { + timeout_in_ms -= watch.elapsedMilliseconds(); + } + else + { + return false; + } + } + continue; + } + else if (waitpid_res == -1 && errno != EINTR) + { + return false; + } + } + return false; +} + +} +#pragma GCC diagnostic pop diff --git a/src/Common/waitForPid.h b/src/Common/waitForPid.h new file mode 100644 index 00000000000..71c1a74712c --- /dev/null +++ b/src/Common/waitForPid.h @@ -0,0 +1,12 @@ +#pragma once +#include + +namespace DB +{ +/* + * Waits for a specific pid with timeout, using modern Linux and OSX facilities + * Returns `true` if process terminated successfully or `false` otherwise + */ +bool waitForPid(pid_t pid, size_t timeout_in_seconds); + +} From 88d59520a2b64aed4b3863ac8b0df20239a36b71 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Tue, 19 Jul 2022 15:20:56 +0200 Subject: [PATCH 24/34] Fix --- src/Formats/ReadSchemaUtils.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Formats/ReadSchemaUtils.cpp b/src/Formats/ReadSchemaUtils.cpp index 39812f5ba56..058f9b7059b 100644 --- a/src/Formats/ReadSchemaUtils.cpp +++ b/src/Formats/ReadSchemaUtils.cpp @@ -81,6 +81,8 @@ ColumnsDescription readSchemaFromFormat( try { buf = read_buffer_iterator(); + if (!buf) + break; is_eof = buf->eof(); } catch (...) 
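
A note on the waitForPid() helper introduced in [PATCH 23/34] above: it replaces ShellCommand's fixed 1-second nanosleep polling loop with an event-driven wait (pidfd_open or a /proc/[pid] descriptor on Linux, kqueue on Darwin/FreeBSD). Below is a minimal usage sketch, not part of the patch series; stopChildGracefully() and the SIGTERM escalation are hypothetical, only DB::waitForPid() itself comes from the patch:

    #include <sys/types.h>
    #include <csignal>
    #include <Common/waitForPid.h>

    /// Hypothetical caller: give a child process a grace period, then escalate.
    /// DB::waitForPid() returns true as soon as the pid exits normally within
    /// the timeout, instead of sleeping in fixed 1-second intervals.
    bool stopChildGracefully(pid_t pid, size_t grace_seconds)
    {
        if (DB::waitForPid(pid, grace_seconds))
            return true;          /// exited normally within the timeout
        ::kill(pid, SIGTERM);     /// assumed escalation policy, illustration only
        return false;
    }
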
From 10e4ef135d8c62ede00b4af9cd0c0711e57ceb22 Mon Sep 17 00:00:00 2001
From: vdimir
Date: Mon, 18 Jul 2022 15:18:39 +0200
Subject: [PATCH 25/34] Set default value cross_to_inner_join_rewrite = 2 for comma join

---
 src/Core/Settings.h | 2 +-
 src/Interpreters/CrossToInnerJoinVisitor.cpp | 20 ++++++++++++++---
 ...4_setting_cross_to_inner_rewrite.reference | 7 ++++++
 .../02364_setting_cross_to_inner_rewrite.sql | 22 +++++++++++++++++++
 4 files changed, 47 insertions(+), 4 deletions(-)
 create mode 100644 tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.reference
 create mode 100644 tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.sql

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index bda72f089eb..ea08658d851 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -758,7 +758,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
     M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
     M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \
     \
-    M(UInt64, cross_to_inner_join_rewrite, 1, "Use inner join instead of comma/cross join if possible. Possible values: 0 - no rewrite, 1 - apply if possible, 2 - force rewrite all cross joins", 0) \
+    M(UInt64, cross_to_inner_join_rewrite, 2, "Use inner join instead of comma/cross join if possible. Possible values: 0 - no rewrite, 1 - apply if possible for comma/cross, 2 - force rewrite all comma joins, cross - if possible", 0) \
     \
     M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
     M(Bool, output_format_arrow_string_as_string, false, "Use Arrow String type instead of Binary for String columns", 0) \
diff --git a/src/Interpreters/CrossToInnerJoinVisitor.cpp b/src/Interpreters/CrossToInnerJoinVisitor.cpp
index d438ea9394e..be6c1101fb4 100644
--- a/src/Interpreters/CrossToInnerJoinVisitor.cpp
+++ b/src/Interpreters/CrossToInnerJoinVisitor.cpp
@@ -39,7 +39,10 @@ struct JoinedElement
         : element(table_element)
     {
         if (element.table_join)
+        {
             join = element.table_join->as<ASTTableJoin>();
+            original_kind = join->kind;
+        }
     }
 
     void checkTableName(const DatabaseAndTableWithAlias & table, const String & current_database) const
@@ -61,6 +64,8 @@ struct JoinedElement
         join->kind = ASTTableJoin::Kind::Cross;
     }
 
+    ASTTableJoin::Kind getOriginalKind() const { return original_kind; }
+
     bool rewriteCrossToInner(ASTPtr on_expression)
     {
         if (join->kind != ASTTableJoin::Kind::Cross)
@@ -83,6 +88,8 @@ struct JoinedElement
 private:
     const ASTTablesInSelectQueryElement & element;
     ASTTableJoin * join = nullptr;
+
+    ASTTableJoin::Kind original_kind;
 };
 
 bool isAllowedToRewriteCrossJoin(const ASTPtr & node, const Aliases & aliases)
@@ -251,10 +258,17 @@ void CrossToInnerJoinMatcher::visit(ASTSelectQuery & select, ASTPtr &, Data & da
         }
     }
 
-    if (data.cross_to_inner_join_rewrite > 1 && !rewritten)
+    if (joined.getOriginalKind() == ASTTableJoin::Kind::Comma &&
+        data.cross_to_inner_join_rewrite > 1 &&
+        !rewritten)
     {
-        throw Exception(ErrorCodes::INCORRECT_QUERY, "Failed to rewrite '{} WHERE {}' to INNER JOIN",
-            query_before, queryToString(select.where()));
+        throw Exception(
+            ErrorCodes::INCORRECT_QUERY,
+            "Failed to rewrite comma join to INNER. "
+            "Please, try to simplify WHERE section "
+            "or set the setting `cross_to_inner_join_rewrite` to 1 to allow slow CROSS JOIN for this case"
+            "(cannot rewrite '{} WHERE {}' to INNER JOIN)",
+            query_before, queryToString(select.where()));
     }
 }
 }
diff --git a/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.reference b/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.reference
new file mode 100644
index 00000000000..fcb49fa9945
--- /dev/null
+++ b/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.reference
@@ -0,0 +1,7 @@
+1
+1
+1
+1
+1
+1
+1
diff --git a/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.sql b/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.sql
new file mode 100644
index 00000000000..8deddbaa037
--- /dev/null
+++ b/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.sql
@@ -0,0 +1,22 @@
+
+
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+
+CREATE TABLE t1 ( x Int ) Engine = Memory;
+INSERT INTO t1 VALUES ( 1 ), ( 2 ), ( 3 );
+
+CREATE TABLE t2 ( x Int ) Engine = Memory;
+INSERT INTO t2 VALUES ( 2 ), ( 3 ), ( 4 );
+
+SET cross_to_inner_join_rewrite = 1;
+SELECT count() = 1 FROM t1, t2 WHERE t1.x > t2.x;
+SELECT count() = 1 FROM t1, t2 WHERE t1.x = t2.x;
+SELECT count() = 1 FROM t1 CROSS JOIN t2 WHERE t1.x = t2.x;
+SELECT count() = 1 FROM t1 CROSS JOIN t2 WHERE t1.x > t2.x;
+
+SET cross_to_inner_join_rewrite = 2;
+SELECT count() = 1 FROM t1, t2 WHERE t1.x > t2.x; -- { serverError INCORRECT_QUERY }
+SELECT count() = 2 FROM t1, t2 WHERE t1.x = t2.x;
+SELECT count() = 2 FROM t1 CROSS JOIN t2 WHERE t1.x = t2.x;
+SELECT count() = 1 FROM t1 CROSS JOIN t2 WHERE t1.x > t2.x; -- do not force rewrite explicit CROSS

From c0547bb09cc7d450003bcdaf63d903272cefdab3 Mon Sep 17 00:00:00 2001
From: vdimir
Date: Tue, 19 Jul 2022 12:32:04 +0200
Subject: [PATCH 26/34] fix space

---
 src/Interpreters/CrossToInnerJoinVisitor.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Interpreters/CrossToInnerJoinVisitor.cpp b/src/Interpreters/CrossToInnerJoinVisitor.cpp
index be6c1101fb4..cfa979f4036 100644
--- a/src/Interpreters/CrossToInnerJoinVisitor.cpp
+++ b/src/Interpreters/CrossToInnerJoinVisitor.cpp
@@ -266,7 +266,7 @@ void CrossToInnerJoinMatcher::visit(ASTSelectQuery & select, ASTPtr &, Data & da
             ErrorCodes::INCORRECT_QUERY,
             "Failed to rewrite comma join to INNER. "
             "Please, try to simplify WHERE section "
-            "or set the setting `cross_to_inner_join_rewrite` to 1 to allow slow CROSS JOIN for this case"
+            "or set the setting `cross_to_inner_join_rewrite` to 1 to allow slow CROSS JOIN for this case "
             "(cannot rewrite '{} WHERE {}' to INNER JOIN)",
             query_before, queryToString(select.where()));
     }

From b58f9adce9b9a36c8f83b55231e177582584d395 Mon Sep 17 00:00:00 2001
From: vdimir
Date: Tue, 19 Jul 2022 12:32:27 +0200
Subject: [PATCH 27/34] Fix tests for cross_to_inner_join_rewrite

---
 tests/queries/0_stateless/00849_multiple_comma_join_2.sql | 1 +
 tests/queries/0_stateless/00950_test_gorilla_codec.sql | 2 ++
 tests/queries/0_stateless/01095_tpch_like_smoke.sql | 2 ++
 tests/queries/0_stateless/01479_cross_join_9855.sql | 2 ++
 tests/queries/0_stateless/01911_logical_error_minus.sql | 2 ++
 tests/queries/0_stateless/02313_test_fpc_codec.sql | 2 ++
 .../0_stateless/02364_setting_cross_to_inner_rewrite.sql | 4 ++--
 7 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/tests/queries/0_stateless/00849_multiple_comma_join_2.sql b/tests/queries/0_stateless/00849_multiple_comma_join_2.sql
index eabede3ff00..eb803450ff7 100644
--- a/tests/queries/0_stateless/00849_multiple_comma_join_2.sql
+++ b/tests/queries/0_stateless/00849_multiple_comma_join_2.sql
@@ -1,5 +1,6 @@
 SET enable_optimize_predicate_expression = 0;
 SET convert_query_to_cnf = 0;
+SET cross_to_inner_join_rewrite = 1;
 
 DROP TABLE IF EXISTS t1;
 DROP TABLE IF EXISTS t2;
diff --git a/tests/queries/0_stateless/00950_test_gorilla_codec.sql b/tests/queries/0_stateless/00950_test_gorilla_codec.sql
index a6e0f1d7b11..e9582480bcb 100644
--- a/tests/queries/0_stateless/00950_test_gorilla_codec.sql
+++ b/tests/queries/0_stateless/00950_test_gorilla_codec.sql
@@ -1,5 +1,7 @@
 DROP TABLE IF EXISTS codecTest;
 
+SET cross_to_inner_join_rewrite = 1;
+
 CREATE TABLE codecTest (
     key UInt64,
     name String,
diff --git a/tests/queries/0_stateless/01095_tpch_like_smoke.sql b/tests/queries/0_stateless/01095_tpch_like_smoke.sql
index 5971178ade5..1ac9ec229f0 100644
--- a/tests/queries/0_stateless/01095_tpch_like_smoke.sql
+++ b/tests/queries/0_stateless/01095_tpch_like_smoke.sql
@@ -7,6 +7,8 @@ DROP TABLE IF EXISTS lineitem;
 DROP TABLE IF EXISTS nation;
 DROP TABLE IF EXISTS region;
 
+SET cross_to_inner_join_rewrite = 1;
+
 CREATE TABLE part (
     p_partkey Int32, -- PK
diff --git a/tests/queries/0_stateless/01479_cross_join_9855.sql b/tests/queries/0_stateless/01479_cross_join_9855.sql
index 0b549619489..6dc76f22057 100644
--- a/tests/queries/0_stateless/01479_cross_join_9855.sql
+++ b/tests/queries/0_stateless/01479_cross_join_9855.sql
@@ -1,3 +1,5 @@
+SET cross_to_inner_join_rewrite = 1;
+
 SELECT count()
 FROM numbers(4) AS n1, numbers(3) AS n2
 WHERE n1.number > (select avg(n.number) from numbers(3) n);
diff --git a/tests/queries/0_stateless/01911_logical_error_minus.sql b/tests/queries/0_stateless/01911_logical_error_minus.sql
index 9813c1a8a5d..3dcdedd38f5 100644
--- a/tests/queries/0_stateless/01911_logical_error_minus.sql
+++ b/tests/queries/0_stateless/01911_logical_error_minus.sql
@@ -1,6 +1,8 @@
 -- This test case is almost completely generated by fuzzer.
 -- It appeared to trigger assertion.
 
 
+SET cross_to_inner_join_rewrite = 1;
+
 DROP TABLE IF EXISTS codecTest;
 
 CREATE TABLE codecTest (
diff --git a/tests/queries/0_stateless/02313_test_fpc_codec.sql b/tests/queries/0_stateless/02313_test_fpc_codec.sql
index 3b1127350f0..4fe54b87c9c 100644
--- a/tests/queries/0_stateless/02313_test_fpc_codec.sql
+++ b/tests/queries/0_stateless/02313_test_fpc_codec.sql
@@ -1,5 +1,7 @@
 DROP TABLE IF EXISTS codecTest;
 
+SET cross_to_inner_join_rewrite = 1;
+
 CREATE TABLE codecTest (
     key UInt64,
     name String,
diff --git a/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.sql b/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.sql
index 8deddbaa037..cdbac93937e 100644
--- a/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.sql
+++ b/tests/queries/0_stateless/02364_setting_cross_to_inner_rewrite.sql
@@ -11,8 +11,8 @@ INSERT INTO t2 VALUES ( 2 ), ( 3 ), ( 4 );
 
 SET cross_to_inner_join_rewrite = 1;
 SELECT count() = 1 FROM t1, t2 WHERE t1.x > t2.x;
-SELECT count() = 1 FROM t1, t2 WHERE t1.x = t2.x;
-SELECT count() = 1 FROM t1 CROSS JOIN t2 WHERE t1.x = t2.x;
+SELECT count() = 2 FROM t1, t2 WHERE t1.x = t2.x;
+SELECT count() = 2 FROM t1 CROSS JOIN t2 WHERE t1.x = t2.x;
 SELECT count() = 1 FROM t1 CROSS JOIN t2 WHERE t1.x > t2.x;
 
 SET cross_to_inner_join_rewrite = 2;

From 5c16d6b55302f235e844fe2fefb24108bf26d943 Mon Sep 17 00:00:00 2001
From: avogar
Date: Tue, 19 Jul 2022 19:21:30 +0000
Subject: [PATCH 28/34] Fix WriteBuffer finalize in destructor when cancel query

---
 src/Processors/Formats/IOutputFormat.cpp | 4 +++-
 src/Storages/HDFS/StorageHDFS.cpp | 7 +++++++
 src/Storages/StorageFile.cpp | 7 +++++++
 src/Storages/StorageS3.cpp | 7 +++++++
 src/Storages/StorageURL.cpp | 7 +++++++
 src/Storages/StorageURL.h | 1 +
 6 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp
index 3c4e6861151..47ebaa9c5f5 100644
--- a/src/Processors/Formats/IOutputFormat.cpp
+++ b/src/Processors/Formats/IOutputFormat.cpp
@@ -73,7 +73,6 @@ void IOutputFormat::work()
             setRowsBeforeLimit(rows_before_limit_counter->get());
 
         finalize();
-        finalized = true;
         return;
     }
 
@@ -120,9 +119,12 @@ void IOutputFormat::write(const Block & block)
 
 void IOutputFormat::finalize()
 {
+    if (finalized)
+        return;
     writePrefixIfNot();
     writeSuffixIfNot();
     finalizeImpl();
+    finalized = true;
 }
 
 }
diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp
index 57e893e9683..1e9f9286633 100644
--- a/src/Storages/HDFS/StorageHDFS.cpp
+++ b/src/Storages/HDFS/StorageHDFS.cpp
@@ -430,6 +430,13 @@ public:
         writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
     }
 
+    void onCancel() override
+    {
+        if (!writer)
+            return;
+        onFinish();
+    }
+
     void onException() override
     {
         if (!writer)
diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp
index d138104018a..30e5042fb06 100644
--- a/src/Storages/StorageFile.cpp
+++ b/src/Storages/StorageFile.cpp
@@ -813,6 +813,13 @@ public:
         writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
     }
 
+    void onCancel() override
+    {
+        if (!writer)
+            return;
+        onFinish();
+    }
+
     void onException() override
     {
         if (!writer)
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index 130bc75a65c..6a7c682199d 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -602,6 +602,13 @@ public:
         writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
    }
 
+    void onCancel() override
+    {
+        if (!writer)
+            return;
+        onFinish();
+    }
+
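The same three-line onCancel() is added to every sink in this patch (HDFS, File, S3 above, and URL just below), and it works only because finalize() in IOutputFormat was made idempotent. A minimal self-contained sketch of the pattern, using illustrative names rather than the actual ClickHouse interfaces:

struct Sink
{
    bool finalized = false;
    bool has_writer = false;  /// stands in for the `writer` pointer checked above

    void finalize()
    {
        if (finalized)        /// idempotent: safe to reach from several paths
            return;
        /// ... flush prefix/suffix, finalize the underlying WriteBuffer ...
        finalized = true;
    }

    void onFinish() { finalize(); }

    void onCancel()
    {
        if (!has_writer)      /// nothing was written, so nothing to flush
            return;
        onFinish();           /// cancellation reuses the normal completion path
    }
};

With this shape, normal completion, cancellation, and the exception path can all trigger finalization without double-finalizing, so the destructor is never the first (and potentially throwing) place where the WriteBuffer gets finalized, which is what the commit subject refers to.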
     void onException() override
     {
         if (!writer)
diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp
index 15ae23305f3..cdc288ba788 100644
--- a/src/Storages/StorageURL.cpp
+++ b/src/Storages/StorageURL.cpp
@@ -450,6 +450,13 @@ void StorageURLSink::consume(Chunk chunk)
     writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
 }
 
+void StorageURLSink::onCancel()
+{
+    if (!writer)
+        return;
+    onFinish();
+}
+
 void StorageURLSink::onException()
 {
     if (!writer)
diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h
index 79371242bb1..320c6258ee5 100644
--- a/src/Storages/StorageURL.h
+++ b/src/Storages/StorageURL.h
@@ -114,6 +114,7 @@ public:
     std::string getName() const override { return "StorageURLSink"; }
 
     void consume(Chunk chunk) override;
+    void onCancel() override;
     void onException() override;
     void onFinish() override;
 

From be1d40916a095962d29f1b4372b8a19381b27ffb Mon Sep 17 00:00:00 2001
From: Denny Crane
Date: Tue, 19 Jul 2022 18:22:21 -0300
Subject: [PATCH 29/34] Update formats.md

---
 docs/en/interfaces/formats.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md
index 5d8ed9cdacd..d4449a6aa8e 100644
--- a/docs/en/interfaces/formats.md
+++ b/docs/en/interfaces/formats.md
@@ -1632,6 +1632,8 @@ kafka_topic_list = 'topic1',
 kafka_group_name = 'group1',
 kafka_format = 'AvroConfluent';
 
+-- for debug purposes you can set format_avro_schema_registry_url in session.
+-- this way cannot be used in production
 SET format_avro_schema_registry_url = 'http://schema-registry';
 
 SELECT * FROM topic1_stream;

From 1d5b2b770e82fe48f877777c8a1a2c219b51600d Mon Sep 17 00:00:00 2001
From: Denny Crane
Date: Tue, 19 Jul 2022 18:23:02 -0300
Subject: [PATCH 30/34] Update formats.md

---
 docs/en/interfaces/formats.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md
index d4449a6aa8e..e499849426b 100644
--- a/docs/en/interfaces/formats.md
+++ b/docs/en/interfaces/formats.md
@@ -1632,7 +1632,7 @@ kafka_topic_list = 'topic1',
 kafka_group_name = 'group1',
 kafka_format = 'AvroConfluent';
 
--- for debug purposes you can set format_avro_schema_registry_url in session.
+-- for debug purposes you can set format_avro_schema_registry_url in a session.
 -- this way cannot be used in production
 SET format_avro_schema_registry_url = 'http://schema-registry';
 

From e5e9f6e60bc63c5071493b8328833966d04238d7 Mon Sep 17 00:00:00 2001
From: Ladislav Snizek
Date: Wed, 20 Jul 2022 11:53:33 +0200
Subject: [PATCH 31/34] Documentation: Correct http_receive_timeout and http_send_timeout defaults (changed in #31450)

---
 docs/en/operations/settings/settings.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md
index 75c2aa57b32..9f66d5d29a9 100644
--- a/docs/en/operations/settings/settings.md
+++ b/docs/en/operations/settings/settings.md
@@ -2626,7 +2626,7 @@ Possible values:
 
 - Any positive integer.
 - 0 - Disabled (infinite timeout).
 
-Default value: 1800.
+Default value: 180.
 
 ## http_receive_timeout {#http_receive_timeout}
@@ -2637,7 +2637,7 @@ Possible values:
 
 - Any positive integer.
 - 0 - Disabled (infinite timeout).
 
-Default value: 1800.
+Default value: 180.
 
 ## check_query_single_value_result {#check_query_single_value_result}
 
From 4b46d80169469076552dc184a891e955163ca643 Mon Sep 17 00:00:00 2001
From: lgbo-ustc
Date: Wed, 20 Jul 2022 18:03:34 +0800
Subject: [PATCH 32/34] update codes

---
 src/Storages/Hive/HiveFile.cpp | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/Storages/Hive/HiveFile.cpp b/src/Storages/Hive/HiveFile.cpp
index 671cb11deaa..112798fea4e 100644
--- a/src/Storages/Hive/HiveFile.cpp
+++ b/src/Storages/Hive/HiveFile.cpp
@@ -88,7 +88,6 @@ std::optional<size_t> IHiveFile::getRows()
             has_init_rows = true;
         }
     }
-
     return rows;
 }
 
From 840ace663b205f35a42b28228f165f8561e6bbb7 Mon Sep 17 00:00:00 2001
From: Alexander Tokmakov
Date: Wed, 20 Jul 2022 13:19:53 +0300
Subject: [PATCH 33/34] Update run.sh

---
 docker/test/stress/run.sh | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docker/test/stress/run.sh b/docker/test/stress/run.sh
index 06bf05a1727..ffa0b12b8a3 100755
--- a/docker/test/stress/run.sh
+++ b/docker/test/stress/run.sh
@@ -362,6 +362,8 @@ else
     # FIXME Not sure if it's expected, but some tests from BC check may not be finished yet when we restarting server.
     # Let's just ignore all errors from queries ("} TCPHandler: Code:", "} executeQuery: Code:")
     # FIXME https://github.com/ClickHouse/ClickHouse/issues/39197 ("Missing columns: 'v3' while processing query: 'v3, k, v1, v2, p'")
+    # NOTE Incompatibility was introduced in https://github.com/ClickHouse/ClickHouse/pull/39263, it's expected
+    # ("This engine is deprecated and is not supported in transactions", "[Queue = DB::MergeMutateRuntimeQueue]: Code: 235. DB::Exception: Part")
     echo "Check for Error messages in server log:"
     zgrep -Fav -e "Code: 236. DB::Exception: Cancelled merging parts" \
                -e "Code: 236. DB::Exception: Cancelled mutating parts" \
@@ -389,6 +391,8 @@ else
                -e "} TCPHandler: Code:" \
                -e "} executeQuery: Code:" \
                -e "Missing columns: 'v3' while processing query: 'v3, k, v1, v2, p'" \
+               -e "This engine is deprecated and is not supported in transactions" \
+               -e "[Queue = DB::MergeMutateRuntimeQueue]: Code: 235. DB::Exception: Part" \
                /var/log/clickhouse-server/clickhouse-server.backward.clean.log | zgrep -Fa "<Error>" > /test_output/bc_check_error_messages.txt \
         && echo -e 'Backward compatibility check: Error message in clickhouse-server.log (see bc_check_error_messages.txt)\tFAIL' >> /test_output/test_results.tsv \
         || echo -e 'Backward compatibility check: No Error messages in clickhouse-server.log\tOK' >> /test_output/test_results.tsv
 
From 466fceb3ee6530847e3e697e8806322408c0a463 Mon Sep 17 00:00:00 2001
From: Harry Lee <96150659+HarryLeeIBM@users.noreply.github.com>
Date: Wed, 20 Jul 2022 09:25:33 -0400
Subject: [PATCH 34/34] Fix exception in AsynchronousMetrics for s390x (#39193)

---
 src/Interpreters/AsynchronousMetrics.cpp | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp
index b057b6ee641..9fd27fc28b6 100644
--- a/src/Interpreters/AsynchronousMetrics.cpp
+++ b/src/Interpreters/AsynchronousMetrics.cpp
@@ -989,9 +989,15 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
 
                 if (s.rfind("processor", 0) == 0)
                 {
+                    /// s390x example: processor 0: version = FF, identification = 039C88, machine = 3906
+                    /// non s390x example: processor : 0
                     if (auto colon = s.find_first_of(':'))
                     {
+#ifdef __s390x__
+                        core_id = std::stoi(s.substr(10)); /// 10: length of "processor" plus 1
+#else
                         core_id = std::stoi(s.substr(colon + 2));
+#endif
                     }
                 }
                 else if (s.rfind("cpu MHz", 0) == 0)
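The final hunk is the whole fix: on s390x the core number comes before the first colon, so the generic `substr(colon + 2)` parse lands on `version = FF, ...` and `std::stoi` throws, which is the exception the subject refers to. A standalone sketch of the two layouts (`parseCoreId` is an illustrative helper, not a ClickHouse function, and a runtime flag stands in for the `#ifdef` so both branches can be exercised):

#include <cassert>
#include <string>

/// Illustrative helper, not part of ClickHouse.
static int parseCoreId(const std::string & s, bool s390x_layout)
{
    if (s390x_layout)
        return std::stoi(s.substr(10));    /// skip "processor " (9 chars + 1 separator); stoi stops at ':'
    auto colon = s.find_first_of(':');
    return std::stoi(s.substr(colon + 2)); /// skip ": " after the key
}

int main()
{
    assert(parseCoreId("processor 0: version = FF, identification = 039C88, machine = 3906", true) == 0);
    assert(parseCoreId("processor\t: 7", false) == 7);
    return 0;
}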