Merge pull request #9113 from dimarub2000/group_by_in_order_optimization

[WIP] Optimization of GROUP BY with respect to table sorting key.
Anton Popov 2020-06-06 14:25:59 +03:00 committed by GitHub
commit 5c42408add
44 changed files with 994 additions and 161 deletions
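For context: the new setting optimize_aggregation_in_order (introduced below) lets aggregation consume data in the table's sorting-key order when the GROUP BY keys match a prefix of that key, emitting finished groups incrementally instead of building the whole hash table first. A minimal sketch of the intended usage (the table and column names are hypothetical):

CREATE TABLE t (a UInt64, b UInt64, v UInt64) ENGINE = MergeTree() ORDER BY (a, b);
SELECT a, sum(v) FROM t GROUP BY a SETTINGS optimize_aggregation_in_order = 1;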

View File

@ -150,6 +150,8 @@ public:
virtual void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const = 0;
virtual void addBatchSinglePlaceFromInterval(size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
/** In addition to addBatch, this method collects multiple rows of arguments into array "places"
* as long as they are between offsets[i-1] and offsets[i]. This is used for arrayReduce and
* -Array combinator. It might also be used generally to break data dependency when array
@ -214,6 +216,12 @@ public:
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
void addBatchSinglePlaceFromInterval(size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = batch_begin; i < batch_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
void addBatchArray(
size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
const override

View File

@ -150,7 +150,7 @@ public:
return res;
}
/// Get peice of memory with alignment
/// Get piece of memory with alignment
char * alignedAlloc(size_t size, size_t alignment)
{
do

View File

@ -388,6 +388,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, enable_debug_queries, false, "Enables debug queries such as AST.", 0) \
M(SettingBool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this setting is enabled, arrays will be resized to the longest one.", 0) \
M(SettingBool, optimize_read_in_order, true, "Enable ORDER BY optimization for reading data in corresponding order in MergeTree tables.", 0) \
M(SettingBool, optimize_aggregation_in_order, false, "Enable GROUP BY optimization for aggregating data in corresponding order in MergeTree tables.", 0) \
M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.", 0) \
M(SettingBool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.", 0) \
M(SettingBool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only by 'mysql' and 'odbc' table functions.", 0) \

View File

@ -63,7 +63,7 @@ struct SortCursorImpl
for (auto & column_desc : desc)
{
if (!column_desc.column_name.empty())
throw Exception("SortDesctiption should contain column position if SortCursor was used without header.",
throw Exception("SortDescription should contain column position if SortCursor was used without header.",
ErrorCodes::LOGICAL_ERROR);
}
reset(columns, {});

View File

@ -59,6 +59,13 @@ struct SortColumnDescription
{
return !(*this == other);
}
std::string dump() const
{
std::stringstream ss;
ss << column_name << ":" << column_number << ":dir " << direction << " nulls " << nulls_direction;
return ss.str();
}
};
/// Description of the sorting rule for several columns.

View File

@ -530,63 +530,33 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
}
bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena)
{
UInt64 num_rows = block.rows();
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
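/// When 'offsets' is set (the function aggregates underlying array values), translate the row interval into the corresponding interval of values.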
if (inst->offsets)
inst->batch_that->addBatchSinglePlaceFromInterval(inst->offsets[row_begin], inst->offsets[row_end - 1], res + inst->state_offset, inst->batch_arguments, arena);
else
inst->batch_that->addBatchSinglePlaceFromInterval(row_begin, row_end, res + inst->state_offset, inst->batch_arguments, arena);
}
}
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder)
{
if (isCancelled())
return true;
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
/// How to perform the aggregation?
if (result.empty())
{
result.init(method_chosen);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: {}", result.getMethodName());
}
if (isCancelled())
return true;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
/** Constant columns are not supported directly during aggregation.
* To make them work anyway, we materialize them.
*/
Columns materialized_columns;
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
{
materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back().get();
if (!result.isLowCardinality())
{
auto column_no_lc = recursiveRemoveLowCardinality(key_columns[i]->getPtr());
if (column_no_lc.get() != key_columns[i])
{
materialized_columns.emplace_back(std::move(column_no_lc));
key_columns[i] = materialized_columns.back().get();
}
}
}
AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
aggregate_functions_instructions.resize(params.aggregates_size + 1);
aggregate_functions_instructions[params.aggregates_size].that = nullptr;
std::vector<std::vector<const IColumn *>> nested_columns_holder;
for (size_t i = 0; i < params.aggregates_size; ++i)
{
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
@ -627,6 +597,62 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
aggregate_functions_instructions[i].batch_that = that;
}
}
bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
UInt64 num_rows = block.rows();
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
}
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
if (isCancelled())
return true;
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
/// How to perform the aggregation?
if (result.empty())
{
result.init(method_chosen);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: {}", result.getMethodName());
}
if (isCancelled())
return true;
/** Constant columns are not supported directly during aggregation.
* To make them work anyway, we materialize them.
*/
Columns materialized_columns;
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
{
materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back().get();
if (!result.isLowCardinality())
{
auto column_no_lc = recursiveRemoveLowCardinality(key_columns[i]->getPtr());
if (column_no_lc.get() != key_columns[i])
{
materialized_columns.emplace_back(std::move(column_no_lc));
key_columns[i] = materialized_columns.back().get();
}
}
}
NestedColumnsHolder nested_columns_holder;
AggregateFunctionInstructions aggregate_functions_instructions;
prepareAggregateInstructions(columns, aggregate_columns, materialized_columns, aggregate_functions_instructions, nested_columns_holder);
if (isCancelled())
return true;
@ -1112,7 +1138,39 @@ Block Aggregator::prepareBlockAndFill(
return res;
}
void Aggregator::fillAggregateColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
MutableColumns & final_aggregate_columns)
{
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < params.aggregates_size; ++i)
{
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*final_aggregate_columns[i]);
for (auto & pool : data_variants.aggregates_pools)
{
column_aggregate_func.addArena(pool);
}
column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
}
data = nullptr;
}
void Aggregator::createStatesAndFillKeyColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
Columns & key_columns,
size_t key_row,
MutableColumns & final_key_columns)
{
AggregateDataPtr place = data_variants.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
data_variants.without_key = place;
for (size_t i = 0; i < params.keys_size; ++i)
{
final_key_columns[i]->insertFrom(*key_columns[i].get(), key_row);
}
}
Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const
{

View File

@ -1002,6 +1002,7 @@ protected:
friend class MergingAndConvertingBlockInputStream;
friend class ConvertingAggregatedToChunksTransform;
friend class ConvertingAggregatedToChunksSource;
friend class AggregatingInOrderTransform;
Params params;
@ -1033,12 +1034,13 @@ protected:
};
using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;
using NestedColumnsHolder = std::vector<std::vector<const IColumn *>>;
Sizes offsets_of_aggregate_states; /// The offset to the n-th aggregate function in a row of aggregate functions.
size_t total_size_of_aggregate_states = 0; /// The total size of the row from the aggregate functions.
// add info to track alignment requirement
// If there are states whose alignmentment are v1, ..vn, align_aggregate_states will be max(v1, ... vn)
// If there are states whose alignment are v1, ..vn, align_aggregate_states will be max(v1, ... vn)
size_t align_aggregate_states = 1;
bool all_aggregates_has_trivial_destructor = false;
@ -1105,6 +1107,13 @@ protected:
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena);
static void executeOnIntervalWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena);
template <typename Method>
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
@ -1250,6 +1259,22 @@ protected:
* - sets the variable no_more_keys to true.
*/
bool checkLimits(size_t result_size, bool & no_more_keys) const;
void prepareAggregateInstructions(
Columns columns,
AggregateColumns & aggregate_columns,
Columns & materialized_columns,
AggregateFunctionInstructions & instructions,
NestedColumnsHolder & nested_columns_holder);
void fillAggregateColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
MutableColumns & final_aggregate_columns);
void createStatesAndFillKeyColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
Columns & key_columns, size_t key_row,
MutableColumns & final_key_columns);
};

View File

@ -726,7 +726,8 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain,
return true;
}
bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain, bool only_types)
bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain, bool only_types, bool optimize_aggregation_in_order,
ManyExpressionActions & group_by_elements_actions)
{
const auto * select_query = getAggregatingQuery();
@ -743,6 +744,20 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain
getRootActions(ast, only_types, step.actions);
}
if (optimize_aggregation_in_order)
{
auto all_columns = sourceWithJoinedColumns();
for (auto & child : asts)
{
group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(all_columns, context));
getRootActions(child, only_types, group_by_elements_actions.back());
}
// std::cerr << "group_by_elements_actions\n";
// for (const auto & elem : group_by_elements_actions) {
// std::cerr << elem->dumpActions() << "\n";
// }
}
return true;
}
@ -834,8 +849,11 @@ bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain
order_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(all_columns, context));
getRootActions(child, only_types, order_by_elements_actions.back());
}
// std::cerr << "order_by_elements_actions\n";
// for (const auto & elem : order_by_elements_actions) {
// std::cerr << elem->dumpActions() << "\n";
// }
}
return true;
}
@ -1115,7 +1133,12 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (need_aggregate)
{
query_analyzer.appendGroupBy(chain, only_types || !first_stage);
/// TODO correct conditions
optimize_aggregation_in_order =
context.getSettingsRef().optimize_aggregation_in_order
&& storage && query.groupBy();
query_analyzer.appendGroupBy(chain, only_types || !first_stage, optimize_aggregation_in_order, group_by_elements_actions);
query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage);
before_aggregation = chain.getLastActions();
@ -1128,13 +1151,13 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
}
}
bool has_stream_with_non_joned_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
bool has_stream_with_non_joined_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
optimize_read_in_order =
settings.optimize_read_in_order
&& storage && query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query.final()
&& !has_stream_with_non_joned_rows;
&& !has_stream_with_non_joined_rows;
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));

View File

@ -174,6 +174,7 @@ struct ExpressionAnalysisResult
bool remove_where_filter = false;
bool optimize_read_in_order = false;
bool optimize_aggregation_in_order = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
@ -195,6 +196,7 @@ struct ExpressionAnalysisResult
ConstantFilterDescription where_constant_filter_description;
/// Actions by every element of ORDER BY
ManyExpressionActions order_by_elements_actions;
ManyExpressionActions group_by_elements_actions;
ExpressionAnalysisResult() = default;
@ -303,7 +305,7 @@ private:
/// Columns in `additional_required_columns` will not be removed (they can be used for e.g. sampling or FINAL modifier).
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns);
bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types, bool optimize_aggregation_in_order, ManyExpressionActions &);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);
/// After aggregation:

View File

@ -71,6 +71,8 @@
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
namespace DB
@ -601,6 +603,20 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, const Co
return order_descr;
}
static SortDescription getSortDescriptionFromGroupBy(const ASTSelectQuery & query)
{
SortDescription order_descr;
order_descr.reserve(query.groupBy()->children.size());
for (const auto & elem : query.groupBy()->children)
{
String name = elem->getColumnName();
order_descr.emplace_back(name, 1, 1);
}
return order_descr;
}
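Each GROUP BY element thus becomes an ascending sort column. A hedged illustration, assuming a hypothetical table t with sorting key (a, b, c):

-- GROUP BY (a, b) yields the sort description (a ASC, b ASC), a prefix of the
-- sorting key, which ReadInOrderOptimizer can match to enable in-order reads:
SELECT a, b, count() FROM t GROUP BY a, b SETTINGS optimize_aggregation_in_order = 1;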
static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context, const std::string & expr)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
@ -739,7 +755,7 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving())
{
if (expressions.has_order_by)
executeOrder(pipeline, query_info.input_sorting_info);
executeOrder(pipeline, query_info.input_order_info);
if (expressions.has_order_by && query.limitLength())
executeDistinct(pipeline, false, expressions.selected_columns);
@ -832,7 +848,11 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
if (expressions.need_aggregate)
executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
{
executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
/// We need to reset input order info, so that executeOrder can't use it
query_info.input_order_info.reset();
}
else
{
executeExpression(pipeline, expressions.before_order_and_select);
@ -898,7 +918,7 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
executeMergeSorted(pipeline);
else /// Otherwise, just sort.
executeOrder(pipeline, query_info.input_sorting_info);
executeOrder(pipeline, query_info.input_order_info);
}
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
@ -1262,15 +1282,21 @@ void InterpreterSelectQuery::executeFetchColumns(
query_info.prewhere_info = prewhere_info;
/// Create optimizer with prepared actions.
/// Maybe we will need to calc input_sorting_info later, e.g. while reading from StorageMerge.
if (analysis_result.optimize_read_in_order)
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.
if (analysis_result.optimize_read_in_order || analysis_result.optimize_aggregation_in_order)
{
query_info.order_by_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.order_by_elements_actions,
getSortDescription(query, *context),
query_info.syntax_analyzer_result);
if (analysis_result.optimize_read_in_order)
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.order_by_elements_actions,
getSortDescription(query, *context),
query_info.syntax_analyzer_result);
else
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.group_by_elements_actions,
getSortDescriptionFromGroupBy(query),
query_info.syntax_analyzer_result);
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage);
}
Pipes pipes = storage->read(required_columns, query_info, *context, processing_stage, max_block_size, max_streams);
@ -1376,7 +1402,7 @@ void InterpreterSelectQuery::executeWhere(QueryPipeline & pipeline, const Expres
}
void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
pipeline.addSimpleTransform([&](const Block & header)
{
@ -1414,6 +1440,62 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
/// Forget about current totals and extremes. They will be calculated again after aggregation if needed.
pipeline.dropTotalsAndExtremes();
if (group_by_info && settings.optimize_aggregation_in_order)
{
auto & query = getSelectQuery();
SortDescription group_by_descr = getSortDescriptionFromGroupBy(query);
bool need_finish_sorting = (group_by_info->order_key_prefix_descr.size() < group_by_descr.size());
if (need_finish_sorting)
{
/// Finish sorting here would be too slow; fall back to regular aggregation below.
}
else
{
if (pipeline.getNumStreams() > 1)
{
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
size_t counter = 0;
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingInOrderTransform>(header, transform_params, group_by_descr, settings.max_block_size, many_data, counter++);
});
for (auto & column_description : group_by_descr)
{
if (!column_description.column_name.empty())
{
column_description.column_number = pipeline.getHeader().getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
auto transform = std::make_shared<AggregatingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
group_by_descr,
settings.max_block_size);
pipeline.addPipe({ std::move(transform) });
}
else
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingInOrderTransform>(header, transform_params, group_by_descr, settings.max_block_size);
});
}
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinalizingSimpleTransform>(header, transform_params);
});
pipeline.enableQuotaForCurrentStreams();
return;
}
}
/// If there are several sources, then we perform parallel aggregation
if (pipeline.getNumStreams() > 1)
{
@ -1576,7 +1658,47 @@ void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const E
}
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr input_sorting_info)
void InterpreterSelectQuery::executeOrderOptimized(QueryPipeline & pipeline, InputOrderInfoPtr input_sorting_info, UInt64 limit, SortDescription & output_order_descr)
{
const Settings & settings = context->getSettingsRef();
bool need_finish_sorting = (input_sorting_info->order_key_prefix_descr.size() < output_order_descr.size());
if (pipeline.getNumStreams() > 1)
{
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
input_sorting_info->order_key_prefix_descr,
settings.max_block_size, limit_for_merging);
pipeline.addPipe({ std::move(transform) });
}
pipeline.enableQuotaForCurrentStreams();
if (need_finish_sorting)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
});
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
{
return std::make_shared<FinishSortingTransform>(
header, input_sorting_info->order_key_prefix_descr,
output_order_descr, settings.max_block_size, limit);
});
}
}
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr input_sorting_info)
{
auto & query = getSelectQuery();
SortDescription output_order_descr = getSortDescription(query, *context);
@ -1596,43 +1718,7 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
* and then merge them into one sorted stream.
* At this stage we merge per-thread streams into one.
*/
bool need_finish_sorting = (input_sorting_info->order_key_prefix_descr.size() < output_order_descr.size());
if (pipeline.getNumStreams() > 1)
{
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
input_sorting_info->order_key_prefix_descr,
settings.max_block_size, limit_for_merging);
pipeline.addPipe({ std::move(transform) });
}
pipeline.enableQuotaForCurrentStreams();
if (need_finish_sorting)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
});
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
{
return std::make_shared<FinishSortingTransform>(
header, input_sorting_info->order_key_prefix_descr,
output_order_descr, settings.max_block_size, limit);
});
}
executeOrderOptimized(pipeline, input_sorting_info, limit, output_order_descr);
return;
}
@ -1905,8 +1991,8 @@ void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const SubqueriesForSets & subqueries_for_sets)
{
if (query_info.input_sorting_info)
executeMergeSorted(pipeline, query_info.input_sorting_info->order_key_prefix_descr, 0);
if (query_info.input_order_info)
executeMergeSorted(pipeline, query_info.input_order_info->order_key_prefix_descr, 0);
const Settings & settings = context->getSettingsRef();

View File

@ -113,12 +113,13 @@ private:
const Names & columns_to_remove_after_prewhere);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
static void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr sorting_info);
void executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info);
void executeOrderOptimized(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr);
void executeWithFill(QueryPipeline & pipeline);
void executeMergeSorted(QueryPipeline & pipeline);
void executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset);

View File

@ -158,11 +158,11 @@ public:
static std::string statusToName(Status status);
/** Method 'prepare' is responsible for all cheap ("instantenous": O(1) of data volume, no wait) calculations.
/** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations.
*
* It may access input and output ports,
* indicate the need for work by another processor by returning NeedData or PortFull,
* or indicate the absense of work by returning Finished or Unneeded,
* or indicate the absence of work by returning Finished or Unneeded,
* it may pull data from input ports and push data to output ports.
*
* The method is not thread-safe and must be called from a single thread in one moment of time,

View File

@ -0,0 +1,244 @@
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
AggregatingInOrderTransform::AggregatingInOrderTransform(
Block header, AggregatingTransformParamsPtr params_,
const SortDescription & group_by_description_, size_t res_block_size_)
: AggregatingInOrderTransform(std::move(header), std::move(params_)
, group_by_description_, res_block_size_, std::make_unique<ManyAggregatedData>(1), 0)
{
}
AggregatingInOrderTransform::AggregatingInOrderTransform(
Block header, AggregatingTransformParamsPtr params_,
const SortDescription & group_by_description_, size_t res_block_size_,
ManyAggregatedDataPtr many_data_, size_t current_variant)
: IProcessor({std::move(header)}, {params_->getCustomHeader(false)})
, res_block_size(res_block_size_)
, params(std::move(params_))
, group_by_description(group_by_description_)
, aggregate_columns(params->params.aggregates_size)
, many_data(std::move(many_data_))
, variants(*many_data->variants[current_variant])
{
/// We won't finalize the states here, so that equal states (generated by multi-threaded execution) can be merged in AggregatingSortedTransform.
res_header = params->getCustomHeader(false);
/// Replace column names with column positions in group_by_description.
for (auto & column_description : group_by_description)
{
if (!column_description.column_name.empty())
{
column_description.column_number = res_header.getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
}
AggregatingInOrderTransform::~AggregatingInOrderTransform() = default;
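/// Returns true if row 'i' of 'lhs' sorts strictly before row 'j' of 'rhs' according to 'descr'.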
static bool less(const MutableColumns & lhs, const Columns & rhs, size_t i, size_t j, const SortDescription & descr)
{
for (const auto & elem : descr)
{
size_t ind = elem.column_number;
int res = elem.direction * lhs[ind]->compareAt(i, j, *rhs[ind], elem.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
void AggregatingInOrderTransform::consume(Chunk chunk)
{
size_t rows = chunk.getNumRows();
if (rows == 0)
return;
if (!is_consume_started)
{
LOG_TRACE(log, "Aggregating in order");
is_consume_started = true;
}
src_rows += rows;
src_bytes += chunk.bytes();
Columns materialized_columns;
Columns key_columns(params->params.keys_size);
for (size_t i = 0; i < params->params.keys_size; ++i)
{
materialized_columns.push_back(chunk.getColumns().at(params->params.keys[i])->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back();
}
Aggregator::NestedColumnsHolder nested_columns_holder;
Aggregator::AggregateFunctionInstructions aggregate_function_instructions;
params->aggregator.prepareAggregateInstructions(chunk.getColumns(), aggregate_columns, materialized_columns, aggregate_function_instructions, nested_columns_holder);
size_t key_end = 0;
size_t key_begin = 0;
/// If we don't have a block yet, create it and fill the key columns with the first key
if (!cur_block_size)
{
res_key_columns.resize(params->params.keys_size);
res_aggregate_columns.resize(params->params.aggregates_size);
for (size_t i = 0; i < params->params.keys_size; ++i)
{
res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
}
for (size_t i = 0; i < params->params.aggregates_size; ++i)
{
res_aggregate_columns[i] = res_header.safeGetByPosition(i + params->params.keys_size).type->createColumn();
}
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
++cur_block_size;
}
size_t mid = 0;
size_t high = 0;
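/// 'low' starts at -1: unsigned wraparound makes 'high - low' and '(low + high) / 2' behave as in signed arithmetic.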
size_t low = -1;
/// Will split block into segments with the same key
while (key_end != rows)
{
high = rows;
/// Find the first position of new (not current) key in current chunk
while (high - low > 1)
{
mid = (low + high) / 2;
if (!less(res_key_columns, key_columns, cur_block_size - 1, mid, group_by_description))
low = mid;
else
high = mid;
}
key_end = high;
/// Add data to the aggregation state if the interval is not empty. It is empty when the current key was not found in the new block.
if (key_begin != key_end)
{
params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
}
low = key_begin = key_end;
/// We finalize the aggregation state of the last key if a new key was found.
if (key_begin != rows)
{
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
/// If res_block_size is reached, we have to stop consuming and generate the block. Save the extra rows into a new chunk.
if (cur_block_size == res_block_size)
{
Columns source_columns = chunk.detachColumns();
for (auto & source_column : source_columns)
source_column = source_column->cut(key_begin, rows - key_begin);
current_chunk = Chunk(source_columns, rows - key_begin);
block_end_reached = true;
need_generate = true;
cur_block_size = 0;
return;
}
/// We create a new state for the new key and update res_key_columns
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
++cur_block_size;
}
}
block_end_reached = false;
}
void AggregatingInOrderTransform::work()
{
if (is_consume_finished || need_generate)
{
generate();
}
else
{
consume(std::move(current_chunk));
}
}
IProcessor::Status AggregatingInOrderTransform::prepare()
{
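/// Single-input, single-output port negotiation. 'block_end_reached' means a result block was filled mid-chunk and the tail of the input chunk is still pending in current_chunk.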
auto & output = outputs.front();
auto & input = inputs.back();
/// Check can output.
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
if (block_end_reached)
{
if (need_generate)
{
return Status::Ready;
}
else
{
output.push(std::move(to_push_chunk));
return Status::Ready;
}
}
else
{
if (is_consume_finished)
{
output.push(std::move(to_push_chunk));
output.finish();
LOG_TRACE(log, "Aggregated. {} to {} rows (from {})", src_rows, res_rows,
formatReadableSizeWithBinarySuffix(src_bytes));
return Status::Finished;
}
if (input.isFinished())
{
is_consume_finished = true;
return Status::Ready;
}
}
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull(!is_consume_finished);
return Status::Ready;
}
void AggregatingInOrderTransform::generate()
{
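/// If the input ended while a result block is still open, finalize the aggregation state of its last key.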
if (cur_block_size && is_consume_finished)
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
Block res = res_header.cloneEmpty();
for (size_t i = 0; i < res_key_columns.size(); ++i)
{
res.getByPosition(i).column = std::move(res_key_columns[i]);
}
for (size_t i = 0; i < res_aggregate_columns.size(); ++i)
{
res.getByPosition(i + res_key_columns.size()).column = std::move(res_aggregate_columns[i]);
}
to_push_chunk = convertToChunk(res);
res_rows += to_push_chunk.getNumRows();
need_generate = false;
}
}

View File

@ -0,0 +1,92 @@
#pragma once
#include <Core/SortDescription.h>
#include <Interpreters/Aggregator.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
namespace DB
{
class AggregatingInOrderTransform : public IProcessor
{
public:
AggregatingInOrderTransform(Block header, AggregatingTransformParamsPtr params,
const SortDescription & group_by_description, size_t res_block_size,
ManyAggregatedDataPtr many_data, size_t current_variant);
AggregatingInOrderTransform(Block header, AggregatingTransformParamsPtr params,
const SortDescription & group_by_description, size_t res_block_size);
~AggregatingInOrderTransform() override;
String getName() const override { return "AggregatingInOrderTransform"; }
Status prepare() override;
void work() override;
void consume(Chunk chunk);
private:
void generate();
size_t res_block_size;
size_t cur_block_size = 0;
MutableColumns res_key_columns;
MutableColumns res_aggregate_columns;
AggregatingTransformParamsPtr params;
SortDescription group_by_description;
Aggregator::AggregateColumns aggregate_columns;
ManyAggregatedDataPtr many_data;
AggregatedDataVariants & variants;
UInt64 src_rows = 0;
UInt64 src_bytes = 0;
UInt64 res_rows = 0;
bool need_generate = false;
bool block_end_reached = false;
bool is_consume_started = false;
bool is_consume_finished = false;
Block res_header;
Chunk current_chunk;
Chunk to_push_chunk;
Poco::Logger * log = &Poco::Logger::get("AggregatingInOrderTransform");
};
class FinalizingSimpleTransform : public ISimpleTransform
{
public:
FinalizingSimpleTransform(Block header, AggregatingTransformParamsPtr params_)
: ISimpleTransform({std::move(header)}, {params_->getHeader()}, true)
, params(params_) {}
void transform(Chunk & chunk) override
{
if (params->final)
finalizeChunk(chunk);
else if (!chunk.getChunkInfo())
{
auto info = std::make_shared<AggregatedChunkInfo>();
chunk.setChunkInfo(std::move(info));
}
}
String getName() const override { return "FinalizingSimpleTransform"; }
private:
AggregatingTransformParamsPtr params;
};
}

View File

@ -19,23 +19,23 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/// Convert block to chunk.
/// Adds additional info about aggregation.
Chunk convertToChunk(const Block & block)
{
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
chunk.setChunkInfo(std::move(info));
return chunk;
}
namespace
{
/// Convert block to chunk.
/// Adds additional info about aggregation.
Chunk convertToChunk(const Block & block)
{
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
chunk.setChunkInfo(std::move(info));
return chunk;
}
const AggregatedChunkInfo * getInfoFromChunk(const Chunk & chunk)
{
const auto & info = chunk.getChunkInfo();

View File

@ -28,6 +28,8 @@ struct AggregatingTransformParams
: params(params_), aggregator(params), final(final_) {}
Block getHeader() const { return aggregator.getHeader(final); }
Block getCustomHeader(bool final_) const { return aggregator.getHeader(final_); }
};
struct ManyAggregatedData
@ -117,4 +119,6 @@ private:
void initGenerate();
};
Chunk convertToChunk(const Block & block);
}

View File

@ -112,7 +112,7 @@ void FinishSortingTransform::consume(Chunk chunk)
}
}
/// If we reach here, that means that current cunk is first in portion
/// If we reach here, that means that current chunk is first in portion
/// or it all consists of rows with the same key as tail of a previous chunk.
chunks.push_back(std::move(chunk));
}

View File

@ -1,5 +1,6 @@
#include <Processors/ISimpleTransform.h>
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Common/Arena.h>
namespace DB

View File

@ -134,6 +134,7 @@ SRCS(
Transforms/RollupTransform.cpp
Transforms/SortingTransform.cpp
Transforms/TotalsHavingTransform.cpp
Transforms/AggregatingInOrderTransform.cpp
)
END()

View File

@ -637,9 +637,9 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
reader_settings,
result_projection);
}
else if (settings.optimize_read_in_order && query_info.input_sorting_info)
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && query_info.input_order_info)
{
size_t prefix_size = query_info.input_sorting_info->order_key_prefix_descr.size();
size_t prefix_size = query_info.input_order_info->order_key_prefix_descr.size();
auto order_key_prefix_ast = data.getSortingKey().expression_list_ast->clone();
order_key_prefix_ast->children.resize(prefix_size);
@ -855,7 +855,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
ExpressionActionsPtr & out_projection) const
{
size_t sum_marks = 0;
const InputSortingInfoPtr & input_sorting_info = query_info.input_sorting_info;
const InputOrderInfoPtr & input_order_info = query_info.input_order_info;
size_t adaptive_parts = 0;
std::vector<size_t> sum_marks_in_parts(parts.size());
const auto data_settings = data.getSettings();
@ -998,10 +999,9 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
}
parts.emplace_back(part);
}
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_sorting_info->direction);
if (input_sorting_info->direction == 1)
if (input_order_info->direction == 1)
{
pipes.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
@ -1024,9 +1024,9 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
if (pipes.size() > 1)
{
SortDescription sort_description;
for (size_t j = 0; j < input_sorting_info->order_key_prefix_descr.size(); ++j)
for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j)
sort_description.emplace_back(data.getSortingKey().column_names[j],
input_sorting_info->direction, 1);
input_order_info->direction, 1);
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
out_projection = createProjection(pipes.back(), data);

View File

@ -30,7 +30,7 @@ ReadInOrderOptimizer::ReadInOrderOptimizer(
forbidden_columns.insert(elem.first);
}
InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage) const
InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage) const
{
Names sorting_key_columns;
if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get()))
@ -122,7 +122,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
if (order_key_prefix_descr.empty())
return {};
return std::make_shared<InputSortingInfo>(std::move(order_key_prefix_descr), read_direction);
return std::make_shared<InputOrderInfo>(std::move(order_key_prefix_descr), read_direction);
}
}

View File

@ -20,10 +20,10 @@ public:
const SortDescription & required_sort_description,
const SyntaxAnalyzerResultPtr & syntax_result);
InputSortingInfoPtr getInputOrder(const StoragePtr & storage) const;
InputOrderInfoPtr getInputOrder(const StoragePtr & storage) const;
private:
/// Actions for every element of order expression to analyze functions for monotonicicy
/// Actions for every element of order expression to analyze functions for monotonicity
ManyExpressionActions elements_actions;
NameSet forbidden_columns;
SortDescription required_sort_description;

View File

@ -2,6 +2,7 @@
#include <Interpreters/PreparedSets.h>
#include <Core/SortDescription.h>
#include <Core/Names.h>
#include <memory>
namespace DB
@ -35,25 +36,25 @@ struct FilterInfo
bool do_remove_column = false;
};
struct InputSortingInfo
struct InputOrderInfo
{
SortDescription order_key_prefix_descr;
int direction;
InputSortingInfo(const SortDescription & order_key_prefix_descr_, int direction_)
InputOrderInfo(const SortDescription & order_key_prefix_descr_, int direction_)
: order_key_prefix_descr(order_key_prefix_descr_), direction(direction_) {}
bool operator ==(const InputSortingInfo & other) const
bool operator ==(const InputOrderInfo & other) const
{
return order_key_prefix_descr == other.order_key_prefix_descr && direction == other.direction;
}
bool operator !=(const InputSortingInfo & other) const { return !(*this == other); }
bool operator !=(const InputOrderInfo & other) const { return !(*this == other); }
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
using InputSortingInfoPtr = std::shared_ptr<const InputSortingInfo>;
using InputOrderInfoPtr = std::shared_ptr<const InputOrderInfo>;
struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
@ -61,6 +62,7 @@ using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
class ReadInOrderOptimizer;
using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
/** Query along with some additional data,
* that can be used during query processing
* inside storage engines.
@ -73,9 +75,9 @@ struct SelectQueryInfo
PrewhereInfoPtr prewhere_info;
ReadInOrderOptimizerPtr order_by_optimizer;
ReadInOrderOptimizerPtr order_optimizer;
/// We can modify it while reading from storage
mutable InputSortingInfoPtr input_sorting_info;
mutable InputOrderInfoPtr input_order_info;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)

View File

@ -171,8 +171,8 @@ Pipes StorageBuffer::read(
if (dst_has_same_structure)
{
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(destination);
if (query_info.order_optimizer)
query_info.input_order_info = query_info.order_optimizer->getInputOrder(destination);
/// The destination table has the same structure of the requested columns and we can simply read blocks from there.
pipes_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);

View File

@ -180,8 +180,8 @@ Pipes StorageMaterializedView::read(
auto lock = storage->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
if (query_info.order_optimizer)
query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage);
Pipes pipes = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);

View File

@ -172,12 +172,12 @@ Pipes StorageMerge::read(
num_streams *= num_streams_multiplier;
size_t remaining_streams = num_streams;
InputSortingInfoPtr input_sorting_info;
if (query_info.order_by_optimizer)
InputOrderInfoPtr input_sorting_info;
if (query_info.order_optimizer)
{
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
auto current_info = query_info.order_by_optimizer->getInputOrder(std::get<0>(*it));
auto current_info = query_info.order_optimizer->getInputOrder(std::get<0>(*it));
if (it == selected_tables.begin())
input_sorting_info = current_info;
else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info))
@ -187,7 +187,7 @@ Pipes StorageMerge::read(
break;
}
query_info.input_sorting_info = input_sorting_info;
query_info.input_order_info = input_sorting_info;
}
for (const auto & table : selected_tables)

View File

@ -0,0 +1,23 @@
<test>
<preconditions>
<table_exists>hits_10m_single</table_exists>
<table_exists>hits_100m_single</table_exists>
</preconditions>
<settings><optimize_aggregation_in_order>1</optimize_aggregation_in_order></settings>
<substitutions>
<substitution>
<name>table</name>
<values>
<value>hits_10m_single</value>
<value>hits_100m_single</value>
</values>
</substitution>
</substitutions>
<query>SELECT avg(length(URL)) as x from hits_100m_single GROUP BY CounterID FORMAT Null</query>
<query>SELECT avg(length(URL)) as x from {table} GROUP BY CounterID, EventDate FORMAT Null</query>
<query>SELECT avg(length(URL)) as x from hits_10m_single GROUP BY CounterID, EventDate, intHash32(UserID) FORMAT Null</query>
</test>

View File

@ -0,0 +1,41 @@
1 1
1 2
1 3
1 4
1 5
1 6
2 1
2 2
2 3
2 4
1
2
1 1 101 1
1 2 102 1
1 3 103 1
1 4 104 1
1 5 104 1
1 6 105 1
2 1 213 2
2 2 107 2
2 3 108 2
2 4 109 2
1 619 1
2 537 2
1 619 1
2 537 2
2019-05-05 00:00:00 -45363190
2019-05-05 00:00:00 -1249512288
2019-05-05 00:00:00 345522721
2019-05-05 00:00:00 486601715
2019-05-05 00:00:00 1449669396
2019-05-05 00:00:00 45
2019-05-06 00:00:00 46
2019-05-07 00:00:00 47
2019-05-08 00:00:00 48
2019-05-09 00:00:00 49
2019-05-05 00:00:00 0 1900940608
2019-05-06 00:00:00 1 1857737272
2019-05-07 00:00:00 2 1996614413
2019-05-08 00:00:00 3 1873725230
2019-05-09 00:00:00 4 1831412253

View File

@ -0,0 +1,33 @@
DROP TABLE IF EXISTS pk_order;
SET optimize_aggregation_in_order = 1;
CREATE TABLE pk_order(a UInt64, b UInt64, c UInt64, d UInt64) ENGINE=MergeTree() ORDER BY (a, b);
INSERT INTO pk_order(a, b, c, d) VALUES (1, 1, 101, 1), (1, 2, 102, 1), (1, 3, 103, 1), (1, 4, 104, 1);
INSERT INTO pk_order(a, b, c, d) VALUES (1, 5, 104, 1), (1, 6, 105, 1), (2, 1, 106, 2), (2, 1, 107, 2);
INSERT INTO pk_order(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 109, 2);
-- The result order after GROUP BY with the in-order optimization is deterministic
SELECT a, b FROM pk_order GROUP BY a, b;
SELECT a FROM pk_order GROUP BY a;
SELECT a, b, sum(c), avg(d) FROM pk_order GROUP BY a, b;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY a;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY -a;
DROP TABLE IF EXISTS pk_order;
CREATE TABLE pk_order (d DateTime, a Int32, b Int32) ENGINE = MergeTree ORDER BY (d, a)
PARTITION BY toDate(d) SETTINGS index_granularity=1;
INSERT INTO pk_order
SELECT toDateTime('2019-05-05 00:00:00') + INTERVAL number % 10 DAY, number, intHash32(number) from numbers(100);
set max_block_size = 1;
SELECT d, max(b) FROM pk_order GROUP BY d, a LIMIT 5;
SELECT d, avg(a) FROM pk_order GROUP BY toString(d) LIMIT 5;
SELECT toStartOfHour(d) as d1, min(a), max(b) FROM pk_order GROUP BY d1 LIMIT 5;
DROP TABLE pk_order;

View File

@ -8,3 +8,13 @@
59183 85379
33010362 77807
800784 77492
1704509 523264
732797 475698
598875 337212
792887 252197
3807842 196036
25703952 147211
716829 90109
59183 85379
33010362 77807
800784 77492

View File

@ -1 +1,2 @@
SELECT CounterID, count() AS c FROM test.hits GROUP BY CounterID ORDER BY c DESC LIMIT 10
SELECT CounterID, count() AS c FROM test.hits GROUP BY CounterID ORDER BY c DESC LIMIT 10;
SELECT CounterID, count() AS c FROM test.hits GROUP BY CounterID ORDER BY c DESC LIMIT 10 SETTINGS optimize_aggregation_in_order = 1

View File

@ -98,3 +98,103 @@
7901143 10022 █▌
194599 9997 █▌
21052498 9780 █▍
1704509 523264 ████████████████████████████████████████████████████████████████████████████████
732797 475698 ████████████████████████████████████████████████████████████████████████▋
598875 337212 ███████████████████████████████████████████████████▌
792887 252197 ██████████████████████████████████████▌
3807842 196036 █████████████████████████████▊
25703952 147211 ██████████████████████▌
716829 90109 █████████████▋
59183 85379 █████████████
33010362 77807 ███████████▊
800784 77492 ███████████▋
20810645 73213 ███████████▏
25843850 68945 ██████████▌
23447120 67570 ██████████▎
14739804 64174 █████████▋
32077710 60456 █████████▏
22446879 58389 ████████▊
170282 57017 ████████▋
11482817 52345 ████████
63469 52142 ███████▊
29103473 47758 ███████▎
10136747 44080 ██████▋
27528801 43395 ██████▋
10581377 43279 ██████▌
9841201 40581 ██████▏
20310963 37562 █████▋
17337667 34301 █████▏
28600281 32776 █████
32046685 28788 ████▍
10130880 26603 ████
8676831 25733 ███▊
53230 25595 ███▊
20271226 25585 ███▊
17420663 25496 ███▊
631207 25270 ███▋
633130 24744 ███▋
14324015 23349 ███▌
8537965 21270 ███▎
11285298 20825 ███▏
14937615 20788 ███▏
185050 20785 ███▏
16368233 19897 ███
81602 19724 ███
62896 19717 ███
12967664 19402 ██▊
15996597 18557 ██▋
4379238 18370 ██▋
90982 17443 ██▋
18211045 17390 ██▋
14625884 17302 ██▋
12864910 17279 ██▋
126096 16959 ██▌
30296134 16849 ██▌
26360482 16175 ██▍
17788950 16017 ██▍
5928716 15340 ██▎
15469035 15171 ██▎
29732125 15146 ██▎
32946244 15104 ██▎
20957241 14719 ██▎
9495695 14584 ██▏
29241146 14540 ██▏
109805 14199 ██▏
26905788 13972 ██▏
212019 13930 ██▏
171509 13792 ██
23913162 13615 ██
1861993 13509 ██
125776 13308 ██
11312316 13181 ██
32667326 13181 ██
28628973 12922 █▊
122804 12520 █▊
12322758 12352 █▊
1301819 12283 █▊
10769545 12183 █▋
21566939 12170 █▋
28905364 12158 █▋
4250765 12049 █▋
15009727 11818 █▋
12761932 11733 █▋
26995888 11658 █▋
12759346 11514 █▋
1507911 11452 █▋
968488 11444 █▋
15736172 11358 █▋
54310 11193 █▋
17027391 11047 █▋
17439919 10936 █▋
4480860 10747 █▋
26738469 10738 █▋
9986231 10656 █▋
1539995 10655 █▋
214556 10625 █▌
219339 10522 █▌
3266 10503 █▌
30563429 10128 █▌
1960469 10098 █▌
7901143 10022 █▌
194599 9997 █▌
21052498 9780 █▍

View File

@ -1 +1,2 @@
SELECT CounterID, count() AS c, bar(c, 0, 523264) FROM test.hits GROUP BY CounterID ORDER BY c DESC, CounterID ASC LIMIT 100
SELECT CounterID, count() AS c, bar(c, 0, 523264) FROM test.hits GROUP BY CounterID ORDER BY c DESC, CounterID ASC LIMIT 100;
SELECT CounterID, count() AS c, bar(c, 0, 523264) FROM test.hits GROUP BY CounterID ORDER BY c DESC, CounterID ASC LIMIT 100 SETTINGS optimize_aggregation_in_order = 1

View File

@ -18,3 +18,23 @@
11482817 52345 я скачать игры
63469 52142 яндекс марте рокус надписями я любимому у полосы фото минск
29103473 47758
1704509 523264 نيك امريكي نيك افلام سكس جامد
732797 475698 نيك سكس سيحاق
598875 337212 سکس باصات
792887 252197 №2267 отзыв
3807842 196036 ярмаркетовара 200кг купить по неделю тебелье
25703952 147211
716829 90109 яндекс повыш
59183 85379 франция машину угловы крузер из кофе
33010362 77807 ярмаркетовара 200кг купить по неделю тебелье
800784 77492 ярмаркур смерти теплицы из чего
20810645 73213 ярмаркетовара 200кг купить по неделю тебе перево метиков детский
25843850 68945 электросчет-фактура
23447120 67570 южная степанов
14739804 64174 штангал волк
32077710 60456
22446879 58389 فیلم سكس امريكي نيك
170282 57017 ل افلام السكس
11482817 52345 я скачать игры
63469 52142 яндекс марте рокус надписями я любимому у полосы фото минск
29103473 47758

View File

@ -1 +1,2 @@
SELECT CounterID, count(), maxIf(SearchPhrase, notEmpty(SearchPhrase)) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20
SELECT CounterID, count(), maxIf(SearchPhrase, notEmpty(SearchPhrase)) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20;
SELECT CounterID, count(), maxIf(SearchPhrase, notEmpty(SearchPhrase)) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20 SETTINGS optimize_aggregation_in_order = 1

View File

@ -18,3 +18,23 @@
11482817 4611708000353743073 9223337838355779113
63469 4611695097019173921 9223353530156141191
29103473 4611744585914335132 9223333530281362537
1704509 4611700827100483880 9223360787015464643
732797 4611701940806302259 9223355550934604746
598875 4611701407242345792 9223362250391155632
792887 4611699550286611812 9223290551912005343
3807842 4611710821592843606 9223326163906184987
25703952 4611709443519524003 9223353913449113943
716829 4611852156092872082 9223361623076951140
59183 4611730685242027332 9223354909338698162
33010362 4611704682869732882 9223268545373999677
800784 4611752907938305166 9223340418389788041
20810645 4611712185532639162 9223218900001937412
25843850 4611690025407720929 9223346023778617822
23447120 4611796031755620254 9223329309291309758
14739804 4611692230555590277 9223313509005166531
32077710 4611884228437061959 9223352444952988904
22446879 4611846229717089436 9223124373140579096
170282 4611833225706935900 9223371583739401906
11482817 4611708000353743073 9223337838355779113
63469 4611695097019173921 9223353530156141191
29103473 4611744585914335132 9223333530281362537

View File

@ -1 +1,2 @@
SELECT CounterID, min(WatchID), max(WatchID) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20
SELECT CounterID, min(WatchID), max(WatchID) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20;
SELECT CounterID, min(WatchID), max(WatchID) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20 SETTINGS optimize_aggregation_in_order = 1

View File

@ -18,3 +18,23 @@
11482817 52345 [] [] []
63469 52142 [] [] []
29103473 47758 [6185451] [] [6185451]
1704509 523264 [271264] [] [271264]
732797 475698 [] [] []
598875 337212 [] [] []
792887 252197 [2094893,2028343] [] [1272031]
3807842 196036 [1710269] [] [1134660]
25703952 147211 [] [] []
716829 90109 [4186138] [] [1800405]
59183 85379 [] [] []
33010362 77807 [] [] []
800784 77492 [4002316] [] [1270480]
20810645 73213 [] [] []
25843850 68945 [4028285] [] [4028285]
23447120 67570 [6503091,2762273] [] [2098132]
14739804 64174 [4180720] [] [664490]
32077710 60456 [] [] []
22446879 58389 [] [] []
170282 57017 [4166114] [] [34386,1240412,1248634,1616213,2928740,1458582]
11482817 52345 [] [] []
63469 52142 [] [] []
29103473 47758 [6185451] [] [6185451]

View File

@ -1 +1,2 @@
SELECT CounterID, count(), max(GoalsReached), min(GoalsReached), minIf(GoalsReached, notEmpty(GoalsReached)) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20
SELECT CounterID, count(), max(GoalsReached), min(GoalsReached), minIf(GoalsReached, notEmpty(GoalsReached)) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20;
SELECT CounterID, count(), max(GoalsReached), min(GoalsReached), minIf(GoalsReached, notEmpty(GoalsReached)) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20 SETTINGS optimize_aggregation_in_order = 1

View File

@ -1,3 +1,5 @@
SET max_rows_to_read = 1000;
SELECT CounterID, uniq(UserID) FROM test.hits WHERE 0 != 0 GROUP BY CounterID;
SELECT CounterID, uniq(UserID) FROM test.hits WHERE 0 != 0 GROUP BY CounterID SETTINGS optimize_aggregation_in_order = 1;
SELECT CounterID, uniq(UserID) FROM test.hits WHERE 0 AND CounterID = 1704509 GROUP BY CounterID;
SELECT CounterID, uniq(UserID) FROM test.hits WHERE 0 AND CounterID = 1704509 GROUP BY CounterID SETTINGS optimize_aggregation_in_order = 1;

View File

@ -1 +1,2 @@
10726001768429413598
10726001768429413598

View File

@ -1 +1,2 @@
SELECT sum(cityHash64(*)) FROM (SELECT CounterID, quantileTiming(0.5)(SendTiming), count() FROM remote('127.0.0.{1,2,3,4,5,6,7,8,9,10}', test.hits) WHERE SendTiming != -1 GROUP BY CounterID);
SELECT sum(cityHash64(*)) FROM (SELECT CounterID, quantileTiming(0.5)(SendTiming), count() FROM remote('127.0.0.{1,2,3,4,5,6,7,8,9,10}', test.hits) WHERE SendTiming != -1 GROUP BY CounterID) SETTINGS optimize_aggregation_in_order = 1;

View File

@ -1 +1,2 @@
4379238 1868 1879 5755 0.006
4379238 1868 1879 5755 0.006

View File

@ -1 +1,2 @@
SELECT CounterID, quantileTiming(0.5)(SendTiming) AS qt, least(30000, quantileExact(0.5)(SendTiming)) AS qe, count() AS c, round(abs(qt - qe) / greatest(qt, qe) AS diff, 3) AS rounded_diff FROM test.hits WHERE SendTiming != -1 GROUP BY CounterID HAVING diff != 0 ORDER BY diff DESC;
SELECT CounterID, quantileTiming(0.5)(SendTiming) AS qt, least(30000, quantileExact(0.5)(SendTiming)) AS qe, count() AS c, round(abs(qt - qe) / greatest(qt, qe) AS diff, 3) AS rounded_diff FROM test.hits WHERE SendTiming != -1 GROUP BY CounterID HAVING diff != 0 ORDER BY diff DESC SETTINGS optimize_aggregation_in_order = 1;