Aggregator parallel merge cancellation

This commit is contained in:
Maksim Kita 2024-03-27 20:14:52 +03:00
parent 010fd38b71
commit d8b06588b8
16 changed files with 122 additions and 63 deletions

View File

@ -457,9 +457,9 @@ public:
detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map);
}
bool isParallelizeMergePrepareNeeded() const override { return is_parallelize_merge_prepare_needed;}
bool isParallelizeMergePrepareNeeded() const override { return is_parallelize_merge_prepare_needed; }
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool) const override
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled) const override
{
if constexpr (is_parallelize_merge_prepare_needed)
{
@ -469,7 +469,7 @@ public:
for (size_t i = 0; i < data_vec.size(); ++i)
data_vec[i] = &this->data(places[i]).set;
DataSet::parallelizeMergePrepare(data_vec, thread_pool);
DataSet::parallelizeMergePrepare(data_vec, thread_pool, is_cancelled);
}
else
{
@ -485,10 +485,10 @@ public:
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled, Arena *) const override
{
if constexpr (is_able_to_parallelize_merge)
this->data(place).set.merge(this->data(rhs).set, &thread_pool);
this->data(place).set.merge(this->data(rhs).set, &thread_pool, is_cancelled);
else
this->data(place).set.merge(this->data(rhs).set);
}
@ -579,10 +579,10 @@ public:
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled, Arena *) const override
{
if constexpr (is_able_to_parallelize_merge)
this->data(place).set.merge(this->data(rhs).set, &thread_pool);
this->data(place).set.merge(this->data(rhs).set, &thread_pool, is_cancelled);
else
this->data(place).set.merge(this->data(rhs).set);
}

View File

@ -144,9 +144,14 @@ public:
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled) const override
{
nested_func->merge(place, rhs, thread_pool, arena);
nested_func->parallelizeMergePrepare(places, thread_pool, is_cancelled);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled, Arena * arena) const override
{
nested_func->merge(place, rhs, thread_pool, is_cancelled, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override

View File

@ -167,9 +167,14 @@ public:
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled) const override
{
nested_func->merge(place, rhs, thread_pool, arena);
nested_func->parallelizeMergePrepare(places, thread_pool, is_cancelled);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled, Arena * arena) const override
{
nested_func->merge(place, rhs, thread_pool, is_cancelled, arena);
}
void mergeBatch(

View File

@ -113,9 +113,14 @@ public:
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled) const override
{
nested_func->merge(place, rhs, thread_pool, arena);
nested_func->parallelizeMergePrepare(places, thread_pool, is_cancelled);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled, Arena * arena) const override
{
nested_func->merge(place, rhs, thread_pool, is_cancelled, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override

View File

@ -154,9 +154,18 @@ public:
bool isAbleToParallelizeMerge() const override { return nested_function->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_function->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled) const override
{
nested_function->merge(nestedPlace(place), nestedPlace(rhs), thread_pool, arena);
AggregateDataPtrs nested_places(places.begin(), places.end());
for (auto & nested_place : nested_places)
nested_place = nestedPlace(nested_place);
nested_function->parallelizeMergePrepare(nested_places, thread_pool, is_cancelled);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled, Arena * arena) const override
{
nested_function->merge(nestedPlace(place), nestedPlace(rhs), thread_pool, is_cancelled, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override

View File

@ -94,9 +94,14 @@ public:
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled) const override
{
nested_func->merge(place, rhs, thread_pool, arena);
nested_func->parallelizeMergePrepare(places, thread_pool, is_cancelled);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled, Arena * arena) const override
{
nested_func->merge(place, rhs, thread_pool, is_cancelled, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override

View File

@ -151,7 +151,7 @@ public:
virtual bool isParallelizeMergePrepareNeeded() const { return false; }
virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/) const
virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/, std::atomic<bool> * /*is_cancelled*/) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() with thread pool parameter isn't implemented for {} ", getName());
}
@ -168,7 +168,7 @@ public:
/// Should be used only if isAbleToParallelizeMerge() returned true.
virtual void
merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, Arena * /*arena*/) const
merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, std::atomic<bool> * /*is_cancelled*/, Arena * /*arena*/) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "merge() with thread pool parameter isn't implemented for {} ", getName());
}

View File

