#include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; } CubeTransform::CubeTransform(Block header, AggregatingTransformParamsPtr params_) : IAccumulatingTransform(std::move(header), params_->getHeader()) , params(std::move(params_)) , keys(params->params.keys) { if (keys.size() >= 8 * sizeof(mask)) throw Exception("Too many keys are used for CubeTransform.", ErrorCodes::LOGICAL_ERROR); } Chunk CubeTransform::merge(Chunks && chunks, bool final) { BlocksList rollup_blocks; for (auto & chunk : chunks) rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final); auto num_rows = rollup_block.rows(); return Chunk(rollup_block.getColumns(), num_rows); } void CubeTransform::consume(Chunk chunk) { consumed_chunks.emplace_back(std::move(chunk)); } Chunk CubeTransform::generate() { if (!consumed_chunks.empty()) { if (consumed_chunks.size() > 1) cube_chunk = merge(std::move(consumed_chunks), false); else cube_chunk = std::move(consumed_chunks.front()); consumed_chunks.clear(); auto num_rows = cube_chunk.getNumRows(); mask = (static_cast(1) << keys.size()) - 1; current_columns = cube_chunk.getColumns(); current_zero_columns.clear(); current_zero_columns.reserve(keys.size()); for (auto key : keys) current_zero_columns.emplace_back(current_columns[key]->cloneEmpty()->cloneResized(num_rows)); } auto gen_chunk = std::move(cube_chunk); if (mask) { --mask; auto columns = current_columns; auto size = keys.size(); for (size_t i = 0; i < size; ++i) /// Reverse bit order to support previous behaviour. if ((mask & (UInt64(1) << (size - i - 1))) == 0) columns[keys[i]] = current_zero_columns[i]; Chunks chunks; chunks.emplace_back(std::move(columns), current_columns.front()->size()); cube_chunk = merge(std::move(chunks), false); } finalizeChunk(gen_chunk); return gen_chunk; } }