some changes

This commit is contained in:
fanzhou 2021-09-06 10:18:42 +08:00 committed by Dmitry Novik
parent ac6f08edf3
commit 43db4594ba
9 changed files with 40 additions and 66 deletions

View File

@ -8,6 +8,6 @@ namespace DB
{
using ColumnNumbers = std::vector<size_t>;
using ColumnNumbersTwoDimension = std::vector<ColumnNumbers>;
using ColumnNumbersList = std::vector<ColumnNumbers>;
}

View File

@ -107,7 +107,7 @@ public:
std::optional<NameAndTypePair> tryGetByName(const std::string & name) const;
};
using TwoDimensionNamesAndTypesList = std::list<NamesAndTypesList>;
using NamesAndTypesLists = std::vector<NamesAndTypesList>;
}

View File

@ -879,7 +879,7 @@ public:
/// What to count.
ColumnNumbers keys;
const ColumnNumbersTwoDimension keys_vector;
const ColumnNumbersList keys_vector;
const AggregateDescriptions aggregates;
size_t keys_size;
const size_t aggregates_size;
@ -943,7 +943,7 @@ public:
Params(
const Block & src_header_,
const ColumnNumbers & keys_,
const ColumnNumbersTwoDimension & keys_vector_, const AggregateDescriptions & aggregates_,
const ColumnNumbersList & keys_vector_, const AggregateDescriptions & aggregates_,
bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,

View File

@ -401,7 +401,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
}
}
aggregation_keys_list.push_back(grouping_set_list);
aggregation_keys_list.push_back(std::move(grouping_set_list));
}
else
{

View File

@ -64,8 +64,8 @@ struct ExpressionAnalyzerData
bool has_aggregation = false;
NamesAndTypesList aggregation_keys;
NamesAndTypesLists aggregation_keys_list;
bool has_const_aggregation_keys = false;
TwoDimensionNamesAndTypesList aggregation_keys_list;
AggregateDescriptions aggregate_descriptions;
WindowDescriptions window_descriptions;
@ -324,7 +324,7 @@ public:
const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
bool hasConstAggregationKeys() const { return has_const_aggregation_keys; }
const TwoDimensionNamesAndTypesList & aggregationKeysList() const { return aggregation_keys_list; }
const NamesAndTypesLists & aggregationKeysList() const { return aggregation_keys_list; }
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }
const PreparedSets & getPreparedSets() const { return prepared_sets; }

View File

@ -2058,61 +2058,35 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
return;
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
ColumnNumbers all_keys;
ColumnNumbersTwoDimension keys_vector;
auto & query = getSelectQuery();
if (query.group_by_with_grouping_sets)
{
std::set<size_t> keys_set;
for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
{
keys.clear();
for (const auto & key : aggregation_keys)
{
size_t key_name_pos = header_before_aggregation.getPositionByName(key.name);
if (!keys_set.count(key_name_pos))
{
keys_set.insert(key_name_pos);
}
keys.push_back(key_name_pos);
LOG_DEBUG(
log,
"execute aggregation with grouping sets add key with name {} and number {}",
key.name,
header_before_aggregation.getPositionByName(key.name));
}
keys_vector.push_back(keys);
LOG_DEBUG(
log,
"execute aggregation with grouping sets add keys set of size {}",
keys.size());
}
all_keys.assign(keys_set.begin(), keys_set.end());
LOG_DEBUG(
log,
"execute aggregation with grouping sets add all keys of size {}",
all_keys.size());
}
else
{
for (const auto & key : query_analyzer->aggregationKeys())
{
keys.push_back(header_before_aggregation.getPositionByName(key.name));
LOG_DEBUG(log, "execute aggregation without grouping sets pushed back key with name {} and number {}", key.name, header_before_aggregation.getPositionByName(key.name));
}
}
AggregateDescriptions aggregates = query_analyzer->aggregates();
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
const Settings & settings = context->getSettingsRef();
std::shared_ptr<Aggregator::Params> params_ptr;
auto & query = getSelectQuery();
if (query.group_by_with_grouping_sets)
{
ColumnNumbers keys;
ColumnNumbers all_keys;
ColumnNumbersList keys_vector;
std::unordered_set<size_t> keys_set;
for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
{
keys.clear();
for (const auto & key : aggregation_keys)
{
size_t key_name_pos = header_before_aggregation.getPositionByName(key.name);
keys_set.insert(key_name_pos);
keys.push_back(key_name_pos);
}
keys_vector.push_back(keys);
}
all_keys.assign(keys_set.begin(), keys_set.end());
params_ptr = std::make_shared<Aggregator::Params>(
header_before_aggregation,
all_keys,
@ -2135,6 +2109,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
}
else
{
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_aggregation.getPositionByName(key.name));
params_ptr = std::make_shared<Aggregator::Params>(
header_before_aggregation,
keys,
@ -2156,13 +2134,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
}
SortDescription group_by_sort_description;
if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets) {
if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets)
group_by_sort_description = getSortDescriptionFromGroupBy(query);
LOG_DEBUG(log, "execute aggregation without grouping sets got group_by_sort_description");
} else {
else
group_by_info = nullptr;
LOG_DEBUG(log, "execute aggregation didn't get group_by_sort_description");
}
auto merge_threads = max_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
@ -2171,7 +2146,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();
LOG_DEBUG(log, "execute aggregation header structure before step: {}", query_plan.getCurrentDataStream().header.dumpStructure());
auto aggregating_step = std::make_unique<AggregatingStep>(
query_plan.getCurrentDataStream(),
*params_ptr,
@ -2183,7 +2157,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
storage_has_evenly_distributed_read,
std::move(group_by_info),
std::move(group_by_sort_description));
LOG_DEBUG(log, "execute aggregation header structure after step: {}", aggregating_step->getOutputStream().header.dumpStructure());
query_plan.addStep(std::move(aggregating_step));
}
@ -2241,7 +2214,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
const auto & header_before_transform = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
ColumnNumbers all_keys;
ColumnNumbersTwoDimension keys_vector;
ColumnNumbersList keys_vector;
auto & query = getSelectQuery();
if (query.group_by_with_grouping_sets)
{

View File

@ -18,7 +18,7 @@ protected:
private:
AggregatingTransformParamsPtr params;
ColumnNumbers keys;
ColumnNumbersTwoDimension keys_vector;
ColumnNumbersList keys_vector;
Chunks consumed_chunks;
Chunk grouping_sets_chunk;

View File

@ -464,9 +464,9 @@ void Pipe::addParallelTransforms(Processors transforms)
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
size_t next_output = 0;
for (auto * input : inputs)
for (size_t i = 0; i < inputs.size(); ++i)
{
connect(*output_ports[next_output], *input);
connect(*output_ports[next_output], *inputs[i]);
++next_output;
}

View File

@ -18,6 +18,7 @@
2 3 0 5200
2 4 0 4800
2 5 0 5400
0 0 1 1 4500
0 0 2 2 4600
0 0 3 3 4700
@ -38,6 +39,7 @@
2 3 0 0 5200
2 4 0 0 4800
2 5 0 0 5400
1 0 24500
1 1 4500
1 3 4700
@ -50,8 +52,8 @@
2 6 5000
2 8 5200
2 10 5400
0 0 49500
1 0 24500
1 1 4500
1 3 4700
@ -64,5 +66,4 @@
2 6 5000
2 8 5200
2 10 5400
0 0 49500
0 0 49500