Squashing transforms: development [#METR-21877].

This commit is contained in:
Alexey Milovidov 2016-07-07 00:48:11 +03:00
parent 01b1503faa
commit 5f81c096f6
5 changed files with 126 additions and 70 deletions

View File

@ -266,6 +266,7 @@ add_library (dbms
include/DB/DataStreams/CSVRowOutputStream.h
include/DB/DataStreams/CSVRowInputStream.h
include/DB/DataStreams/verbosePrintString.h
include/DB/DataStreams/SquashingTransform.h
include/DB/DataStreams/SquashingBlockInputStream.h
include/DB/DataTypes/IDataType.h
include/DB/DataTypes/IDataTypeDummy.h
@ -745,6 +746,7 @@ add_library (dbms
src/DataStreams/RemoteBlockInputStream.cpp
src/DataStreams/BlockIO.cpp
src/DataStreams/verbosePrintString.cpp
src/DataStreams/SquashingTransform.cpp
src/DataStreams/SquashingBlockInputStream.cpp
src/DataTypes/DataTypeString.cpp

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/SquashingTransform.h>
namespace DB
@ -37,15 +38,7 @@ protected:
Block readImpl() override;
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
Block accumulated_block;
bool all_read = false;
void append(Block && block);
bool isEnoughSize(size_t rows, size_t bytes) const;
SquashingTransform transform;
};
}

View File

@ -0,0 +1,35 @@
#include <DB/Core/Block.h>
namespace DB
{
class SquashingTransform
{
public:
SquashingTransform(size_t min_block_size_rows, size_t min_block_size_bytes);
struct Result
{
bool ready = false;
Block block;
Result(bool ready_) : ready(ready_) {}
Result(Block && block_) : ready(true), block(std::move(block_)) {}
};
Result add(Block & block);
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
Block accumulated_block;
bool all_read = false;
void append(Block && block);
bool isEnoughSize(size_t rows, size_t bytes) const;
};
}

View File

@ -5,7 +5,7 @@ namespace DB
{
SquashingBlockInputStream::SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes)
: min_block_size_rows(min_block_size_rows), min_block_size_bytes(min_block_size_bytes)
: transform(min_block_size_rows, min_block_size_bytes)
{
children.emplace_back(src);
}
@ -13,68 +13,12 @@ SquashingBlockInputStream::SquashingBlockInputStream(BlockInputStreamPtr & src,
Block SquashingBlockInputStream::readImpl()
{
if (all_read)
return {};
while (Block block = children[0]->read())
while (true)
{
/// Just read block is alredy enough.
if (isEnoughSize(block.rowsInFirstColumn(), block.bytes()))
{
/// If no accumulated data, return just read block.
if (!accumulated_block)
return block;
/// Return accumulated data (may be it has small size) and place new block to accumulated data.
accumulated_block.swap(block);
return block;
}
/// Accumulated block is already enough.
if (accumulated_block && isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes()))
{
/// Return accumulated data and place new block to accumulated data.
accumulated_block.swap(block);
return block;
}
append(std::move(block));
if (isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes()))
{
Block res;
res.swap(accumulated_block);
return res;
}
SquashingTransform::Result result = transform.add(children[0]->read());
if (result.ready)
return result.block;
}
all_read = true;
return accumulated_block;
}
void SquashingBlockInputStream::append(Block && block)
{
if (!accumulated_block)
{
accumulated_block = std::move(block);
return;
}
size_t columns = block.columns();
size_t rows = block.rowsInFirstColumn();
for (size_t i = 0; i < columns; ++i)
accumulated_block.unsafeGetByPosition(i).column->insertRangeFrom(
*block.unsafeGetByPosition(i).column, 0, rows);
}
bool SquashingBlockInputStream::isEnoughSize(size_t rows, size_t bytes) const
{
return (!min_block_size_rows && !min_block_size_bytes)
|| (min_block_size_rows && rows >= min_block_size_rows)
|| (min_block_size_bytes && bytes >= min_block_size_bytes);
}
}

View File

@ -0,0 +1,82 @@
#include <DB/DataStreams/SquashingTransform.h>
namespace DB
{
SquashingTransform::SquashingTransform(size_t min_block_size_rows, size_t min_block_size_bytes)
: min_block_size_rows(min_block_size_rows), min_block_size_bytes(min_block_size_bytes)
{
}
SquashingTransform::Result SquashingTransform::add(Block & block)
{
if (all_read)
return true;
if (!block)
{
all_read = true;
return Result(std::move(accumulated_block));
}
/// Just read block is alredy enough.
if (isEnoughSize(block.rowsInFirstColumn(), block.bytes()))
{
/// If no accumulated data, return just read block.
if (!accumulated_block)
return Result(std::move(block));
/// Return accumulated data (may be it has small size) and place new block to accumulated data.
accumulated_block.swap(block);
return Result(std::move(block));
}
/// Accumulated block is already enough.
if (accumulated_block && isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes()))
{
/// Return accumulated data and place new block to accumulated data.
accumulated_block.swap(block);
return Result(std::move(block));
}
append(std::move(block));
if (isEnoughSize(accumulated_block.rowsInFirstColumn(), accumulated_block.bytes()))
{
Block res;
res.swap(accumulated_block);
return Result(std::move(res));
}
/// Squashed block is not ready.
return false;
}
void SquashingTransform::append(Block && block)
{
if (!accumulated_block)
{
accumulated_block = std::move(block);
return;
}
size_t columns = block.columns();
size_t rows = block.rowsInFirstColumn();
for (size_t i = 0; i < columns; ++i)
accumulated_block.unsafeGetByPosition(i).column->insertRangeFrom(
*block.unsafeGetByPosition(i).column, 0, rows);
}
bool SquashingTransform::isEnoughSize(size_t rows, size_t bytes) const
{
return (!min_block_size_rows && !min_block_size_bytes)
|| (min_block_size_rows && rows >= min_block_size_rows)
|| (min_block_size_bytes && bytes >= min_block_size_bytes);
}
}