Preparation [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-09-08 22:23:48 +03:00
parent b6d3ab4362
commit 4ec18956c7
5 changed files with 51 additions and 37 deletions

View File

@ -23,9 +23,9 @@ Block SquashingBlockInputStream::readImpl()
if (!block)
all_read = true;
SquashingTransform::Result result = transform.add(std::move(block));
SquashingTransform::Result result = transform.add(block.mutateColumns());
if (result.ready)
return result.block;
return block.cloneWithColumns(std::move(result.columns));
}
}

View File

@ -5,16 +5,16 @@ namespace DB
{
SquashingBlockOutputStream::SquashingBlockOutputStream(BlockOutputStreamPtr & dst, size_t min_block_size_rows, size_t min_block_size_bytes)
: output(dst), transform(min_block_size_rows, min_block_size_bytes)
: output(dst), header(output->getHeader()), transform(min_block_size_rows, min_block_size_bytes)
{
}
void SquashingBlockOutputStream::write(const Block & block)
{
SquashingTransform::Result result = transform.add(Block(block));
SquashingTransform::Result result = transform.add(block.mutateColumns());
if (result.ready)
output->write(result.block);
output->write(header.cloneWithColumns(std::move(result.columns)));
}
@ -26,8 +26,8 @@ void SquashingBlockOutputStream::finalize()
all_written = true;
SquashingTransform::Result result = transform.add({});
if (result.ready && result.block)
output->write(result.block);
if (result.ready && !result.columns.empty())
output->write(header.cloneWithColumns(std::move(result.columns)));
}

View File

@ -14,7 +14,7 @@ class SquashingBlockOutputStream : public IBlockOutputStream
public:
SquashingBlockOutputStream(BlockOutputStreamPtr & dst, size_t min_block_size_rows, size_t min_block_size_bytes);
Block getHeader() const override { return output->getHeader(); }
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
@ -26,6 +26,7 @@ public:
private:
BlockOutputStreamPtr output;
Block header;
SquashingTransform transform;
bool all_written = false;

View File

@ -10,37 +10,37 @@ SquashingTransform::SquashingTransform(size_t min_block_size_rows, size_t min_bl
}
SquashingTransform::Result SquashingTransform::add(Block && block)
SquashingTransform::Result SquashingTransform::add(MutableColumns && columns)
{
if (!block)
return Result(std::move(accumulated_block));
if (columns.empty())
return Result(std::move(accumulated_columns));
/// Just read block is alredy enough.
if (isEnoughSize(block.rows(), block.bytes()))
if (isEnoughSize(columns))
{
/// If no accumulated data, return just read block.
if (!accumulated_block)
return Result(std::move(block));
if (accumulated_columns.empty())
return Result(std::move(columns));
/// Return accumulated data (may be it has small size) and place new block to accumulated data.
accumulated_block.swap(block);
return Result(std::move(block));
columns.swap(accumulated_columns);
return Result(std::move(columns));
}
/// Accumulated block is already enough.
if (accumulated_block && isEnoughSize(accumulated_block.rows(), accumulated_block.bytes()))
if (!accumulated_columns.empty() && isEnoughSize(accumulated_columns))
{
/// Return accumulated data and place new block to accumulated data.
accumulated_block.swap(block);
return Result(std::move(block));
columns.swap(accumulated_columns);
return Result(std::move(columns));
}
append(std::move(block));
append(std::move(columns));
if (isEnoughSize(accumulated_block.rows(), accumulated_block.bytes()))
if (isEnoughSize(accumulated_columns))
{
Block res;
res.swap(accumulated_block);
MutableColumns res;
res.swap(accumulated_columns);
return Result(std::move(res));
}
@ -49,23 +49,35 @@ SquashingTransform::Result SquashingTransform::add(Block && block)
}
void SquashingTransform::append(Block && block)
void SquashingTransform::append(MutableColumns && columns)
{
if (!accumulated_block)
if (accumulated_columns.empty())
{
accumulated_block = std::move(block);
accumulated_columns = std::move(columns);
return;
}
size_t columns = block.columns();
size_t rows = block.rows();
for (size_t i = 0, size = columns.size(); i < size; ++i)
accumulated_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
}
for (size_t i = 0; i < columns; ++i)
bool SquashingTransform::isEnoughSize(const MutableColumns & columns)
{
size_t rows = 0;
size_t bytes = 0;
for (const auto & column : columns)
{
MutableColumnPtr mutable_column = (*std::move(accumulated_block.getByPosition(i).column)).mutate();
mutable_column->insertRangeFrom(*block.getByPosition(i).column, 0, rows);
accumulated_block.getByPosition(i).column = std::move(mutable_column);
if (!rows)
rows = column->size();
else if (rows != column->size())
throw Exception("Sizes of columns doesn't match", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
bytes += column->byteSize();
}
return isEnoughSize(rows, bytes);
}

View File

@ -29,25 +29,26 @@ public:
struct Result
{
bool ready = false;
Block block;
MutableColumns columns;
Result(bool ready_) : ready(ready_) {}
Result(Block && block_) : ready(true), block(std::move(block_)) {}
Result(MutableColumns && columns) : ready(true), columns(std::move(columns)) {}
};
/** Add next block and possibly returns squashed block.
* At end, you need to pass empty block. As the result for last (empty) block, you will get last Result with ready = true.
*/
Result add(Block && block);
Result add(MutableColumns && columns);
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
Block accumulated_block;
MutableColumns accumulated_columns;
void append(Block && block);
void append(MutableColumns && columns);
bool isEnoughSize(const MutableColumns & columns);
bool isEnoughSize(size_t rows, size_t bytes) const;
};