From 204a20fe29f1ee552b7420e79bf2a947b66c9e3f Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 23 Dec 2020 20:34:00 +0300 Subject: [PATCH] remove unused code --- src/Interpreters/Aggregator.cpp | 297 ------------------ src/Interpreters/Aggregator.h | 6 - .../Transforms/AggregatingTransform.cpp | 1 + .../Transforms/AggregatingTransform.h | 6 + .../Transforms/MergingAggregatedTransform.cpp | 6 + 5 files changed, 13 insertions(+), 303 deletions(-) diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 87abca4d7cd..15fc36d2364 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -13,9 +13,7 @@ #include #include #include -#include #include -#include #include #include #include @@ -1026,56 +1024,6 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const } -void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result) -{ - if (isCancelled()) - return; - - ColumnRawPtrs key_columns(params.keys_size); - AggregateColumns aggregate_columns(params.aggregates_size); - - /** Used if there is a limit on the maximum number of rows in the aggregation, - * and if group_by_overflow_mode == ANY. - * In this case, new keys are not added to the set, but aggregation is performed only by - * keys that have already managed to get into the set. - */ - bool no_more_keys = false; - - LOG_TRACE(log, "Aggregating"); - - Stopwatch watch; - - size_t src_rows = 0; - size_t src_bytes = 0; - - /// Read all the data - while (Block block = stream->read()) - { - if (isCancelled()) - return; - - src_rows += block.rows(); - src_bytes += block.bytes(); - - if (!executeOnBlock(block, result, key_columns, aggregate_columns, no_more_keys)) - break; - } - - /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation. - /// To do this, we pass a block with zero rows to aggregate. - if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set) - executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, no_more_keys); - - double elapsed_seconds = watch.elapsedSeconds(); - size_t rows = result.sizeWithoutOverflowRow(); - - LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)", - src_rows, rows, ReadableSize(src_bytes), - elapsed_seconds, src_rows / elapsed_seconds, - ReadableSize(src_bytes / elapsed_seconds)); -} - - template void Aggregator::convertToBlockImpl( Method & method, @@ -1769,206 +1717,6 @@ void NO_INLINE Aggregator::mergeBucketImpl( } } - -/** Combines aggregation states together, turns them into blocks, and outputs streams. - * If the aggregation states are two-level, then it produces blocks strictly in order of 'bucket_num'. - * (This is important for distributed processing.) - * In doing so, it can handle different buckets in parallel, using up to `threads` threads. - */ -class MergingAndConvertingBlockInputStream : public IBlockInputStream -{ -public: - /** The input is a set of non-empty sets of partially aggregated data, - * which are all either single-level, or are two-level. 
- */ - MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_) - : aggregator(aggregator_), data(data_), final(final_), threads(threads_) - { - /// At least we need one arena in first data item per thread - if (!data.empty() && threads > data[0]->aggregates_pools.size()) - { - Arenas & first_pool = data[0]->aggregates_pools; - for (size_t j = first_pool.size(); j < threads; j++) - first_pool.emplace_back(std::make_shared()); - } - } - - String getName() const override { return "MergingAndConverting"; } - - Block getHeader() const override { return aggregator.getHeader(final); } - - ~MergingAndConvertingBlockInputStream() override - { - LOG_TRACE(&Poco::Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish"); - - /// We need to wait for threads to finish before destructor of 'parallel_merge_data', - /// because the threads access 'parallel_merge_data'. - if (parallel_merge_data) - parallel_merge_data->pool.wait(); - } - -protected: - Block readImpl() override - { - if (data.empty()) - return {}; - - if (current_bucket_num >= NUM_BUCKETS) - return {}; - - AggregatedDataVariantsPtr & first = data[0]; - - if (current_bucket_num == -1) - { - ++current_bucket_num; - - if (first->type == AggregatedDataVariants::Type::without_key || aggregator.params.overflow_row) - { - aggregator.mergeWithoutKeyDataImpl(data); - return aggregator.prepareBlockAndFillWithoutKey( - *first, final, first->type != AggregatedDataVariants::Type::without_key); - } - } - - if (!first->isTwoLevel()) - { - if (current_bucket_num > 0) - return {}; - - if (first->type == AggregatedDataVariants::Type::without_key) - return {}; - - ++current_bucket_num; - - #define M(NAME) \ - else if (first->type == AggregatedDataVariants::Type::NAME) \ - aggregator.mergeSingleLevelDataImplNAME)::element_type>(data); - if (false) {} // NOLINT - APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) - #undef M - else - throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); - - return aggregator.prepareBlockAndFillSingleLevel(*first, final); - } - else - { - if (!parallel_merge_data) - { - parallel_merge_data = std::make_unique(threads); - for (size_t i = 0; i < threads; ++i) - scheduleThreadForNextBucket(); - } - - Block res; - - while (true) - { - std::unique_lock lock(parallel_merge_data->mutex); - - if (parallel_merge_data->exception) - std::rethrow_exception(parallel_merge_data->exception); - - auto it = parallel_merge_data->ready_blocks.find(current_bucket_num); - if (it != parallel_merge_data->ready_blocks.end()) - { - ++current_bucket_num; - scheduleThreadForNextBucket(); - - if (it->second) - { - res.swap(it->second); - break; - } - else if (current_bucket_num >= NUM_BUCKETS) - break; - } - - parallel_merge_data->condvar.wait(lock); - } - - return res; - } - } - -private: - const Aggregator & aggregator; - ManyAggregatedDataVariants data; - bool final; - size_t threads; - - Int32 current_bucket_num = -1; - Int32 max_scheduled_bucket_num = -1; - static constexpr Int32 NUM_BUCKETS = 256; - - struct ParallelMergeData - { - std::map ready_blocks; - std::exception_ptr exception; - std::mutex mutex; - std::condition_variable condvar; - ThreadPool pool; - - explicit ParallelMergeData(size_t threads_) : pool(threads_) {} - }; - - std::unique_ptr parallel_merge_data; - - void scheduleThreadForNextBucket() - { - ++max_scheduled_bucket_num; - if (max_scheduled_bucket_num >= NUM_BUCKETS) - return; - - 
parallel_merge_data->pool.scheduleOrThrowOnError( - [this, max_scheduled_bucket_num = max_scheduled_bucket_num, group = CurrentThread::getGroup()] - { return thread(max_scheduled_bucket_num, group); }); - } - - void thread(Int32 bucket_num, ThreadGroupStatusPtr thread_group) - { - try - { - setThreadName("MergingAggregtd"); - if (thread_group) - CurrentThread::attachToIfDetached(thread_group); - CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread}; - - /// TODO: add no_more_keys support maybe - - auto & merged_data = *data[0]; - auto method = merged_data.type; - Block block; - - /// Select Arena to avoid race conditions - size_t thread_number = static_cast(bucket_num) % threads; - Arena * arena = merged_data.aggregates_pools.at(thread_number).get(); - - if (false) {} // NOLINT - #define M(NAME) \ - else if (method == AggregatedDataVariants::Type::NAME) \ - { \ - aggregator.mergeBucketImpl(data, bucket_num, arena); \ - block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \ - } - - APPLY_FOR_VARIANTS_TWO_LEVEL(M) - #undef M - - std::lock_guard lock(parallel_merge_data->mutex); - parallel_merge_data->ready_blocks[bucket_num] = std::move(block); - } - catch (...) - { - std::lock_guard lock(parallel_merge_data->mutex); - if (!parallel_merge_data->exception) - parallel_merge_data->exception = std::current_exception(); - } - - parallel_merge_data->condvar.notify_all(); - } -}; - ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const { if (data_variants.empty()) @@ -2030,18 +1778,6 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData return non_empty_data; } -std::unique_ptr Aggregator::mergeAndConvertToBlocks( - ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const -{ - ManyAggregatedDataVariants non_empty_data = prepareVariantsToMerge(data_variants); - - if (non_empty_data.empty()) - return std::make_unique(getHeader(final)); - - return std::make_unique(*this, non_empty_data, final, max_threads); -} - - template void NO_INLINE Aggregator::mergeStreamsImplCase( Block & block, @@ -2159,38 +1895,6 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl( } -void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads) -{ - if (isCancelled()) - return; - - /** If the remote servers used a two-level aggregation method, - * then blocks will contain information about the number of the bucket. - * Then the calculations can be parallelized by buckets. - * We decompose the blocks to the bucket numbers indicated in them. - */ - BucketToBlocks bucket_to_blocks; - - /// Read all the data. 
- LOG_TRACE(log, "Reading blocks of partially aggregated data."); - - size_t total_input_rows = 0; - size_t total_input_blocks = 0; - while (Block block = stream->read()) - { - if (isCancelled()) - return; - - total_input_rows += block.rows(); - ++total_input_blocks; - bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); - } - - LOG_TRACE(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows); - - mergeBlocks(bucket_to_blocks, result, max_threads); -} - void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads) { if (bucket_to_blocks.empty()) @@ -2429,7 +2133,6 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) return block; } - template void NO_INLINE Aggregator::convertBlockToTwoLevelImpl( Method & method, diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index c688da9d32d..a07dbbb7e59 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -928,9 +928,6 @@ public: Aggregator(const Params & params_); - /// Aggregate the source. Get the result in the form of one of the data structures. - void execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result); - using AggregateColumns = std::vector; using AggregateColumnsData = std::vector; using AggregateColumnsConstData = std::vector; @@ -954,9 +951,6 @@ public: */ BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const; - /** Merge several aggregation data structures and output the result as a block stream. - */ - std::unique_ptr mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const; ManyAggregatedDataVariants prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const; /** Merge the stream of partially aggregated blocks into one data structure. diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index 0a97cc3d4cb..c6907202d31 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -379,6 +379,7 @@ private: for (size_t thread = 0; thread < num_threads; ++thread) { + /// Select Arena to avoid race conditions Arena * arena = first->aggregates_pools.at(thread).get(); auto source = std::make_shared( params, data, shared_data, arena); diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h index ab18992151a..86f6ecb4a36 100644 --- a/src/Processors/Transforms/AggregatingTransform.h +++ b/src/Processors/Transforms/AggregatingTransform.h @@ -103,6 +103,12 @@ private: ColumnRawPtrs key_columns; Aggregator::AggregateColumns aggregate_columns; + + /** Used if there is a limit on the maximum number of rows in the aggregation, + * and if group_by_overflow_mode == ANY. + * In this case, new keys are not added to the set, but aggregation is performed only by + * keys that have already managed to get into the set. 
+ */ bool no_more_keys = false; ManyAggregatedDataPtr many_data; diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index f386cbe4bec..1a04f85fd9c 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -34,6 +34,12 @@ void MergingAggregatedTransform::consume(Chunk chunk) if (!agg_info) throw Exception("Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR); + /** If the remote servers used a two-level aggregation method, + * then blocks will contain information about the number of the bucket. + * Then the calculations can be parallelized by buckets. + * We decompose the blocks to the bucket numbers indicated in them. + */ + auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns()); block.info.is_overflows = agg_info->is_overflows; block.info.bucket_num = agg_info->bucket_num;
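The removed Aggregator::execute() spells out one rule worth keeping in mind when reading the pipeline code that replaces it: if there was no input data and the aggregation has no keys, a single row holding the result of empty aggregation is still produced (unless params.empty_result_for_aggregation_by_empty_set is set), by running a block with zero rows through the aggregate functions. A standalone sketch of that behavior, with illustrative names (Totals, aggregateWithoutKeys) rather than ClickHouse's types:

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

/// Initial states of the aggregate functions; zero input rows leave them untouched.
struct Totals
{
    uint64_t count = 0;   /// count()
    int64_t sum = 0;      /// sum(x)
};

std::optional<Totals> aggregateWithoutKeys(const std::vector<int64_t> & values,
                                           bool empty_result_for_empty_set)
{
    /// The opt-out: an empty input really does produce an empty result set.
    if (values.empty() && empty_result_for_empty_set)
        return std::nullopt;

    /// Otherwise zero input rows still yield one row built from the initial
    /// states, which is what passing a zero-row block to aggregation achieves.
    Totals totals;
    for (int64_t v : values)
    {
        ++totals.count;
        totals.sum += v;
    }
    return totals;
}

int main()
{
    auto row = aggregateWithoutKeys({}, /*empty_result_for_empty_set=*/ false);
    if (row)
        std::cout << "count = " << row->count << ", sum = " << row->sum << "\n";   /// prints count = 0, sum = 0
    else
        std::cout << "(no rows)" << "\n";
}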
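The comment added to MergingAggregatedTransform::consume, together with the "Select Arena to avoid race conditions" comment in AggregatingTransform.cpp, describes the merge scheme that used to live in the removed MergingAndConvertingBlockInputStream: partially aggregated blocks are decomposed by the bucket number they carry, and different buckets are merged on different threads, each thread working with its own arena. A minimal standalone sketch of that scheme (plain C++ and the standard library only; a "block" is reduced to a bucket number plus a key-to-count map, and a thread-local map stands in for the per-thread Arena):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

using PartialResult = std::unordered_map<int, uint64_t>;   /// key -> partial count

struct BucketedBlock
{
    int32_t bucket_num;      /// which two-level bucket this partial result belongs to
    PartialResult data;
};

int main()
{
    /// Partially aggregated blocks as they might arrive from remote servers.
    std::vector<BucketedBlock> input =
    {
        {0, {{1, 2}, {2, 1}}},
        {1, {{10, 5}}},
        {0, {{1, 3}}},
        {1, {{10, 1}, {11, 7}}},
    };

    /// Decompose the blocks to the bucket numbers indicated in them.
    std::map<int32_t, std::vector<PartialResult>> bucket_to_blocks;
    for (auto & block : input)
        bucket_to_blocks[block.bucket_num].push_back(std::move(block.data));

    std::vector<int32_t> bucket_nums;
    for (const auto & entry : bucket_to_blocks)
        bucket_nums.push_back(entry.first);

    /// Merge different buckets in parallel. Each worker accumulates into a
    /// thread-local map (standing in for a per-thread Arena), so the merge
    /// itself needs no locking; only publishing the finished bucket does.
    const size_t num_threads = 2;
    std::map<int32_t, PartialResult> merged;   /// bucket_num -> merged result, ordered by bucket
    std::mutex merged_mutex;

    std::vector<std::thread> workers;
    for (size_t t = 0; t < num_threads; ++t)
    {
        workers.emplace_back([&, t]
        {
            /// Static assignment: thread t handles every num_threads-th bucket.
            for (size_t i = t; i < bucket_nums.size(); i += num_threads)
            {
                const int32_t bucket = bucket_nums[i];

                PartialResult result;   /// thread-private scratch, no races
                for (const auto & part : bucket_to_blocks.at(bucket))
                    for (const auto & [key, count] : part)
                        result[key] += count;

                std::lock_guard<std::mutex> lock(merged_mutex);
                merged[bucket] = std::move(result);
            }
        });
    }
    for (auto & worker : workers)
        worker.join();

    /// Results come out strictly in order of bucket_num, as the distributed
    /// processing path expects.
    for (const auto & [bucket, result] : merged)
        for (const auto & [key, count] : result)
            std::cout << "bucket " << bucket << ": key " << key << " -> " << count << "\n";
}

Only publishing a finished bucket takes the mutex; the per-bucket merge itself touches thread-private state only, which is the property the arena-per-thread comment points at.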
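The comment moved into AggregatingTransform.h documents what no_more_keys means under group_by_overflow_mode == ANY: once the limit on the number of aggregation keys is reached, rows whose keys are already in the hash table keep being aggregated, while rows with unseen keys are dropped. A simplified sketch of those semantics (the aggregate() helper and its max_rows_to_group_by parameter are illustrative, not the real Aggregator::executeOnBlock / checkLimits interface):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

void aggregate(const std::vector<int> & keys,
               std::unordered_map<int, uint64_t> & counts,
               size_t max_rows_to_group_by,
               bool & no_more_keys)
{
    for (int key : keys)
    {
        auto it = counts.find(key);
        if (it != counts.end())
        {
            ++it->second;           /// keys already in the set keep being aggregated
            continue;
        }

        if (no_more_keys)
            continue;               /// overflow mode ANY: rows with new keys are skipped

        counts.emplace(key, 1);     /// new key admitted into the set
        if (counts.size() >= max_rows_to_group_by)
            no_more_keys = true;    /// the key set is frozen from now on
    }
}

int main()
{
    std::unordered_map<int, uint64_t> counts;
    bool no_more_keys = false;

    aggregate({1, 2, 3, 1, 4, 2, 5}, counts, /*max_rows_to_group_by=*/ 3, no_more_keys);

    /// Keys 4 and 5 arrived after the limit was reached and were dropped;
    /// keys 1, 2 and 3 continued to accumulate.
    for (const auto & [key, count] : counts)
        std::cout << key << " -> " << count << "\n";
}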