remove unused code

This commit is contained in:
Anton Popov 2020-12-23 20:34:00 +03:00
parent ddfe9f61e0
commit 204a20fe29
5 changed files with 13 additions and 303 deletions

View File

@ -13,9 +13,7 @@
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
@ -1026,56 +1024,6 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
}
void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result)
{
if (isCancelled())
return;
ColumnRawPtrs key_columns(params.keys_size);
AggregateColumns aggregate_columns(params.aggregates_size);
/** Used if there is a limit on the maximum number of rows in the aggregation,
* and if group_by_overflow_mode == ANY.
* In this case, new keys are not added to the set, but aggregation is performed only by
* keys that have already managed to get into the set.
*/
bool no_more_keys = false;
LOG_TRACE(log, "Aggregating");
Stopwatch watch;
size_t src_rows = 0;
size_t src_bytes = 0;
/// Read all the data
while (Block block = stream->read())
{
if (isCancelled())
return;
src_rows += block.rows();
src_bytes += block.bytes();
if (!executeOnBlock(block, result, key_columns, aggregate_columns, no_more_keys))
break;
}
/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, no_more_keys);
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.sizeWithoutOverflowRow();
LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
src_rows, rows, ReadableSize(src_bytes),
elapsed_seconds, src_rows / elapsed_seconds,
ReadableSize(src_bytes / elapsed_seconds));
}
template <typename Method, typename Table>
void Aggregator::convertToBlockImpl(
Method & method,
@ -1769,206 +1717,6 @@ void NO_INLINE Aggregator::mergeBucketImpl(
}
}
/** Combines aggregation states together, turns them into blocks, and outputs streams.
* If the aggregation states are two-level, then it produces blocks strictly in order of 'bucket_num'.
* (This is important for distributed processing.)
* In doing so, it can handle different buckets in parallel, using up to `threads` threads.
*/
class MergingAndConvertingBlockInputStream : public IBlockInputStream
{
public:
/** The input is a set of non-empty sets of partially aggregated data,
* which are all either single-level, or are two-level.
*/
MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_)
: aggregator(aggregator_), data(data_), final(final_), threads(threads_)
{
/// At least we need one arena in first data item per thread
if (!data.empty() && threads > data[0]->aggregates_pools.size())
{
Arenas & first_pool = data[0]->aggregates_pools;
for (size_t j = first_pool.size(); j < threads; j++)
first_pool.emplace_back(std::make_shared<Arena>());
}
}
String getName() const override { return "MergingAndConverting"; }
Block getHeader() const override { return aggregator.getHeader(final); }
~MergingAndConvertingBlockInputStream() override
{
LOG_TRACE(&Poco::Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish");
/// We need to wait for threads to finish before destructor of 'parallel_merge_data',
/// because the threads access 'parallel_merge_data'.
if (parallel_merge_data)
parallel_merge_data->pool.wait();
}
protected:
Block readImpl() override
{
if (data.empty())
return {};
if (current_bucket_num >= NUM_BUCKETS)
return {};
AggregatedDataVariantsPtr & first = data[0];
if (current_bucket_num == -1)
{
++current_bucket_num;
if (first->type == AggregatedDataVariants::Type::without_key || aggregator.params.overflow_row)
{
aggregator.mergeWithoutKeyDataImpl(data);
return aggregator.prepareBlockAndFillWithoutKey(
*first, final, first->type != AggregatedDataVariants::Type::without_key);
}
}
if (!first->isTwoLevel())
{
if (current_bucket_num > 0)
return {};
if (first->type == AggregatedDataVariants::Type::without_key)
return {};
++current_bucket_num;
#define M(NAME) \
else if (first->type == AggregatedDataVariants::Type::NAME) \
aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(data);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
return aggregator.prepareBlockAndFillSingleLevel(*first, final);
}
else
{
if (!parallel_merge_data)
{
parallel_merge_data = std::make_unique<ParallelMergeData>(threads);
for (size_t i = 0; i < threads; ++i)
scheduleThreadForNextBucket();
}
Block res;
while (true)
{
std::unique_lock lock(parallel_merge_data->mutex);
if (parallel_merge_data->exception)
std::rethrow_exception(parallel_merge_data->exception);
auto it = parallel_merge_data->ready_blocks.find(current_bucket_num);
if (it != parallel_merge_data->ready_blocks.end())
{
++current_bucket_num;
scheduleThreadForNextBucket();
if (it->second)
{
res.swap(it->second);
break;
}
else if (current_bucket_num >= NUM_BUCKETS)
break;
}
parallel_merge_data->condvar.wait(lock);
}
return res;
}
}
private:
const Aggregator & aggregator;
ManyAggregatedDataVariants data;
bool final;
size_t threads;
Int32 current_bucket_num = -1;
Int32 max_scheduled_bucket_num = -1;
static constexpr Int32 NUM_BUCKETS = 256;
struct ParallelMergeData
{
std::map<Int32, Block> ready_blocks;
std::exception_ptr exception;
std::mutex mutex;
std::condition_variable condvar;
ThreadPool pool;
explicit ParallelMergeData(size_t threads_) : pool(threads_) {}
};
std::unique_ptr<ParallelMergeData> parallel_merge_data;
void scheduleThreadForNextBucket()
{
++max_scheduled_bucket_num;
if (max_scheduled_bucket_num >= NUM_BUCKETS)
return;
parallel_merge_data->pool.scheduleOrThrowOnError(
[this, max_scheduled_bucket_num = max_scheduled_bucket_num, group = CurrentThread::getGroup()]
{ return thread(max_scheduled_bucket_num, group); });
}
void thread(Int32 bucket_num, ThreadGroupStatusPtr thread_group)
{
try
{
setThreadName("MergingAggregtd");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
/// TODO: add no_more_keys support maybe
auto & merged_data = *data[0];
auto method = merged_data.type;
Block block;
/// Select Arena to avoid race conditions
size_t thread_number = static_cast<size_t>(bucket_num) % threads;
Arena * arena = merged_data.aggregates_pools.at(thread_number).get();
if (false) {} // NOLINT
#define M(NAME) \
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num, arena); \
block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
std::lock_guard lock(parallel_merge_data->mutex);
parallel_merge_data->ready_blocks[bucket_num] = std::move(block);
}
catch (...)
{
std::lock_guard lock(parallel_merge_data->mutex);
if (!parallel_merge_data->exception)
parallel_merge_data->exception = std::current_exception();
}
parallel_merge_data->condvar.notify_all();
}
};
ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const
{
if (data_variants.empty())
@ -2030,18 +1778,6 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData
return non_empty_data;
}
std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
ManyAggregatedDataVariants non_empty_data = prepareVariantsToMerge(data_variants);
if (non_empty_data.empty())
return std::make_unique<NullBlockInputStream>(getHeader(final));
return std::make_unique<MergingAndConvertingBlockInputStream>(*this, non_empty_data, final, max_threads);
}
template <bool no_more_keys, typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImplCase(
Block & block,
@ -2159,38 +1895,6 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
}
void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads)
{
if (isCancelled())
return;
/** If the remote servers used a two-level aggregation method,
* then blocks will contain information about the number of the bucket.
* Then the calculations can be parallelized by buckets.
* We decompose the blocks to the bucket numbers indicated in them.
*/
BucketToBlocks bucket_to_blocks;
/// Read all the data.
LOG_TRACE(log, "Reading blocks of partially aggregated data.");
size_t total_input_rows = 0;
size_t total_input_blocks = 0;
while (Block block = stream->read())
{
if (isCancelled())
return;
total_input_rows += block.rows();
++total_input_blocks;
bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
}
LOG_TRACE(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);
mergeBlocks(bucket_to_blocks, result, max_threads);
}
void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads)
{
if (bucket_to_blocks.empty())
@ -2429,7 +2133,6 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
return block;
}
template <typename Method>
void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
Method & method,

