optimize memory consumption

This commit is contained in:
CurtizJ 2018-09-05 17:39:51 +03:00
parent 18da41afe3
commit 35cbdcda5b
2 changed files with 27 additions and 40 deletions

View File

@ -24,9 +24,12 @@ static void finalize(Block & block)
}
RollupBlockInputStream::RollupBlockInputStream(
const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : params(params_)
const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : aggregator(params_),
keys(params_.keys)
{
children.push_back(input_);
Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
}
@ -40,46 +43,26 @@ Block RollupBlockInputStream::getHeader() const
Block RollupBlockInputStream::readImpl()
{
Block block;
while(1)
if (current_key >= 0)
{
if (!blocks.empty())
{
auto finalized = std::move(blocks.front());
finalize(finalized);
blocks.pop_front();
return finalized;
}
auto & current = rollup_block.getByPosition(keys[current_key]);
current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
--current_key;
block = children[0]->read();
BlocksList rollup_blocks = { rollup_block };
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
if (!block)
return block;
Aggregator aggregator(params);
Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
Block rollup_block = block;
for (ssize_t i = params.keys_size - 1; i >= 0; --i)
{
auto & current = rollup_block.getByPosition(params.keys[i]);
current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
BlocksList rollup_blocks = { rollup_block };
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
blocks.push_back(rollup_block);
}
finalize(block);
if (!block)
continue;
return block;
Block finalized = rollup_block;
finalize(finalized);
return finalized;
}
current_block = children[0]->read();
current_key = keys.size() - 1;
rollup_block = current_block;
finalize(current_block);
return current_block;
}
}

View File

@ -3,6 +3,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/Arena.h>
#include <Interpreters/Aggregator.h>
#include <Core/ColumnNumbers.h>
namespace DB
@ -31,8 +32,11 @@ protected:
Block readImpl() override;
private:
Aggregator::Params params;
BlocksList blocks;
Aggregator aggregator;
ColumnNumbers keys;
ssize_t current_key = -1;
Block current_block;
Block rollup_block;
};
}