mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-12-18 04:12:19 +00:00)
Merge pull request #24168 from amosbird/projection-fix3

Fix distributed processing when using projection

commit 98e6c27a95
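The core of the fix: when a query is served from projection parts (locally or on remote shards), the blocks read from storage already contain aggregate states, so their layout matches the aggregation's intermediate header rather than its source header. The diff below threads an optional intermediate_header through Aggregator::Params so the aggregator knows which input layout to expect. A minimal sketch of that idea; the stand-in types and the helper name getInputHeader are illustrative, not ClickHouse's actual API:

    #include <iostream>
    #include <string>

    struct Block { std::string columns; bool empty() const { return columns.empty(); } };

    struct Params
    {
        Block src_header;           /// layout of raw input rows
        Block intermediate_header;  /// layout of pre-aggregated input; empty if unused

        /// The header the aggregator should expect on its input: the
        /// intermediate one when reading already-aggregated data (the
        /// projection / distributed case), otherwise the source header.
        const Block & getInputHeader() const
        {
            return intermediate_header.empty() ? src_header : intermediate_header;
        }
    };

    int main()
    {
        Params normal{{"dt, cost"}, {}};
        Params projection{{"dt_m, sumState(cost)"}, {"dt_m, sumState(cost)"}};
        std::cout << normal.getInputHeader().columns << '\n';      // dt, cost
        std::cout << projection.getInputHeader().columns << '\n';  // dt_m, sumState(cost)
    }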
@@ -915,8 +915,10 @@ public:
             size_t max_bytes_before_external_group_by_,
             bool empty_result_for_aggregation_by_empty_set_,
             VolumePtr tmp_volume_, size_t max_threads_,
-            size_t min_free_disk_space_)
+            size_t min_free_disk_space_,
+            const Block & intermediate_header_ = {})
             : src_header(src_header_),
+            intermediate_header(intermediate_header_),
             keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
             overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
             group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
@@ -339,7 +339,7 @@ private:
         {
             params->aggregator.mergeWithoutKeyDataImpl(*data);
             auto block = params->aggregator.prepareBlockAndFillWithoutKey(
-                *first, params->final, first->type != AggregatedDataVariants::Type::without_key);
+                *first, params->final, first->type != AggregatedDataVariants::Type::without_key);

             setCurrentChunk(convertToChunk(block));
         }
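Context for the hunk above: a query with no GROUP BY keys (e.g. select sum(cost) ...) keeps a single aggregation state per data variant, and finalization merges those single states into one and emits a one-row block. A hedged sketch of that shape, with a plain sum standing in for an aggregate state; this is not the real mergeWithoutKeyDataImpl / prepareBlockAndFillWithoutKey:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    /// Each "variant" holds exactly one state because there are no grouping keys.
    struct VariantWithoutKey { int64_t sum_state = 0; };

    /// Merge every other variant's single state into the first one.
    void mergeWithoutKey(std::vector<VariantWithoutKey> & variants)
    {
        for (size_t i = 1; i < variants.size(); ++i)
            variants[0].sum_state += variants[i].sum_state;
    }

    /// Finalize the merged state into a single output row.
    int64_t prepareResult(const std::vector<VariantWithoutKey> & variants)
    {
        return variants[0].sum_state;
    }

    int main()
    {
        std::vector<VariantWithoutKey> variants{{10}, {-3}, {5}};
        mergeWithoutKey(variants);
        std::cout << prepareResult(variants) << '\n';  // 12
    }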
@@ -381,8 +381,7 @@ private:
         {
             /// Select Arena to avoid race conditions
             Arena * arena = first->aggregates_pools.at(thread).get();
-            auto source = std::make_shared<ConvertingAggregatedToChunksSource>(
-                params, data, shared_data, arena);
+            auto source = std::make_shared<ConvertingAggregatedToChunksSource>(params, data, shared_data, arena);

             processors.emplace_back(std::move(source));
         }
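The "Select Arena to avoid race conditions" comment above reflects a per-thread pool pattern: each conversion source takes its own arena, indexed by thread number, so concurrent sources never allocate from the same pool. A minimal hedged sketch of that pattern; the Arena here is a stand-in, not ClickHouse's Arena class:

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <vector>

    struct Arena { std::vector<char> storage; };  /// stand-in bump allocator

    int main()
    {
        const size_t num_threads = 4;

        /// One pool per worker, mirroring first->aggregates_pools in the diff.
        std::vector<std::shared_ptr<Arena>> aggregates_pools;
        for (size_t i = 0; i < num_threads; ++i)
            aggregates_pools.push_back(std::make_shared<Arena>());

        /// Worker `thread` uses only its own arena, so no locking is needed.
        for (size_t thread = 0; thread < num_threads; ++thread)
        {
            Arena * arena = aggregates_pools.at(thread).get();
            arena->storage.resize(64);  /// thread-local allocation, race-free
        }
        std::cout << "pools: " << aggregates_pools.size() << '\n';
    }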
@@ -614,7 +613,12 @@ void AggregatingTransform::initGenerate()
         pipe = Pipe::unitePipes(std::move(pipes));
     }

-    LOG_DEBUG(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
+    LOG_DEBUG(
+        log,
+        "Will merge {} temporary files of size {} compressed, {} uncompressed.",
+        files.files.size(),
+        ReadableSize(files.sum_size_compressed),
+        ReadableSize(files.sum_size_uncompressed));

     addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);
@@ -284,25 +284,27 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
                 descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
             }

-            Aggregator::Params params(
-                header_before_aggregation,
-                keys,
-                aggregates,
-                query_info.projection->aggregate_overflow_row,
-                settings.max_rows_to_group_by,
-                settings.group_by_overflow_mode,
-                settings.group_by_two_level_threshold,
-                settings.group_by_two_level_threshold_bytes,
-                settings.max_bytes_before_external_group_by,
-                settings.empty_result_for_aggregation_by_empty_set,
-                context->getTemporaryVolume(),
-                settings.max_threads,
-                settings.min_free_disk_space_for_temporary_data);
-
-            auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
+            AggregatingTransformParamsPtr transform_params;
+            if (projection)
+            {
+                Aggregator::Params params(
+                    header_before_aggregation,
+                    keys,
+                    aggregates,
+                    query_info.projection->aggregate_overflow_row,
+                    settings.max_rows_to_group_by,
+                    settings.group_by_overflow_mode,
+                    settings.group_by_two_level_threshold,
+                    settings.group_by_two_level_threshold_bytes,
+                    settings.max_bytes_before_external_group_by,
+                    settings.empty_result_for_aggregation_by_empty_set,
+                    context->getTemporaryVolume(),
+                    settings.max_threads,
+                    settings.min_free_disk_space_for_temporary_data,
+                    header_before_aggregation); // The source header is also an intermediate header
+
+                transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
+
                 /// This part is hacky.
                 /// We want AggregatingTransform to work with aggregate states instead of normal columns.
                 /// It is almost the same, just instead of adding new data to aggregation state we merge it with existing.
@@ -312,6 +314,25 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
                 /// * is not split into buckets (so if we just use MergingAggregated, it will use single thread)
                 transform_params->only_merge = true;
+            }
+            else
+            {
+                Aggregator::Params params(
+                    header_before_aggregation,
+                    keys,
+                    aggregates,
+                    query_info.projection->aggregate_overflow_row,
+                    settings.max_rows_to_group_by,
+                    settings.group_by_overflow_mode,
+                    settings.group_by_two_level_threshold,
+                    settings.group_by_two_level_threshold_bytes,
+                    settings.max_bytes_before_external_group_by,
+                    settings.empty_result_for_aggregation_by_empty_set,
+                    context->getTemporaryVolume(),
+                    settings.max_threads,
+                    settings.min_free_disk_space_for_temporary_data);
+
+                transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
+            }

             pipe.resize(pipe.numOutputPorts(), true, true);
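The only_merge flag set in the projection branch above switches AggregatingTransform from aggregating raw rows to merging already-aggregated states. A hedged sketch of the distinction, with plain sums standing in for aggregate states (illustrative only, not the transform's real code):

    #include <cstdint>
    #include <iostream>
    #include <vector>

    /// Normal path: build the state from raw rows.
    int64_t aggregateRaw(const std::vector<int64_t> & rows)
    {
        int64_t state = 0;
        for (int64_t v : rows)
            state += v;  /// add new data to the aggregation state
        return state;
    }

    /// only_merge path: combine partial states coming from projection
    /// parts or remote shards instead of re-reading raw rows.
    int64_t mergeStates(const std::vector<int64_t> & partial_states)
    {
        int64_t state = 0;
        for (int64_t s : partial_states)
            state += s;  /// merge it with the existing state
        return state;
    }

    int main()
    {
        /// Both paths must agree on the final answer.
        std::cout << aggregateRaw({1, 2, 3, 4}) << '\n';  // 10
        std::cout << mergeStates({3, 7}) << '\n';         // 10
    }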
@@ -0,0 +1,4 @@
+2020-10-24 00:00:00	-93943555724557365
+-93943555724557365
+2020-10-24 00:00:00	-187887111449114730
+-187887111449114730
@@ -0,0 +1,20 @@
+drop table if exists projection_test;
+
+create table projection_test (dt DateTime, cost Int64, projection p (select toStartOfMinute(dt) dt_m, sum(cost) group by dt_m)) engine MergeTree partition by toDate(dt) order by dt;
+
+insert into projection_test with rowNumberInAllBlocks() as id select toDateTime('2020-10-24 00:00:00') + (id / 20), * from generateRandom('cost Int64', 10, 10, 1) limit 1000 settings max_threads = 1;
+
+set allow_experimental_projection_optimization = 1, force_optimize_projection = 1;
+
+select toStartOfMinute(dt) dt_m, sum(cost) from projection_test group by dt_m;
+select sum(cost) from projection_test;
+
+drop table if exists projection_test_d;
+
+create table projection_test_d (dt DateTime, cost Int64) engine Distributed(test_cluster_two_shards, currentDatabase(), projection_test);
+
+select toStartOfMinute(dt) dt_m, sum(cost) from projection_test_d group by dt_m;
+select sum(cost) from projection_test_d;
+
+drop table projection_test;
+drop table projection_test_d;
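The Distributed half of this test is the regression check: each shard of test_cluster_two_shards presumably answers from its local projection and ships intermediate aggregate states back to the initiator, which must merge them rather than treat them as raw columns. That is consistent with the projection branch above passing header_before_aggregation as the intermediate header and setting only_merge.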