@ -37,7 +37,7 @@ public:
/// In merge, if one of the lhs and rhs is twolevelset and the other is singlelevelset, then the singlelevelset will need to convertToTwoLevel().
/// It's not in parallel and will cost extra large time if the thread_num is large.
/// This method will convert all the SingleLevelSet to TwoLevelSet in parallel if the hashsets are not all singlelevel or not all twolevel.
static void parallelizeMergePrepare(const std::vector<UniqExactSet *> & data_vec, ThreadPool & thread_pool)
static void parallelizeMergePrepare(const std::vector<UniqExactSet *> & data_vec, ThreadPool & thread_pool, std::atomic<bool> * is_cancelled)
{
UInt64 single_level_set_num = 0;
UInt64 all_single_hash_size = 0;
@ -63,7 +63,7 @@ public:
try
{
auto data_vec_atomic_index = std::make_shared<std::atomic_uint32_t>(0);
auto thread_func = [data_vec, data_vec_atomic_index, thread_group = CurrentThread::getGroup()]()
auto thread_func = [data_vec, data_vec_atomic_index, is_cancelled, thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
@ -76,6 +76,9 @@ public:
while (true)
{
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst))
return;
const auto i = data_vec_atomic_index->fetch_add(1);
if (i >= data_vec.size())
return;
@ -96,7 +99,7 @@ public:
}
}
auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr)
auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr, std::atomic<bool> * is_cancelled = nullptr)
{
if (isSingleLevel() && other.isTwoLevel())
convertToTwoLevel();
@ -113,7 +116,12 @@ public:
if (!thread_pool)
{
for (size_t i = 0; i < rhs.NUM_BUCKETS; ++i)
{
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst))
return;
lhs.impls[i].merge(rhs.impls[i]);
}
}
else
{
@ -121,7 +129,7 @@ public:
{
auto next_bucket_to_merge = std::make_shared<std::atomic_uint32_t>(0);
auto thread_func = [&lhs, &rhs, next_bucket_to_merge, thread_group = CurrentThread::getGroup()]()
auto thread_func = [&lhs, &rhs, next_bucket_to_merge, is_cancelled, thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
@ -133,6 +141,9 @@ public:
while (true)
{
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst))
return;
const auto bucket = next_bucket_to_merge->fetch_add(1);
if (bucket >= rhs.NUM_BUCKETS)
return;

View File

@ -1435,13 +1435,14 @@ void NO_INLINE Aggregator::mergeOnIntervalWithoutKey(
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data) const
const AggregateColumnsConstData & aggregate_columns_data,
std::atomic<bool> * is_cancelled) const
{
/// `data_variants` will destroy the states of aggregate functions in the destructor
data_variants.aggregator = this;
data_variants.init(AggregatedDataVariants::Type::without_key);
mergeWithoutKeyStreamsImpl(data_variants, row_begin, row_end, aggregate_columns_data);
mergeWithoutKeyStreamsImpl(data_variants, row_begin, row_end, aggregate_columns_data, is_cancelled);
}
@ -2636,7 +2637,8 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const
ManyAggregatedDataVariants & non_empty_data,
std::atomic<bool> * is_cancelled) const
{
ThreadPool thread_pool{CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, CurrentMetrics::AggregatorThreadsScheduled, params.max_threads};
@ -2652,7 +2654,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
for (size_t result_num = 0; result_num < size; ++result_num)
data_vec.emplace_back(non_empty_data[result_num]->without_key + offsets_of_aggregate_states[i]);
aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool);
aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool, is_cancelled);
}
}
@ -2668,6 +2670,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
res_data + offsets_of_aggregate_states[i],
current_data + offsets_of_aggregate_states[i],
thread_pool,
is_cancelled,
res->aggregates_pool);
else
aggregate_functions[i]->merge(
@ -2954,17 +2957,19 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
void NO_INLINE Aggregator::mergeBlockWithoutKeyStreamsImpl(
Block block,
AggregatedDataVariants & result) const
AggregatedDataVariants & result,
std::atomic<bool> * is_cancelled) const
{
AggregateColumnsConstData aggregate_columns = params.makeAggregateColumnsData(block);
mergeWithoutKeyStreamsImpl(result, 0, block.rows(), aggregate_columns);
mergeWithoutKeyStreamsImpl(result, 0, block.rows(), aggregate_columns, is_cancelled);
}
void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data) const
const AggregateColumnsConstData & aggregate_columns_data,
std::atomic<bool> * is_cancelled) const
{
using namespace CurrentMetrics;
@ -2986,12 +2991,12 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
if (aggregate_functions[i]->isParallelizeMergePrepareNeeded())
{
std::vector<AggregateDataPtr> data_vec{res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row]};
aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool);
aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool, is_cancelled);
}
if (aggregate_functions[i]->isAbleToParallelizeMerge())
aggregate_functions[i]->merge(
res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], thread_pool, result.aggregates_pool);
res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], thread_pool, is_cancelled, result.aggregates_pool);
else
aggregate_functions[i]->merge(
res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], result.aggregates_pool);
@ -3000,7 +3005,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
}
bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const
bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys, std::atomic<bool> * is_cancelled) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
@ -3022,7 +3027,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
}
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
mergeBlockWithoutKeyStreamsImpl(std::move(block), result, is_cancelled);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, result.consecutive_keys_cache_stats, no_more_keys);
@ -3070,7 +3075,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
}
void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads)
void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads, std::atomic<bool> * is_cancelled)
{
if (bucket_to_blocks.empty())
return;
@ -3183,7 +3188,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
break;
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
mergeBlockWithoutKeyStreamsImpl(std::move(block), result, is_cancelled);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
@ -3202,7 +3207,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
}
Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
Block Aggregator::mergeBlocks(BlocksList & blocks, bool final, std::atomic<bool> * is_cancelled)
{
if (blocks.empty())
return {};
@ -3264,7 +3269,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
bucket_num = -1;
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
mergeBlockWithoutKeyStreamsImpl(std::move(block), result, is_cancelled);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \

View File

@ -266,7 +266,10 @@ public:
AggregateFunctionInstruction * aggregate_instructions) const;
/// Used for aggregate projection.
bool mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const;
bool mergeOnBlock(Block block,
AggregatedDataVariants & result,
bool & no_more_keys,
std::atomic<bool> * is_cancelled) const;
void mergeOnBlockSmall(
AggregatedDataVariants & result,
@ -279,7 +282,8 @@ public:
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data) const;
const AggregateColumnsConstData & aggregate_columns_data,
std::atomic<bool> * is_cancelled) const;
/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
@ -294,13 +298,13 @@ public:
using BucketToBlocks = std::map<Int32, BlocksList>;
/// Merge partially aggregated blocks separated to buckets into one data structure.
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads, std::atomic<bool> * is_cancelled);
/// Merge several partially aggregated blocks into one.
/// Precondition: for all blocks block.info.is_overflows flag must be the same.
/// (either all blocks are from overflow data or none blocks are).
/// The resulting block has the same value of is_overflows flag.
Block mergeBlocks(BlocksList & blocks, bool final);
Block mergeBlocks(BlocksList & blocks, bool final, std::atomic<bool> * is_cancelled);
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
@ -486,7 +490,8 @@ private:
Arena * arena) const;
void mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
ManyAggregatedDataVariants & non_empty_data,
std::atomic<bool> * is_cancelled) const;
template <typename Method>
void mergeSingleLevelDataImpl(
@ -597,13 +602,15 @@ private:
void mergeBlockWithoutKeyStreamsImpl(
Block block,
AggregatedDataVariants & result) const;
AggregatedDataVariants & result,
std::atomic<bool> * is_cancelled) const;
void mergeWithoutKeyStreamsImpl(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data) const;
const AggregateColumnsConstData & aggregate_columns_data,
std::atomic<bool> * is_cancelled) const;
template <typename Method>
void mergeBucketImpl(

View File

@ -369,6 +369,8 @@ public:
protected:
virtual void onCancel() {}
std::atomic<bool> is_cancelled{false};
private:
/// For:
/// - elapsed_us
@ -378,8 +380,6 @@ private:
/// - output_wait_elapsed_us
friend class ExecutingGraph;
std::atomic<bool> is_cancelled{false};
std::string processor_description;
/// For processors_profile_log

View File

@ -160,7 +160,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
if (group_by_key)
params->aggregator.mergeOnBlockSmall(variants, key_begin, key_end, aggregate_columns_data, key_columns_raw);
else
params->aggregator.mergeOnIntervalWithoutKey(variants, key_begin, key_end, aggregate_columns_data);
params->aggregator.mergeOnIntervalWithoutKey(variants, key_begin, key_end, aggregate_columns_data, &is_cancelled);
}
else
{

View File

@ -285,7 +285,11 @@ class ConvertingAggregatedToChunksTransform : public IProcessor
{
public:
ConvertingAggregatedToChunksTransform(AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, size_t num_threads_)
: IProcessor({}, {params_->getHeader()}), params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_)
: IProcessor({}, {params_->getHeader()})
, params(std::move(params_))
, data(std::move(data_))
, shared_data(std::make_shared<ConvertingAggregatedToChunksWithMergingSource::SharedData>())
, num_threads(num_threads_)
{
}
@ -346,8 +350,7 @@ public:
for (auto & input : inputs)
input.close();
if (shared_data)
shared_data->is_cancelled.store(true);
shared_data->is_cancelled.store(true);
return Status::Finished;
}
@ -372,6 +375,11 @@ public:
return prepareTwoLevel();
}
void onCancel() override
{
shared_data->is_cancelled.store(true, std::memory_order_seq_cst);
}
private:
IProcessor::Status preparePushToOutput()
{
@ -464,7 +472,7 @@ private:
if (first->type == AggregatedDataVariants::Type::without_key || params->params.overflow_row)
{
params->aggregator.mergeWithoutKeyDataImpl(*data);
params->aggregator.mergeWithoutKeyDataImpl(*data, &shared_data->is_cancelled);
auto block = params->aggregator.prepareBlockAndFillWithoutKey(
*first, params->final, first->type != AggregatedDataVariants::Type::without_key);
@ -506,7 +514,7 @@ private:
void createSources()
{
AggregatedDataVariantsPtr & first = data->at(0);
shared_data = std::make_shared<ConvertingAggregatedToChunksWithMergingSource::SharedData>();
processors.reserve(num_threads);
for (size_t thread = 0; thread < num_threads; ++thread)
{
@ -684,7 +692,7 @@ void AggregatingTransform::consume(Chunk chunk)
{
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
block = materializeBlock(block);
if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys))
if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys, &is_cancelled))
is_consume_finished = true;
}
else
@ -704,7 +712,7 @@ void AggregatingTransform::initGenerate()
if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set)
{
if (params->params.only_merge)
params->aggregator.mergeOnBlock(getInputs().front().getHeader(), variants, no_more_keys);
params->aggregator.mergeOnBlock(getInputs().front().getHeader(), variants, no_more_keys, &is_cancelled);
else
params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, no_more_keys);
}

