2018-09-17 18:01:04 +00:00
|
|
|
#include <DataStreams/CubeBlockInputStream.h>
|
|
|
|
#include <DataStreams/finalizeBlock.h>
|
|
|
|
#include <DataTypes/DataTypeAggregateFunction.h>
|
|
|
|
#include <Columns/ColumnAggregateFunction.h>
|
|
|
|
#include <Columns/FilterDescription.h>
|
|
|
|
#include <Common/typeid_cast.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2018-09-17 19:16:51 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int TOO_MANY_COLUMNS;
|
|
|
|
}
|
|
|
|
|
2018-09-17 18:01:04 +00:00
|
|
|
/** Creates a stream that reads from `input_` and owns an Aggregator configured with `params_`.
  * The key column positions are taken from `params_.keys`.
  * Throws TOO_MANY_COLUMNS when more than 30 keys are given.
  */
CubeBlockInputStream::CubeBlockInputStream(const BlockInputStreamPtr & input_, const Aggregator::Params & params_)
    : aggregator(params_)
    , keys(params_.keys)
{
    /// readImpl() builds its subset bitmask as `1 << keys.size()`;
    /// presumably the cap exists to keep that shift within range.
    const bool too_many_keys = keys.size() > 30;
    if (too_many_keys)
        throw Exception("Too many columns for cube", ErrorCodes::TOO_MANY_COLUMNS);

    children.push_back(input_);

    /// The hook reports this stream's cancellation state to the aggregator.
    aggregator.setCancellationHook(Aggregator::CancellationHook([this]() { return isCancelled(); }));
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Returns the header of the first child stream after it has been passed through finalizeBlock().
Block CubeBlockInputStream::getHeader() const
{
    Block header = children.at(0)->getHeader();
    finalizeBlock(header);
    return header;
}
|
|
|
|
|
|
|
|
|
|
|
|
/** Produces the next block of the CUBE result.
  *
  * While `mask` is non-zero, the previously read source block is re-emitted with a subset
  * of its key columns replaced and then merged by the aggregator. When `mask` is zero,
  * a new block is read from the child stream and returned finalized; an empty block from
  * the child is returned as is.
  */
Block CubeBlockInputStream::readImpl()
{
    /** After reading a block from input stream,
      * we will calculate all subsets of columns on next iterations of readImpl
      * by zeroing columns at positions, where bits are zero in current bitmask.
      */
    if (mask)
    {
        --mask;

        Block cube_block = source_block;
        for (size_t i = 0; i < keys.size(); ++i)
        {
            /// A set bit means the key column is kept untouched.
            if ((mask >> i) & 1)
                continue;

            /// Bit i of the mask corresponds to the key counted from the end of `keys`.
            const size_t key_position = keys[keys.size() - i - 1];

            /// zero_block is a clone of source_block's layout and its replacement columns
            /// are stored at the key positions inside the block. It must therefore be
            /// indexed by the block position, not by the index within `keys`.
            cube_block.getByPosition(key_position).column = zero_block.getByPosition(key_position).column;
        }

        BlocksList cube_blocks = { cube_block };
        return aggregator.mergeBlocks(cube_blocks, true);
    }

    source_block = children[0]->read();
    if (!source_block)
        return source_block;

    /// Prepare replacement key columns: empty clones resized to the row count of the source block.
    zero_block = source_block.cloneEmpty();
    for (auto key : keys)
    {
        auto & current = zero_block.getByPosition(key);
        current.column = current.column->cloneResized(source_block.rows());
    }

    Block finalized = source_block;
    finalizeBlock(finalized);

    /// One bit per key, all set; subsequent calls decrement the mask down to zero.
    mask = (1 << keys.size()) - 1;

    return finalized;
}
|
|
|
|
}
|