diff --git a/src/AggregateFunctions/AggregateFunctionUniq.h b/src/AggregateFunctions/AggregateFunctionUniq.h index 891f2ac4284..c53b5e3bdb7 100644 --- a/src/AggregateFunctions/AggregateFunctionUniq.h +++ b/src/AggregateFunctions/AggregateFunctionUniq.h @@ -457,9 +457,9 @@ public: detail::Adder::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map); } - bool isParallelizeMergePrepareNeeded() const override { return is_parallelize_merge_prepare_needed;} + bool isParallelizeMergePrepareNeeded() const override { return is_parallelize_merge_prepare_needed; } - void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool) const override + void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic * is_cancelled) const override { if constexpr (is_parallelize_merge_prepare_needed) { @@ -469,7 +469,7 @@ public: for (size_t i = 0; i < data_vec.size(); ++i) data_vec[i] = &this->data(places[i]).set; - DataSet::parallelizeMergePrepare(data_vec, thread_pool); + DataSet::parallelizeMergePrepare(data_vec, thread_pool, is_cancelled); } else { @@ -485,10 +485,10 @@ public: bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; } bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; } - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic * is_cancelled, Arena *) const override { if constexpr (is_able_to_parallelize_merge) - this->data(place).set.merge(this->data(rhs).set, &thread_pool); + this->data(place).set.merge(this->data(rhs).set, &thread_pool, is_cancelled); else this->data(place).set.merge(this->data(rhs).set); } @@ -579,10 +579,10 @@ public: bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; } bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; } - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic * is_cancelled, Arena *) const override { if constexpr (is_able_to_parallelize_merge) - this->data(place).set.merge(this->data(rhs).set, &thread_pool); + this->data(place).set.merge(this->data(rhs).set, &thread_pool, is_cancelled); else this->data(place).set.merge(this->data(rhs).set); } diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionArray.h b/src/AggregateFunctions/Combinators/AggregateFunctionArray.h index 6b918926d0d..9dc5e274dab 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionArray.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionArray.h @@ -144,9 +144,14 @@ public: bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); } bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); } - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override + void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic * is_cancelled) const override { - nested_func->merge(place, rhs, thread_pool, arena); + nested_func->parallelizeMergePrepare(places, thread_pool, is_cancelled); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic * is_cancelled, Arena * arena) const override + { + nested_func->merge(place, rhs, thread_pool, is_cancelled, arena); } void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional version) const override diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionIf.h b/src/AggregateFunctions/Combinators/AggregateFunctionIf.h index df23398a10d..91dcfa4db0b 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionIf.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionIf.h @@ -167,9 +167,14 @@ public: bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); } bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); } - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override + void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic * is_cancelled) const override { - nested_func->merge(place, rhs, thread_pool, arena); + nested_func->parallelizeMergePrepare(places, thread_pool, is_cancelled); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic * is_cancelled, Arena * arena) const override + { + nested_func->merge(place, rhs, thread_pool, is_cancelled, arena); } void mergeBatch( diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionMerge.h b/src/AggregateFunctions/Combinators/AggregateFunctionMerge.h index 53c24bd60c1..5bc478116e0 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionMerge.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionMerge.h @@ -113,9 +113,14 @@ public: bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); } bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); } - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override + void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic * is_cancelled) const override { - nested_func->merge(place, rhs, thread_pool, arena); + nested_func->parallelizeMergePrepare(places, thread_pool, is_cancelled); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic * is_cancelled, Arena * arena) const override + { + nested_func->merge(place, rhs, thread_pool, is_cancelled, arena); } void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional version) const override diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionNull.h b/src/AggregateFunctions/Combinators/AggregateFunctionNull.h index 72ab3cf5acb..eef5f8bf66b 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionNull.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionNull.h @@ -154,9 +154,18 @@ public: bool isAbleToParallelizeMerge() const override { return nested_function->isAbleToParallelizeMerge(); } bool canOptimizeEqualKeysRanges() const override { return nested_function->canOptimizeEqualKeysRanges(); } - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override + void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic * is_cancelled) const override { - nested_function->merge(nestedPlace(place), nestedPlace(rhs), thread_pool, arena); + AggregateDataPtrs nested_places(places.begin(), places.end()); + for (auto & nested_place : nested_places) + nested_place = nestedPlace(nested_place); + + nested_function->parallelizeMergePrepare(nested_places, thread_pool, is_cancelled); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic * is_cancelled, Arena * arena) const override + { + nested_function->merge(nestedPlace(place), nestedPlace(rhs), thread_pool, is_cancelled, arena); } void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional version) const override diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionState.h b/src/AggregateFunctions/Combinators/AggregateFunctionState.h index b0ab6d49604..7b2933d42c9 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionState.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionState.h @@ -94,9 +94,14 @@ public: bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); } bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); } - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override + void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic * is_cancelled) const override { - nested_func->merge(place, rhs, thread_pool, arena); + nested_func->parallelizeMergePrepare(places, thread_pool, is_cancelled); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, std::atomic * is_cancelled, Arena * arena) const override + { + nested_func->merge(place, rhs, thread_pool, is_cancelled, arena); } void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional version) const override diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index 499185320e6..b33d4b20a16 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -151,7 +151,7 @@ public: virtual bool isParallelizeMergePrepareNeeded() const { return false; } - virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/) const + virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/, std::atomic * /*is_cancelled*/) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() with thread pool parameter isn't implemented for {} ", getName()); } @@ -168,7 +168,7 @@ public: /// Should be used only if isAbleToParallelizeMerge() returned true. virtual void - merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, Arena * /*arena*/) const + merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, std::atomic * /*is_cancelled*/, Arena * /*arena*/) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "merge() with thread pool parameter isn't implemented for {} ", getName()); } diff --git a/src/AggregateFunctions/UniqExactSet.h b/src/AggregateFunctions/UniqExactSet.h index 131c59b9ed6..18b0b830cb8 100644 --- a/src/AggregateFunctions/UniqExactSet.h +++ b/src/AggregateFunctions/UniqExactSet.h @@ -37,7 +37,7 @@ public: /// In merge, if one of the lhs and rhs is twolevelset and the other is singlelevelset, then the singlelevelset will need to convertToTwoLevel(). /// It's not in parallel and will cost extra large time if the thread_num is large. /// This method will convert all the SingleLevelSet to TwoLevelSet in parallel if the hashsets are not all singlelevel or not all twolevel. - static void parallelizeMergePrepare(const std::vector & data_vec, ThreadPool & thread_pool) + static void parallelizeMergePrepare(const std::vector & data_vec, ThreadPool & thread_pool, std::atomic * is_cancelled) { UInt64 single_level_set_num = 0; UInt64 all_single_hash_size = 0; @@ -63,7 +63,7 @@ public: try { auto data_vec_atomic_index = std::make_shared(0); - auto thread_func = [data_vec, data_vec_atomic_index, thread_group = CurrentThread::getGroup()]() + auto thread_func = [data_vec, data_vec_atomic_index, is_cancelled, thread_group = CurrentThread::getGroup()]() { SCOPE_EXIT_SAFE( if (thread_group) @@ -76,6 +76,9 @@ public: while (true) { + if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) + return; + const auto i = data_vec_atomic_index->fetch_add(1); if (i >= data_vec.size()) return; @@ -96,7 +99,7 @@ public: } } - auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr) + auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr, std::atomic * is_cancelled = nullptr) { if (isSingleLevel() && other.isTwoLevel()) convertToTwoLevel(); @@ -113,7 +116,12 @@ public: if (!thread_pool) { for (size_t i = 0; i < rhs.NUM_BUCKETS; ++i) + { + if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) + return; + lhs.impls[i].merge(rhs.impls[i]); + } } else { @@ -121,7 +129,7 @@ public: { auto next_bucket_to_merge = std::make_shared(0); - auto thread_func = [&lhs, &rhs, next_bucket_to_merge, thread_group = CurrentThread::getGroup()]() + auto thread_func = [&lhs, &rhs, next_bucket_to_merge, is_cancelled, thread_group = CurrentThread::getGroup()]() { SCOPE_EXIT_SAFE( if (thread_group) @@ -133,6 +141,9 @@ public: while (true) { + if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) + return; + const auto bucket = next_bucket_to_merge->fetch_add(1); if (bucket >= rhs.NUM_BUCKETS) return; diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 7f3b961a598..837b4e47fba 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1435,13 +1435,14 @@ void NO_INLINE Aggregator::mergeOnIntervalWithoutKey( AggregatedDataVariants & data_variants, size_t row_begin, size_t row_end, - const AggregateColumnsConstData & aggregate_columns_data) const + const AggregateColumnsConstData & aggregate_columns_data, + std::atomic * is_cancelled) const { /// `data_variants` will destroy the states of aggregate functions in the destructor data_variants.aggregator = this; data_variants.init(AggregatedDataVariants::Type::without_key); - mergeWithoutKeyStreamsImpl(data_variants, row_begin, row_end, aggregate_columns_data); + mergeWithoutKeyStreamsImpl(data_variants, row_begin, row_end, aggregate_columns_data, is_cancelled); } @@ -2636,7 +2637,8 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl( void NO_INLINE Aggregator::mergeWithoutKeyDataImpl( - ManyAggregatedDataVariants & non_empty_data) const + ManyAggregatedDataVariants & non_empty_data, + std::atomic * is_cancelled) const { ThreadPool thread_pool{CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, CurrentMetrics::AggregatorThreadsScheduled, params.max_threads}; @@ -2652,7 +2654,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl( for (size_t result_num = 0; result_num < size; ++result_num) data_vec.emplace_back(non_empty_data[result_num]->without_key + offsets_of_aggregate_states[i]); - aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool); + aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool, is_cancelled); } } @@ -2668,6 +2670,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl( res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i], thread_pool, + is_cancelled, res->aggregates_pool); else aggregate_functions[i]->merge( @@ -2954,17 +2957,19 @@ void NO_INLINE Aggregator::mergeStreamsImpl( void NO_INLINE Aggregator::mergeBlockWithoutKeyStreamsImpl( Block block, - AggregatedDataVariants & result) const + AggregatedDataVariants & result, + std::atomic * is_cancelled) const { AggregateColumnsConstData aggregate_columns = params.makeAggregateColumnsData(block); - mergeWithoutKeyStreamsImpl(result, 0, block.rows(), aggregate_columns); + mergeWithoutKeyStreamsImpl(result, 0, block.rows(), aggregate_columns, is_cancelled); } void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl( AggregatedDataVariants & result, size_t row_begin, size_t row_end, - const AggregateColumnsConstData & aggregate_columns_data) const + const AggregateColumnsConstData & aggregate_columns_data, + std::atomic * is_cancelled) const { using namespace CurrentMetrics; @@ -2986,12 +2991,12 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl( if (aggregate_functions[i]->isParallelizeMergePrepareNeeded()) { std::vector data_vec{res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row]}; - aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool); + aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool, is_cancelled); } if (aggregate_functions[i]->isAbleToParallelizeMerge()) aggregate_functions[i]->merge( - res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], thread_pool, result.aggregates_pool); + res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], thread_pool, is_cancelled, result.aggregates_pool); else aggregate_functions[i]->merge( res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], result.aggregates_pool); @@ -3000,7 +3005,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl( } -bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const +bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys, std::atomic * is_cancelled) const { /// `result` will destroy the states of aggregate functions in the destructor result.aggregator = this; @@ -3022,7 +3027,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool } if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows) - mergeBlockWithoutKeyStreamsImpl(std::move(block), result); + mergeBlockWithoutKeyStreamsImpl(std::move(block), result, is_cancelled); #define M(NAME, IS_TWO_LEVEL) \ else if (result.type == AggregatedDataVariants::Type::NAME) \ mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, result.consecutive_keys_cache_stats, no_more_keys); @@ -3070,7 +3075,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool } -void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads) +void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads, std::atomic * is_cancelled) { if (bucket_to_blocks.empty()) return; @@ -3183,7 +3188,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari break; if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows) - mergeBlockWithoutKeyStreamsImpl(std::move(block), result); + mergeBlockWithoutKeyStreamsImpl(std::move(block), result, is_cancelled); #define M(NAME, IS_TWO_LEVEL) \ else if (result.type == AggregatedDataVariants::Type::NAME) \ @@ -3202,7 +3207,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari } -Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) +Block Aggregator::mergeBlocks(BlocksList & blocks, bool final, std::atomic * is_cancelled) { if (blocks.empty()) return {}; @@ -3264,7 +3269,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) bucket_num = -1; if (result.type == AggregatedDataVariants::Type::without_key || is_overflows) - mergeBlockWithoutKeyStreamsImpl(std::move(block), result); + mergeBlockWithoutKeyStreamsImpl(std::move(block), result, is_cancelled); #define M(NAME, IS_TWO_LEVEL) \ else if (result.type == AggregatedDataVariants::Type::NAME) \ diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index e339047063c..4bce700a099 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -266,7 +266,10 @@ public: AggregateFunctionInstruction * aggregate_instructions) const; /// Used for aggregate projection. - bool mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const; + bool mergeOnBlock(Block block, + AggregatedDataVariants & result, + bool & no_more_keys, + std::atomic * is_cancelled) const; void mergeOnBlockSmall( AggregatedDataVariants & result, @@ -279,7 +282,8 @@ public: AggregatedDataVariants & data_variants, size_t row_begin, size_t row_end, - const AggregateColumnsConstData & aggregate_columns_data) const; + const AggregateColumnsConstData & aggregate_columns_data, + std::atomic * is_cancelled) const; /** Convert the aggregation data structure into a block. * If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block. @@ -294,13 +298,13 @@ public: using BucketToBlocks = std::map; /// Merge partially aggregated blocks separated to buckets into one data structure. - void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads); + void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads, std::atomic * is_cancelled); /// Merge several partially aggregated blocks into one. /// Precondition: for all blocks block.info.is_overflows flag must be the same. /// (either all blocks are from overflow data or none blocks are). /// The resulting block has the same value of is_overflows flag. - Block mergeBlocks(BlocksList & blocks, bool final); + Block mergeBlocks(BlocksList & blocks, bool final, std::atomic * is_cancelled); /** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used. * This is needed to simplify merging of that data with other results, that are already two-level. @@ -486,7 +490,8 @@ private: Arena * arena) const; void mergeWithoutKeyDataImpl( - ManyAggregatedDataVariants & non_empty_data) const; + ManyAggregatedDataVariants & non_empty_data, + std::atomic * is_cancelled) const; template void mergeSingleLevelDataImpl( @@ -597,13 +602,15 @@ private: void mergeBlockWithoutKeyStreamsImpl( Block block, - AggregatedDataVariants & result) const; + AggregatedDataVariants & result, + std::atomic * is_cancelled) const; void mergeWithoutKeyStreamsImpl( AggregatedDataVariants & result, size_t row_begin, size_t row_end, - const AggregateColumnsConstData & aggregate_columns_data) const; + const AggregateColumnsConstData & aggregate_columns_data, + std::atomic * is_cancelled) const; template void mergeBucketImpl( diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index c6bef186877..56b4509fe00 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -369,6 +369,8 @@ public: protected: virtual void onCancel() {} + std::atomic is_cancelled{false}; + private: /// For: /// - elapsed_us @@ -378,8 +380,6 @@ private: /// - output_wait_elapsed_us friend class ExecutingGraph; - std::atomic is_cancelled{false}; - std::string processor_description; /// For processors_profile_log diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp index a39a0db1311..f959b2b01b4 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp +++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp @@ -160,7 +160,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk) if (group_by_key) params->aggregator.mergeOnBlockSmall(variants, key_begin, key_end, aggregate_columns_data, key_columns_raw); else - params->aggregator.mergeOnIntervalWithoutKey(variants, key_begin, key_end, aggregate_columns_data); + params->aggregator.mergeOnIntervalWithoutKey(variants, key_begin, key_end, aggregate_columns_data, &is_cancelled); } else { diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index ea5c525d5f2..767448edc64 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -285,7 +285,11 @@ class ConvertingAggregatedToChunksTransform : public IProcessor { public: ConvertingAggregatedToChunksTransform(AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, size_t num_threads_) - : IProcessor({}, {params_->getHeader()}), params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_) + : IProcessor({}, {params_->getHeader()}) + , params(std::move(params_)) + , data(std::move(data_)) + , shared_data(std::make_shared()) + , num_threads(num_threads_) { } @@ -346,8 +350,7 @@ public: for (auto & input : inputs) input.close(); - if (shared_data) - shared_data->is_cancelled.store(true); + shared_data->is_cancelled.store(true); return Status::Finished; } @@ -372,6 +375,11 @@ public: return prepareTwoLevel(); } + void onCancel() override + { + shared_data->is_cancelled.store(true, std::memory_order_seq_cst); + } + private: IProcessor::Status preparePushToOutput() { @@ -464,7 +472,7 @@ private: if (first->type == AggregatedDataVariants::Type::without_key || params->params.overflow_row) { - params->aggregator.mergeWithoutKeyDataImpl(*data); + params->aggregator.mergeWithoutKeyDataImpl(*data, &shared_data->is_cancelled); auto block = params->aggregator.prepareBlockAndFillWithoutKey( *first, params->final, first->type != AggregatedDataVariants::Type::without_key); @@ -506,7 +514,7 @@ private: void createSources() { AggregatedDataVariantsPtr & first = data->at(0); - shared_data = std::make_shared(); + processors.reserve(num_threads); for (size_t thread = 0; thread < num_threads; ++thread) { @@ -684,7 +692,7 @@ void AggregatingTransform::consume(Chunk chunk) { auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns()); block = materializeBlock(block); - if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys)) + if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys, &is_cancelled)) is_consume_finished = true; } else @@ -704,7 +712,7 @@ void AggregatingTransform::initGenerate() if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set) { if (params->params.only_merge) - params->aggregator.mergeOnBlock(getInputs().front().getHeader(), variants, no_more_keys); + params->aggregator.mergeOnBlock(getInputs().front().getHeader(), variants, no_more_keys, &is_cancelled); else params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, no_more_keys); } diff --git a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp index a92e2253314..3bfd7874ac7 100644 --- a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp @@ -363,7 +363,7 @@ void MergingAggregatedBucketTransform::transform(Chunk & chunk) res_info->chunk_num = chunks_to_merge->chunk_num; chunk.setChunkInfo(std::move(res_info)); - auto block = params->aggregator.mergeBlocks(blocks_list, params->final); + auto block = params->aggregator.mergeBlocks(blocks_list, params->final, &is_cancelled); if (!required_sort_description.empty()) sortBlock(block, required_sort_description); diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index e4955d06859..64207093568 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -39,11 +39,10 @@ void MergingAggregatedTransform::consume(Chunk chunk) if (const auto * agg_info = typeid_cast(info.get())) { /** If the remote servers used a two-level aggregation method, - * then blocks will contain information about the number of the bucket. - * Then the calculations can be parallelized by buckets. - * We decompose the blocks to the bucket numbers indicated in them. - */ - + * then blocks will contain information about the number of the bucket. + * Then the calculations can be parallelized by buckets. + * We decompose the blocks to the bucket numbers indicated in them. + */ auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns()); block.info.is_overflows = agg_info->is_overflows; block.info.bucket_num = agg_info->bucket_num; @@ -73,7 +72,7 @@ Chunk MergingAggregatedTransform::generate() next_block = blocks.begin(); /// TODO: this operation can be made async. Add async for IAccumulatingTransform. - params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads); + params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, &is_cancelled); blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); next_block = blocks.begin(); } diff --git a/src/Processors/Transforms/RollupTransform.cpp b/src/Processors/Transforms/RollupTransform.cpp index a5d67fb2f15..20ee91a203a 100644 --- a/src/Processors/Transforms/RollupTransform.cpp +++ b/src/Processors/Transforms/RollupTransform.cpp @@ -57,7 +57,7 @@ Chunk GroupByModifierTransform::merge(Chunks && chunks, bool is_input, bool fina for (auto & chunk : chunks) blocks.emplace_back(header.cloneWithColumns(chunk.detachColumns())); - auto current_block = is_input ? params->aggregator.mergeBlocks(blocks, final) : output_aggregator->mergeBlocks(blocks, final); + auto current_block = is_input ? params->aggregator.mergeBlocks(blocks, final, &is_cancelled) : output_aggregator->mergeBlocks(blocks, final, &is_cancelled); auto num_rows = current_block.rows(); return Chunk(current_block.getColumns(), num_rows); }