Aggregator remove unused code

Maksim Kita 2021-04-26 01:17:24 +03:00
parent 38f3723048
commit cac4a85286
3 changed files with 67 additions and 101 deletions

View File

@@ -86,9 +86,6 @@ struct HashMapCell
/// Do I need to store the zero key separately (that is, can a zero key be inserted into the hash table).
static constexpr bool need_zero_value_storage = true;
/// Whether the cell was deleted.
bool isDeleted() const { return false; }
void setMapped(const value_type & value_) { value.second = value_.second; }
/// Serialization, in binary and text form.
@@ -195,13 +192,26 @@ public:
/// and func is invoked with the third argument emplaced set to true. Otherwise
/// emplaced is set to false.
template <typename Func>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
void ALWAYS_INLINE mergeToViaEmplace(Self & that [[maybe_unused]], Func && func)
{
for (auto it = this->begin(), end = this->end(); it != end; ++it)
typename Self::LookupResult res_it;
bool inserted;
if (this->hasZero())
{
typename Self::LookupResult res_it;
bool inserted;
that.emplace(Cell::getKey(it->getValue()), res_it, inserted, it.getHash());
auto * zero_cell = this->zeroValue();
that.emplace(Cell::getKey(zero_cell->getValue()), res_it, inserted, zero_cell->getHash(*this));
func(res_it->getMapped(), zero_cell->getMapped(), inserted);
}
size_t buf_size = this->grower.bufSize();
for (size_t i = 0; i < buf_size; ++i)
{
auto * it = (this->buf + i);
if (it->isZero(*this))
continue;
that.emplace(Cell::getKey(it->getValue()), res_it, inserted, it->getHash(*this));
func(res_it->getMapped(), it->getMapped(), inserted);
}
}
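For illustration, a caller-side sketch of the contract described above, using a HashMap<UInt64, UInt64> from this header; the lambda body is hypothetical and not part of the patch. The callback receives the destination's mapped value, the source's mapped value, and whether the key was newly inserted into the destination:
HashMap<UInt64, UInt64> source;
HashMap<UInt64, UInt64> destination;
/// Merge per-key counters from source into destination.
source.mergeToViaEmplace(destination, [](UInt64 & dst, UInt64 & src, bool inserted)
{
    if (inserted)
        dst = src;    /// the key was new in destination: initialize the freshly emplaced value
    else
        dst += src;   /// the key already existed: combine the two counters
});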
@@ -214,13 +224,24 @@ public:
template <typename Func>
void ALWAYS_INLINE mergeToViaFind(Self & that, Func && func)
{
for (auto it = this->begin(), end = this->end(); it != end; ++it)
if (this->hasZero())
{
auto res_it = that.find(Cell::getKey(it->getValue()), it.getHash());
if (!res_it)
func(it->getMapped(), it->getMapped(), false);
if (that.hasZero())
func(this->zeroValue()->getMapped(), that.zeroValue()->getMapped(), true);
else
func(res_it->getMapped(), it->getMapped(), true);
func(this->zeroValue()->getMapped(), that.zeroValue()->getMapped(), false);
}
size_t buf_size = this->grower.bufSize();
for (size_t i = 0; i < buf_size; ++i)
{
auto * it = (this->buf + i);
if (it->isZero(*this))
continue;
auto res_it = that.find(Cell::getKey(it->getValue()), it->getHash(*this));
bool found = res_it != nullptr;
func(res_it->getMapped(), it->getMapped(), found);
}
}
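A hedged usage sketch for mergeToViaFind as well (illustrative lambda, not part of the patch), assuming the same (destination, source, found) argument order used in the buffer loop above; the destination table is never modified structurally, so the callback typically combines values only for keys that already exist there:
source.mergeToViaFind(destination, [](UInt64 & dst, UInt64 & src, bool found)
{
    if (found)
        dst += src;   /// combine only keys already present in destination
});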
@@ -228,16 +249,31 @@ public:
template <typename Func>
void forEachValue(Func && func)
{
for (auto & v : *this)
func(v.getKey(), v.getMapped());
if (this->hasZero())
{
auto * zero_value = this->zeroValue();
std::forward<Func>(func)(zero_value->getKey(), zero_value->getMapped());
}
size_t buf_size = this->grower.bufSize();
for (size_t i = 0; i < buf_size; ++i)
{
auto * it = (this->buf + i);
if (it->isZero(*this))
continue;
std::forward<Func>(func)(it->getKey(), it->getMapped());
}
}
/// Call func(Mapped &) for each hash map element.
template <typename Func>
void forEachMapped(Func && func)
{
for (auto & v : *this)
func(v.getMapped());
forEachValue([&](auto &, auto & mapped)
{
std::forward<Func>(func)(mapped);
});
}
typename Cell::Mapped & ALWAYS_INLINE operator[](const Key & x)
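A brief usage sketch of the two traversal helpers above (the map and lambdas are illustrative, not part of the patch): forEachValue passes the key and the mapped value, forEachMapped passes only the mapped value, and both now visit the zero-key cell explicitly before scanning the buffer:
HashMap<UInt64, UInt64> counters;
UInt64 total = 0;
/// Visit every (key, mapped) pair, including the zero-key cell if present.
counters.forEachValue([&](const auto & key, auto & count)
{
    if (key % 2 == 0)    /// the key lets the callback filter
        total += count;
});
/// Visit only the mapped values, e.g. to reset them.
counters.forEachMapped([](auto & count) { count = 0; });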

View File

@@ -179,8 +179,7 @@ void Aggregator::Params::explain(WriteBuffer & out, size_t indent) const
}
Aggregator::Aggregator(const Params & params_)
: params(params_),
isCancelled([]() { return false; })
: params(params_)
{
/// Use query-level memory tracker
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
@@ -575,11 +574,11 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena)
AggregatedDataWithoutKey & res,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena)
{
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
@@ -618,12 +617,12 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
auto * that = aggregate_functions[i];
/// Unnest consecutive trailing -State combinators
while (const auto * func = typeid_cast<const AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
aggregate_functions_instructions[i].that = that;
aggregate_functions_instructions[i].func = that->getAddressOfAddFunction();
if (const auto * func = typeid_cast<const AggregateFunctionArray *>(that))
{
@@ -655,9 +654,6 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
if (isCancelled())
return true;
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
@@ -670,9 +666,6 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
LOG_TRACE(log, "Aggregation method: {}", result.getMethodName());
}
if (isCancelled())
return true;
/** Constant columns are not supported directly during aggregation.
* To make them work anyway, we materialize them.
*/
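The materialization the comment refers to can be sketched in one line (a hedged example assuming IColumn::convertToFullColumnIfConst, which expands a ColumnConst into an ordinary full column so the aggregation loop can index it per row; materialized_columns stands for whatever container keeps the expanded column alive):
ColumnPtr materialized = column->convertToFullColumnIfConst();
materialized_columns.push_back(materialized);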
@@ -698,9 +691,6 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
AggregateFunctionInstructions aggregate_functions_instructions;
prepareAggregateInstructions(columns, aggregate_columns, materialized_columns, aggregate_functions_instructions, nested_columns_holder);
if (isCancelled())
return true;
if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
{
AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
@@ -710,6 +700,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
/// We select one of the aggregation methods and call it.
/// For the case when there are no keys (all aggregate into one row).
if (result.type == AggregatedDataVariants::Type::without_key)
{
@@ -1425,9 +1416,6 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
if (isCancelled())
return BlocksList();
LOG_TRACE(log, "Converting aggregated data to blocks");
Stopwatch watch;
@@ -1443,16 +1431,10 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
&& data_variants.isTwoLevel()) /// TODO Use the shared thread pool with the `merge` function.
thread_pool = std::make_unique<ThreadPool>(max_threads);
if (isCancelled())
return BlocksList();
if (data_variants.without_key)
blocks.emplace_back(prepareBlockAndFillWithoutKey(
data_variants, final, data_variants.type != AggregatedDataVariants::Type::without_key));
if (isCancelled())
return BlocksList();
if (data_variants.type != AggregatedDataVariants::Type::without_key)
{
if (!data_variants.isTwoLevel())
@@ -1468,9 +1450,6 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
data_variants.aggregator = nullptr;
}
if (isCancelled())
return BlocksList();
size_t rows = 0;
size_t bytes = 0;
@@ -1536,7 +1515,7 @@ void NO_INLINE Aggregator::mergeDataImpl(
mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
table_src.mergeToViaEmplace(table_dst,
[&](AggregateDataPtr & dst, AggregateDataPtr & src, bool inserted)
[&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted)
{
if (!inserted)
{
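The __restrict qualifier added to the lambda parameters is a GCC/Clang extension promising the compiler that dst and src never alias the same aggregate state. A minimal standalone sketch of the idea (hypothetical function, not part of the patch):
static void mergeCounters(UInt64 & __restrict dst, UInt64 & __restrict src)
{
    dst += src;   /// dst may stay in a register: the write to src below cannot touch it
    src = 0;
}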
@@ -1911,9 +1890,6 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
#undef M
}
if (isCancelled())
return;
/// result will destroy the states of aggregate functions in the destructor
result.aggregator = this;
@@ -1940,9 +1916,6 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
for (Block & block : bucket_to_blocks[bucket])
{
if (isCancelled())
return;
#define M(NAME) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
@@ -1983,12 +1956,6 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
LOG_TRACE(log, "Merged partially aggregated two-level data.");
}
if (isCancelled())
{
result.invalidate();
return;
}
if (has_blocks_with_unknown_bucket)
{
LOG_TRACE(log, "Merging partially aggregated single-level data.");
@@ -1998,12 +1965,6 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
BlocksList & blocks = bucket_to_blocks[-1];
for (Block & block : blocks)
{
if (isCancelled())
{
result.invalidate();
return;
}
if (!checkLimits(result.sizeWithoutOverflowRow(), no_more_keys))
break;
@@ -2071,9 +2032,6 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
for (Block & block : blocks)
{
if (isCancelled())
return {};
if (bucket_num >= 0 && block.info.bucket_num != bucket_num)
bucket_num = -1;
@@ -2090,9 +2048,6 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
if (isCancelled())
return {};
Block block;
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
block = prepareBlockAndFillWithoutKey(result, final, is_overflows);
@@ -2114,9 +2069,6 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
elapsed_seconds, rows / elapsed_seconds,
ReadableSize(bytes / elapsed_seconds));
if (isCancelled())
return {};
block.info.bucket_num = bucket_num;
return block;
}
@@ -2295,10 +2247,4 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
}
void Aggregator::setCancellationHook(const CancellationHook & cancellation_hook)
{
isCancelled = cancellation_hook;
}
}

View File

@@ -867,7 +867,7 @@ using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants
/** Aggregates the source of the blocks.
*/
class Aggregator
class Aggregator final
{
public:
struct Params
@@ -952,7 +952,7 @@ public:
void explain(WriteBuffer & out, size_t indent) const;
};
Aggregator(const Params & params_);
explicit Aggregator(const Params & params_);
using AggregateColumns = std::vector<ColumnRawPtrs>;
using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>;
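Two small interface tightenings above: final forbids further derivation, and explicit rules out implicit conversion from Params at call sites. A hypothetical call site, not from the patch:
Aggregator aggregator(params);        /// still compiles
// Aggregator aggregator2 = params;   /// now rejected: the conversion must be spelled out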
@@ -994,12 +994,6 @@ public:
*/
std::vector<Block> convertBlockToTwoLevel(const Block & block);
using CancellationHook = std::function<bool()>;
/** Set a function that checks whether the current task can be aborted.
*/
void setCancellationHook(const CancellationHook & cancellation_hook);
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path);
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
@@ -1025,7 +1019,8 @@ public:
/// Get data structure of the result.
Block getHeader(bool final) const;
protected:
private:
friend struct AggregatedDataVariants;
friend class ConvertingAggregatedToChunksTransform;
friend class ConvertingAggregatedToChunksSource;
@@ -1042,17 +1037,12 @@ protected:
/** This array serves two purposes.
*
* 1. Function arguments are collected side by side, and they do not need to be collected from different places. Also the array is made zero-terminated.
* Function arguments are collected side by side, and they do not need to be collected from different places. Also the array is made zero-terminated.
* The inner loop (for the case without_key) is almost twice as compact; performance gain of about 30%.
*
* 2. Calling a function by pointer is better than a virtual call, because in the case of a virtual call,
* GCC 5.1.2 generates code that, at each iteration of the loop, reloads the function address from memory into the register
* (the offset value in the virtual function table).
*/
struct AggregateFunctionInstruction
{
const IAggregateFunction * that;
IAggregateFunction::AddFunc func;
size_t state_offset;
const IColumn ** arguments;
const IAggregateFunction * batch_that;
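A self-contained sketch of the loop shape that comment describes (the type names and the add-function signature are illustrative, not the real ClickHouse declarations): the array is terminated by a null that pointer, and func is an ordinary function pointer resolved once, so the per-row loop pays no virtual dispatch.
#include <cstddef>
struct IAggregateFunctionLike;
using AddFunc = void (*)(const IAggregateFunctionLike * that, char * place, size_t row);
struct Instruction
{
    const IAggregateFunctionLike * that = nullptr;   /// nullptr terminates the array
    AddFunc func = nullptr;
    size_t state_offset = 0;
};
void addRow(Instruction * instructions, char * aggregate_data, size_t row)
{
    for (Instruction * inst = instructions; inst->that; ++inst)
        inst->func(inst->that, aggregate_data + inst->state_offset, row);
}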
@@ -1075,13 +1065,8 @@ protected:
/// How many RAM were used to process the query before processing the first block.
Int64 memory_usage_before_aggregation = 0;
std::mutex mutex;
Poco::Logger * log = &Poco::Logger::get("Aggregator");
/// Returns true if you can abort the current task.
CancellationHook isCancelled;
/// For external aggregation.
TemporaryFiles temporary_files;
@@ -1139,7 +1124,6 @@ protected:
Method & method,
IBlockOutputStream & out);
protected:
/// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table>
void mergeDataNullKey(