mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-19 14:11:58 +00:00
some changes
This commit is contained in:
parent
4b25d30783
commit
83e9e5d0e5
@ -8,6 +8,6 @@ namespace DB
|
||||
{
|
||||
|
||||
using ColumnNumbers = std::vector<size_t>;
|
||||
using ColumnNumbersTwoDimension = std::vector<ColumnNumbers>;
|
||||
using ColumnNumbersList = std::vector<ColumnNumbers>;
|
||||
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ public:
|
||||
std::optional<NameAndTypePair> tryGetByName(const std::string & name) const;
|
||||
};
|
||||
|
||||
using TwoDimensionNamesAndTypesList = std::list<NamesAndTypesList>;
|
||||
using NamesAndTypesLists = std::vector<NamesAndTypesList>;
|
||||
|
||||
}
|
||||
|
||||
|
@ -879,7 +879,7 @@ public:
|
||||
|
||||
/// What to count.
|
||||
ColumnNumbers keys;
|
||||
const ColumnNumbersTwoDimension keys_vector;
|
||||
const ColumnNumbersList keys_vector;
|
||||
const AggregateDescriptions aggregates;
|
||||
size_t keys_size;
|
||||
const size_t aggregates_size;
|
||||
@ -943,7 +943,7 @@ public:
|
||||
Params(
|
||||
const Block & src_header_,
|
||||
const ColumnNumbers & keys_,
|
||||
const ColumnNumbersTwoDimension & keys_vector_, const AggregateDescriptions & aggregates_,
|
||||
const ColumnNumbersList & keys_vector_, const AggregateDescriptions & aggregates_,
|
||||
bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
|
||||
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
|
||||
size_t max_bytes_before_external_group_by_,
|
||||
|
@ -395,7 +395,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
|
||||
}
|
||||
}
|
||||
|
||||
aggregation_keys_list.push_back(grouping_set_list);
|
||||
aggregation_keys_list.push_back(std::move(grouping_set_list));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -64,8 +64,8 @@ struct ExpressionAnalyzerData
|
||||
|
||||
bool has_aggregation = false;
|
||||
NamesAndTypesList aggregation_keys;
|
||||
NamesAndTypesLists aggregation_keys_list;
|
||||
bool has_const_aggregation_keys = false;
|
||||
TwoDimensionNamesAndTypesList aggregation_keys_list;
|
||||
AggregateDescriptions aggregate_descriptions;
|
||||
|
||||
WindowDescriptions window_descriptions;
|
||||
@ -324,7 +324,7 @@ public:
|
||||
|
||||
const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
|
||||
bool hasConstAggregationKeys() const { return has_const_aggregation_keys; }
|
||||
const TwoDimensionNamesAndTypesList & aggregationKeysList() const { return aggregation_keys_list; }
|
||||
const NamesAndTypesLists & aggregationKeysList() const { return aggregation_keys_list; }
|
||||
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }
|
||||
|
||||
const PreparedSets & getPreparedSets() const { return prepared_sets; }
|
||||
|
@ -2062,61 +2062,35 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
|
||||
return;
|
||||
|
||||
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
|
||||
ColumnNumbers keys;
|
||||
ColumnNumbers all_keys;
|
||||
ColumnNumbersTwoDimension keys_vector;
|
||||
auto & query = getSelectQuery();
|
||||
if (query.group_by_with_grouping_sets)
|
||||
{
|
||||
std::set<size_t> keys_set;
|
||||
for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
|
||||
{
|
||||
keys.clear();
|
||||
for (const auto & key : aggregation_keys)
|
||||
{
|
||||
size_t key_name_pos = header_before_aggregation.getPositionByName(key.name);
|
||||
if (!keys_set.count(key_name_pos))
|
||||
{
|
||||
keys_set.insert(key_name_pos);
|
||||
}
|
||||
keys.push_back(key_name_pos);
|
||||
LOG_DEBUG(
|
||||
log,
|
||||
"execute aggregation with grouping sets add key with name {} and number {}",
|
||||
key.name,
|
||||
header_before_aggregation.getPositionByName(key.name));
|
||||
}
|
||||
keys_vector.push_back(keys);
|
||||
LOG_DEBUG(
|
||||
log,
|
||||
"execute aggregation with grouping sets add keys set of size {}",
|
||||
keys.size());
|
||||
}
|
||||
all_keys.assign(keys_set.begin(), keys_set.end());
|
||||
LOG_DEBUG(
|
||||
log,
|
||||
"execute aggregation with grouping sets add all keys of size {}",
|
||||
all_keys.size());
|
||||
}
|
||||
else
|
||||
{
|
||||
for (const auto & key : query_analyzer->aggregationKeys())
|
||||
{
|
||||
keys.push_back(header_before_aggregation.getPositionByName(key.name));
|
||||
LOG_DEBUG(log, "execute aggregation without grouping sets pushed back key with name {} and number {}", key.name, header_before_aggregation.getPositionByName(key.name));
|
||||
}
|
||||
}
|
||||
|
||||
AggregateDescriptions aggregates = query_analyzer->aggregates();
|
||||
for (auto & descr : aggregates)
|
||||
if (descr.arguments.empty())
|
||||
for (const auto & name : descr.argument_names)
|
||||
descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
|
||||
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
std::shared_ptr<Aggregator::Params> params_ptr;
|
||||
|
||||
auto & query = getSelectQuery();
|
||||
if (query.group_by_with_grouping_sets)
|
||||
{
|
||||
ColumnNumbers keys;
|
||||
ColumnNumbers all_keys;
|
||||
ColumnNumbersList keys_vector;
|
||||
std::unordered_set<size_t> keys_set;
|
||||
for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
|
||||
{
|
||||
keys.clear();
|
||||
for (const auto & key : aggregation_keys)
|
||||
{
|
||||
size_t key_name_pos = header_before_aggregation.getPositionByName(key.name);
|
||||
keys_set.insert(key_name_pos);
|
||||
keys.push_back(key_name_pos);
|
||||
}
|
||||
keys_vector.push_back(keys);
|
||||
}
|
||||
all_keys.assign(keys_set.begin(), keys_set.end());
|
||||
|
||||
params_ptr = std::make_shared<Aggregator::Params>(
|
||||
header_before_aggregation,
|
||||
all_keys,
|
||||
@ -2139,6 +2113,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
|
||||
}
|
||||
else
|
||||
{
|
||||
ColumnNumbers keys;
|
||||
for (const auto & key : query_analyzer->aggregationKeys())
|
||||
keys.push_back(header_before_aggregation.getPositionByName(key.name));
|
||||
|
||||
params_ptr = std::make_shared<Aggregator::Params>(
|
||||
header_before_aggregation,
|
||||
keys,
|
||||
@ -2160,13 +2138,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
|
||||
}
|
||||
SortDescription group_by_sort_description;
|
||||
|
||||
if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets) {
|
||||
if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets)
|
||||
group_by_sort_description = getSortDescriptionFromGroupBy(query);
|
||||
LOG_DEBUG(log, "execute aggregation without grouping sets got group_by_sort_description");
|
||||
} else {
|
||||
else
|
||||
group_by_info = nullptr;
|
||||
LOG_DEBUG(log, "execute aggregation didn't get group_by_sort_description");
|
||||
}
|
||||
|
||||
auto merge_threads = max_streams;
|
||||
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
|
||||
@ -2175,7 +2150,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
|
||||
|
||||
bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();
|
||||
|
||||
LOG_DEBUG(log, "execute aggregation header structure before step: {}", query_plan.getCurrentDataStream().header.dumpStructure());
|
||||
auto aggregating_step = std::make_unique<AggregatingStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
*params_ptr,
|
||||
@ -2187,7 +2161,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
|
||||
storage_has_evenly_distributed_read,
|
||||
std::move(group_by_info),
|
||||
std::move(group_by_sort_description));
|
||||
LOG_DEBUG(log, "execute aggregation header structure after step: {}", aggregating_step->getOutputStream().header.dumpStructure());
|
||||
query_plan.addStep(std::move(aggregating_step));
|
||||
}
|
||||
|
||||
@ -2245,7 +2218,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
|
||||
const auto & header_before_transform = query_plan.getCurrentDataStream().header;
|
||||
ColumnNumbers keys;
|
||||
ColumnNumbers all_keys;
|
||||
ColumnNumbersTwoDimension keys_vector;
|
||||
ColumnNumbersList keys_vector;
|
||||
auto & query = getSelectQuery();
|
||||
if (query.group_by_with_grouping_sets)
|
||||
{
|
||||
|
@ -18,7 +18,7 @@ protected:
|
||||
private:
|
||||
AggregatingTransformParamsPtr params;
|
||||
ColumnNumbers keys;
|
||||
ColumnNumbersTwoDimension keys_vector;
|
||||
ColumnNumbersList keys_vector;
|
||||
|
||||
Chunks consumed_chunks;
|
||||
Chunk grouping_sets_chunk;
|
||||
|
@ -464,9 +464,9 @@ void Pipe::addParallelTransforms(Processors transforms)
|
||||
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
size_t next_output = 0;
|
||||
for (auto * input : inputs)
|
||||
for (size_t i = 0; i < inputs.size(); ++i)
|
||||
{
|
||||
connect(*output_ports[next_output], *input);
|
||||
connect(*output_ports[next_output], *inputs[i]);
|
||||
++next_output;
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
2 3 0 5200
|
||||
2 4 0 4800
|
||||
2 5 0 5400
|
||||
|
||||
0 0 1 1 4500
|
||||
0 0 2 2 4600
|
||||
0 0 3 3 4700
|
||||
@ -38,6 +39,7 @@
|
||||
2 3 0 0 5200
|
||||
2 4 0 0 4800
|
||||
2 5 0 0 5400
|
||||
|
||||
1 0 24500
|
||||
1 1 4500
|
||||
1 3 4700
|
||||
@ -50,8 +52,8 @@
|
||||
2 6 5000
|
||||
2 8 5200
|
||||
2 10 5400
|
||||
|
||||
0 0 49500
|
||||
|
||||
1 0 24500
|
||||
1 1 4500
|
||||
1 3 4700
|
||||
@ -64,5 +66,4 @@
|
||||
2 6 5000
|
||||
2 8 5200
|
||||
2 10 5400
|
||||
|
||||
0 0 49500
|
||||
0 0 49500
|
||||
|
Loading…
Reference in New Issue
Block a user