View File

@ -363,7 +363,7 @@ void MergingAggregatedBucketTransform::transform(Chunk & chunk)
res_info->chunk_num = chunks_to_merge->chunk_num;
chunk.setChunkInfo(std::move(res_info));
auto block = params->aggregator.mergeBlocks(blocks_list, params->final);
auto block = params->aggregator.mergeBlocks(blocks_list, params->final, &is_cancelled);
if (!required_sort_description.empty())
sortBlock(block, required_sort_description);

View File

@ -39,11 +39,10 @@ void MergingAggregatedTransform::consume(Chunk chunk)
if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()))
{
/** If the remote servers used a two-level aggregation method,
* then blocks will contain information about the number of the bucket.
* Then the calculations can be parallelized by buckets.
* We decompose the blocks to the bucket numbers indicated in them.
*/
* then blocks will contain information about the number of the bucket.
* Then the calculations can be parallelized by buckets.
* We decompose the blocks to the bucket numbers indicated in them.
*/
auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
block.info.is_overflows = agg_info->is_overflows;
block.info.bucket_num = agg_info->bucket_num;
@ -73,7 +72,7 @@ Chunk MergingAggregatedTransform::generate()
next_block = blocks.begin();
/// TODO: this operation can be made async. Add async for IAccumulatingTransform.
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads);
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, &is_cancelled);
blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
next_block = blocks.begin();
}

View File

@ -57,7 +57,7 @@ Chunk GroupByModifierTransform::merge(Chunks && chunks, bool is_input, bool fina
for (auto & chunk : chunks)
blocks.emplace_back(header.cloneWithColumns(chunk.detachColumns()));
auto current_block = is_input ? params->aggregator.mergeBlocks(blocks, final) : output_aggregator->mergeBlocks(blocks, final);
auto current_block = is_input ? params->aggregator.mergeBlocks(blocks, final, &is_cancelled) : output_aggregator->mergeBlocks(blocks, final, &is_cancelled);
auto num_rows = current_block.rows();
return Chunk(current_block.getColumns(), num_rows);
}