View File

@ -928,9 +928,6 @@ public:
Aggregator(const Params & params_);
/// Aggregate the source. Get the result in the form of one of the data structures.
void execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result);
using AggregateColumns = std::vector<ColumnRawPtrs>;
using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>;
using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
@ -954,9 +951,6 @@ public:
*/
BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
/** Merge several aggregation data structures and output the result as a block stream.
*/
std::unique_ptr<IBlockInputStream> mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
ManyAggregatedDataVariants prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const;
/** Merge the stream of partially aggregated blocks into one data structure.

View File

@ -379,6 +379,7 @@ private:
for (size_t thread = 0; thread < num_threads; ++thread)
{
/// Select Arena to avoid race conditions
Arena * arena = first->aggregates_pools.at(thread).get();
auto source = std::make_shared<ConvertingAggregatedToChunksSource>(
params, data, shared_data, arena);

View File

@ -103,6 +103,12 @@ private:
ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
/** Used if there is a limit on the maximum number of rows in the aggregation,
* and if group_by_overflow_mode == ANY.
* In this case, new keys are not added to the set, but aggregation is performed only by
* keys that have already managed to get into the set.
*/
bool no_more_keys = false;
ManyAggregatedDataPtr many_data;

View File

@ -34,6 +34,12 @@ void MergingAggregatedTransform::consume(Chunk chunk)
if (!agg_info)
throw Exception("Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
/** If the remote servers used a two-level aggregation method,
* then blocks will contain information about the number of the bucket.
* Then the calculations can be parallelized by buckets.
* We decompose the blocks to the bucket numbers indicated in them.
*/
auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
block.info.is_overflows = agg_info->is_overflows;
block.info.bucket_num = agg_info->bucket_num;