diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index c29481671dd..8476f3020af 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -197,6 +197,13 @@ void ParallelAggregatingBlockInputStream::execute() << "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)" << " in " << elapsed_seconds << " sec." << " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); + + /// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation. + /// To do this, we pass a block with zero rows to aggregate. + if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set) + aggregator.executeOnBlock(children.at(0)->getHeader(), *many_data[0], + threads_data[0].key_columns, threads_data[0].aggregate_columns, + threads_data[0].key, no_more_keys); } } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index b2816c75202..df1fb72584c 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -648,7 +648,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl( } -bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result, +bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, StringRefs & key, bool & no_more_keys) { @@ -1023,6 +1023,11 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria break; } + /// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation. + /// To do this, we pass a block with zero rows to aggregate. 
+ if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set) + executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, key, no_more_keys); + double elapsed_seconds = watch.elapsedSeconds(); size_t rows = result.sizeWithoutOverflowRow(); LOG_TRACE(log, std::fixed << std::setprecision(3) diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 08027fe6ae6..a498acf4c74 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -1009,6 +1009,10 @@ public: /// Settings to flush temporary data to the filesystem (external aggregation). const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation. + + /// Return empty result when aggregating without keys on empty set. + bool empty_result_for_aggregation_by_empty_set; + const std::string tmp_path; Params( @@ -1017,20 +1021,24 @@ public: bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_, - size_t max_bytes_before_external_group_by_, const std::string & tmp_path_) + size_t max_bytes_before_external_group_by_, + bool empty_result_for_aggregation_by_empty_set_, + const std::string & tmp_path_) : src_header(src_header_), keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()), overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), compiler(compiler_), min_count_to_compile(min_count_to_compile_), group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_), - max_bytes_before_external_group_by(max_bytes_before_external_group_by_), tmp_path(tmp_path_) + max_bytes_before_external_group_by(max_bytes_before_external_group_by_), + 
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_), + tmp_path(tmp_path_) { } /// Only parameters that matter during merge. Params(const Block & intermediate_header_, const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_) - : Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "") + : Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, "") { intermediate_header = intermediate_header_; } @@ -1050,7 +1058,7 @@ public: using AggregateFunctionsPlainPtrs = std::vector; /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break'). - bool executeOnBlock(Block & block, AggregatedDataVariants & result, + bool executeOnBlock(const Block & block, AggregatedDataVariants & result, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block StringRefs & keys, /// - pass the corresponding objects that are initially empty. bool & no_more_keys); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index b0579452e6f..be916e0b7b0 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -462,7 +461,7 @@ void InterpreterSelectQuery::executeSingleQuery() union_within_single_query = false; - /** Take out the data from Storage. from_stage - to what stage the request was completed in Storage. */ + /** Read the data from Storage. from_stage - to what stage the request was completed in Storage. 
*/ QueryProcessingStage::Enum from_stage = executeFetchColumns(); LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage)); @@ -864,18 +863,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() if (streams.empty()) streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); - /// The storage has no data for this query. if (streams.empty()) - { - from_stage = QueryProcessingStage::FetchColumns; - Block header; - for (const auto & name : required_columns) - { - auto type = storage->getDataTypeByName(name); - header.insert({ type->createColumn(), type, name }); - } - streams.emplace_back(std::make_shared(header)); - } + streams.emplace_back(std::make_shared(storage->getSampleBlockForColumns(required_columns))); if (alias_actions) { @@ -971,7 +960,8 @@ void InterpreterSelectQuery::executeAggregation(const ExpressionActionsPtr & exp settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? 
settings.group_by_two_level_threshold_bytes : SettingUInt64(0), - settings.limits.max_bytes_before_external_group_by, context.getTemporaryPath()); + settings.limits.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, + context.getTemporaryPath()); /// If there are several sources, then we perform parallel aggregation if (streams.size() > 1) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 5244ebb5c9b..4946be8acb6 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -180,7 +180,9 @@ struct Settings M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \ M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \ M(SettingBool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown") \ - M(SettingBool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.") + M(SettingBool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.") \ + \ + M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") /// Possible limits for query execution. 
diff --git a/dbms/src/Interpreters/tests/aggregate.cpp b/dbms/src/Interpreters/tests/aggregate.cpp index 046dbf58d9e..65db982dbd1 100644 --- a/dbms/src/Interpreters/tests/aggregate.cpp +++ b/dbms/src/Interpreters/tests/aggregate.cpp @@ -79,7 +79,7 @@ int main(int argc, char ** argv) Aggregator::Params params( stream->getHeader(), {0, 1}, aggregate_descriptions, - false, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, ""); + false, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, ""); Aggregator aggregator(params); diff --git a/dbms/tests/queries/0_stateless/00572_aggregation_by_empty_set.reference b/dbms/tests/queries/0_stateless/00572_aggregation_by_empty_set.reference new file mode 100644 index 00000000000..42cb3bf7829 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00572_aggregation_by_empty_set.reference @@ -0,0 +1,5 @@ +0 +0 +1 +0 0 nan \N [] [] +1 diff --git a/dbms/tests/queries/0_stateless/00572_aggregation_by_empty_set.sql b/dbms/tests/queries/0_stateless/00572_aggregation_by_empty_set.sql new file mode 100644 index 00000000000..2d627ede8b2 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00572_aggregation_by_empty_set.sql @@ -0,0 +1,19 @@ +CREATE TEMPORARY TABLE t (x UInt8); + +SET empty_result_for_aggregation_by_empty_set = 0; + +SELECT count() FROM system.one WHERE 0; +SELECT count() FROM system.one WHERE rand() < 0; +SELECT count() FROM system.one WHERE 1; + +SELECT count(), uniq(x), avg(x), avg(toNullable(x)), groupArray(x), groupUniqArray(x) FROM t; +SELECT x, count(), uniq(x), avg(x), avg(toNullable(x)), groupArray(x), groupUniqArray(x) FROM t GROUP BY x; + +SET empty_result_for_aggregation_by_empty_set = 1; + +SELECT count() FROM system.one WHERE 0; +SELECT count() FROM system.one WHERE rand() < 0; +SELECT count() FROM system.one WHERE 1; + +SELECT count(), uniq(x), avg(x), avg(toNullable(x)), groupArray(x), groupUniqArray(x) FROM t; +SELECT x, count(), uniq(x), avg(x), avg(toNullable(x)), groupArray(x), groupUniqArray(x) FROM t GROUP BY x;