Added Chunkclass to move columns between ports.

This commit is contained in:
Nikolai Kochetov 2019-02-18 19:36:07 +03:00
parent b6fd31865e
commit e658aff315
21 changed files with 265 additions and 108 deletions

View File

@ -29,7 +29,7 @@ IOutputFormat::Status IOutputFormat::prepare()
if (!input.hasData())
return Status::NeedData;
current_block = input.pull();
current_chunk = input.pull();
current_block_kind = kind;
has_input = true;
return Status::Ready;
@ -43,13 +43,13 @@ void IOutputFormat::work()
switch (current_block_kind)
{
case Main:
consume(std::move(current_block));
consume(std::move(current_chunk));
break;
case Totals:
consumeTotals(std::move(current_block));
consumeTotals(std::move(current_chunk));
break;
case Extremes:
consumeExtremes(std::move(current_block));
consumeExtremes(std::move(current_chunk));
break;
}

View File

@ -26,13 +26,13 @@ public:
protected:
WriteBuffer & out;
Block current_block;
Chunk current_chunk;
PortKind current_block_kind;
bool has_input = false;
virtual void consume(Block) = 0;
virtual void consumeTotals(Block) {}
virtual void consumeExtremes(Block) {}
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
public:
IOutputFormat(Block header, WriteBuffer & out);

View File

@ -31,7 +31,7 @@ static bool isParseError(int code)
}
Block IRowInputFormat::generate()
Chunk IRowInputFormat::generate()
{
if (total_rows == 0)
readPrefix();
@ -40,6 +40,7 @@ Block IRowInputFormat::generate()
size_t num_columns = header.columns();
MutableColumns columns = header.cloneEmptyColumns();
size_t prev_rows = total_rows;
try
{
@ -121,7 +122,9 @@ Block IRowInputFormat::generate()
return {};
}
return header.cloneWithColumns(std::move(columns));
Chunk chunk;
chunk.setColumns(std::move(columns), total_rows - prev_rows);
return chunk;
}
}

View File

@ -30,7 +30,7 @@ public:
{
}
Block generate() override;
Chunk generate() override;
protected:
/** Read next row and append it to the columns.

View File

@ -16,9 +16,9 @@ class IRowOutputFormat : public IOutputFormat
protected:
DataTypes types;
void consume(Block block) override;
void consumeTotals(Block block) override;
void consumeExtremes(Block block) override;
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override;
void consumeExtremes(Chunk chunk) override;
public:
IRowOutputFormat(Block header, WriteBuffer & out)

View File

@ -26,8 +26,8 @@ IAccumulatingTransform::Status IAccumulatingTransform::prepare()
}
/// Output if has data.
if (current_output_block)
output.push(std::move(current_output_block));
if (current_output_chunk)
output.push(std::move(current_output_chunk));
if (finished_generate)
{
@ -49,7 +49,7 @@ IAccumulatingTransform::Status IAccumulatingTransform::prepare()
if (!input.hasData())
return Status::NeedData;
current_input_block = input.pull();
current_input_chunk = input.pull();
has_input = true;
}
@ -60,13 +60,13 @@ void IAccumulatingTransform::work()
{
if (!finished_input)
{
consume(std::move(current_input_block));
consume(std::move(current_input_chunk));
has_input = false;
}
else
{
current_output_block = generate();
if (!current_output_block)
current_output_chunk = generate();
if (!current_output_chunk)
finished_generate = true;
}
}

View File

@ -16,14 +16,14 @@ protected:
InputPort & input;
OutputPort & output;
Block current_input_block;
Block current_output_block;
Chunk current_input_chunk;
Chunk current_output_chunk;
bool has_input = false;
bool finished_input = false;
bool finished_generate = false;
virtual void consume(Block block) = 0;
virtual Block generate() = 0;
virtual void consume(Chunk chunk) = 0;
virtual Chunk generate() = 0;
public:
IAccumulatingTransform(Block input_header, Block output_header);

View File

@ -29,7 +29,7 @@ ISimpleTransform::Status ISimpleTransform::prepare()
/// Output if has data.
if (transformed)
{
output.push(std::move(current_block));
output.push(std::move(current_chunk));
has_input = false;
transformed = false;
}
@ -48,7 +48,7 @@ ISimpleTransform::Status ISimpleTransform::prepare()
if (!input.hasData())
return Status::NeedData;
current_block = input.pull();
current_chunk = input.pull();
has_input = true;
}
@ -58,7 +58,7 @@ ISimpleTransform::Status ISimpleTransform::prepare()
void ISimpleTransform::work()
{
transform(current_block);
transform(current_chunk);
transformed = true;
}

View File

@ -15,11 +15,11 @@ protected:
InputPort & input;
OutputPort & output;
Block current_block;
Chunk current_chunk;
bool has_input = false;
bool transformed = false;
virtual void transform(Block & block) = 0;
virtual void transform(Chunk & chunk) = 0;
public:
ISimpleTransform(Block input_header, Block output_header);

View File

@ -21,14 +21,14 @@ ISink::Status ISink::prepare()
if (!input.hasData())
return Status::NeedData;
current_block = input.pull();
current_chunk = input.pull();
has_input = true;
return Status::Ready;
}
void ISink::work()
{
consume(std::move(current_block));
consume(std::move(current_chunk));
has_input = false;
}

View File

@ -10,10 +10,10 @@ class ISink : public IProcessor
{
protected:
InputPort & input;
Block current_block;
Chunk current_chunk;
bool has_input;
virtual void consume(Block block) = 0;
virtual void consume(Chunk block) = 0;
public:
ISink(Block header);

View File

@ -27,7 +27,7 @@ ISource::Status ISource::prepare()
if (!has_input)
return Status::Ready;
output.push(std::move(current_block));
output.push(std::move(current_chunk));
has_input = false;
/// Now, we pushed to output, and it must be full.
@ -36,8 +36,8 @@ ISource::Status ISource::prepare()
void ISource::work()
{
current_block = generate();
if (!current_block)
current_chunk = generate();
if (!current_chunk)
finished = true;
else
has_input = true;

View File

@ -12,9 +12,9 @@ protected:
OutputPort & output;
bool has_input = false;
bool finished = false;
Block current_block;
Chunk current_chunk;
virtual Block generate() = 0;
virtual Chunk generate() = 0;
public:
ISource(Block header);

View File

@ -31,7 +31,7 @@ LimitTransform::Status LimitTransform::prepare()
/// Push block if can.
if (block_processed)
{
output.push(std::move(current_block));
output.push(std::move(current_chunk));
has_block = false;
block_processed = false;
}
@ -60,13 +60,13 @@ LimitTransform::Status LimitTransform::prepare()
if (!input.hasData())
return Status::NeedData;
current_block = input.pull();
current_chunk = input.pull();
has_block = true;
/// Skip block (for 'always_read_till_end' case).
if (pushing_is_finished)
{
current_block.clear();
current_chunk.clear();
has_block = false;
/// Now, we pulled from input, and it must be empty.
@ -75,12 +75,12 @@ LimitTransform::Status LimitTransform::prepare()
/// Process block.
size_t rows = current_block.rows();
size_t rows = current_chunk.getNumRows();
rows_read += rows;
if (rows_read <= offset)
{
current_block.clear();
current_chunk.clear();
has_block = false;
/// Now, we pulled from input, and it must be empty.
@ -93,7 +93,7 @@ LimitTransform::Status LimitTransform::prepare()
if (output.hasData())
return Status::PortFull;
output.push(std::move(current_block));
output.push(std::move(current_chunk));
has_block = false;
return Status::NeedData;
@ -105,21 +105,25 @@ LimitTransform::Status LimitTransform::prepare()
void LimitTransform::work()
{
size_t rows = current_block.rows();
size_t columns = current_block.columns();
size_t num_rows = current_chunk.getNumRows();
size_t num_columns = current_chunk.getNumColumns();
/// return a piece of the block
size_t start = std::max(
static_cast<Int64>(0),
static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(rows));
static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));
size_t length = std::min(
static_cast<Int64>(limit), std::min(
static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(rows)));
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)));
for (size_t i = 0; i < columns; ++i)
current_block.getByPosition(i).column = current_block.getByPosition(i).column->cut(start, length);
auto columns = current_chunk.detachColumns();
for (size_t i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(start, length);
current_chunk.setColumns(std::move(columns), length);
block_processed = true;
}

View File

@ -19,7 +19,7 @@ private:
bool has_block = false;
bool block_processed = false;
Block current_block;
Chunk current_chunk;
public:
LimitTransform(Block header, size_t limit, size_t offset, bool always_read_till_end = false);

View File

@ -1,8 +1,105 @@
#include <Processors/Port.h>
#include <IO/WriteHelpers.h>
namespace DB
{
Chunk::Chunk(DB::Columns columns_, UInt64 num_rows_) : columns(std::move(columns_)), num_rows(num_rows_)
{
checkNumRowsIsConsistent();
}
Chunk::Chunk(Columns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_)
: columns(std::move(columns_)), num_rows(num_rows_), chunk_info(std::move(chunk_info_))
{
checkNumRowsIsConsistent();
}
static Columns unmuteColumns(MutableColumns && mut_columns)
{
Columns columns;
columns.reserve(mut_columns.size());
for (auto & col : mut_columns)
columns.emplace_back(std::move(col));
return columns;
}
Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_)
: columns(unmuteColumns(std::move(columns_))), num_rows(num_rows_)
{
}
Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_)
: columns(unmuteColumns(std::move(columns_))), num_rows(num_rows_), chunk_info(std::move(chunk_info_))
{
}
Chunk::Chunk(Chunk && other) noexcept
: columns(std::move(other.columns))
, num_rows(other.num_rows)
, chunk_info(std::move(other.chunk_info))
{
other.num_rows = 0;
}
Chunk & Chunk::operator=(Chunk && other) noexcept
{
columns = std::move(other.columns);
chunk_info = std::move(other.chunk_info);
num_rows = other.num_rows;
other.num_rows = 0;
return *this;
}
void Chunk::setColumns(Columns columns_, UInt64 num_rows_)
{
columns = std::move(columns_);
num_rows = num_rows_;
checkNumRowsIsConsistent();
}
void Chunk::setColumns(MutableColumns columns_, UInt64 num_rows_)
{
columns = unmuteColumns(std::move(columns_));
num_rows = num_rows_;
checkNumRowsIsConsistent();
}
void Chunk::checkNumRowsIsConsistent()
{
for (auto & column : columns)
if (column->size() != num_rows)
throw Exception("Invalid number of rows in Chunk column " + column->getName()+ ": expected " +
toString(num_rows) + ", got " + toString(column->size()), ErrorCodes::LOGICAL_ERROR);
}
MutableColumns Chunk::mutateColumns()
{
size_t num_columns = columns.size();
MutableColumns mut_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
mut_columns[i] = (*std::move(columns[i])).mutate();
columns.clear();
num_rows = 0;
return mut_columns;
}
Columns Chunk::detachColumns()
{
num_rows = 0;
return std::move(columns);
}
void Chunk::clear()
{
num_rows = 0;
columns.clear();
chunk_info.reset();
}
void connect(OutputPort & output, InputPort & input)
{
if (input.state || output.state)

View File

@ -5,10 +5,58 @@
#include <Core/Block.h>
#include <Common/Exception.h>
namespace DB
{
class ChunkInfo
{
public:
virtual ~ChunkInfo() = default;
};
using ChunkInfoPtr = std::shared_ptr<const ChunkInfo>;
class Chunk
{
public:
Chunk() = default;
Chunk(const Chunk & other) = default;
Chunk(Chunk && other) noexcept;
Chunk(Columns columns_, UInt64 num_rows_);
Chunk(Columns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_);
Chunk(MutableColumns columns_, UInt64 num_rows_);
Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_);
Chunk & operator=(const Chunk & other) = default;
Chunk & operator=(Chunk && other) noexcept;
const Columns & getColumns() { return columns; }
void setColumns(Columns columns_, UInt64 num_rows_);
void setColumns(MutableColumns columns_, UInt64 num_rows_);
Columns detachColumns();
MutableColumns mutateColumns();
const ChunkInfoPtr & getChunkInfo() const { return chunk_info; }
void setChunkInfo(ChunkInfoPtr chunk_info_) { chunk_info = std::move(chunk_info_); }
UInt64 getNumRows() const { return num_rows; }
UInt64 getNumColumns() const { return columns.size(); }
bool empty() const { return num_rows == 0; }
operator bool() const { return !empty() || !columns.empty(); }
void clear();
private:
Columns columns;
UInt64 num_rows = 0;
ChunkInfoPtr chunk_info;
void checkNumRowsIsConsistent();
};
using Chunks = std::vector<Chunk>;
class InputPort;
class OutputPort;
class IProcessor;
@ -26,7 +74,7 @@ protected:
public:
State() = default;
void push(Block block)
void push(Chunk chunk)
{
if (finished)
throw Exception("Cannot push block to finished port.", ErrorCodes::LOGICAL_ERROR);
@ -37,11 +85,11 @@ protected:
if (has_data)
throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
data = std::move(block);
data = std::move(chunk);
has_data = true;
}
Block pull()
Chunk pull()
{
if (!needed)
throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
@ -106,7 +154,7 @@ protected:
bool isNeeded() const { return needed && !finished; }
private:
Block data;
Chunk data;
/// Use special flag to check if block has data. This allows to send empty blocks between processors.
bool has_data = false;
/// Block is not needed right now, but may be will be needed later.
@ -174,7 +222,7 @@ public:
void setVersion(UInt64 * value) { version = value; }
Block pull()
Chunk pull()
{
if (version)
++(*version);
@ -247,13 +295,13 @@ public:
void setVersion(UInt64 * value) { version = value; }
void push(Block block)
void push(Chunk chunk)
{
if (version)
++(*version);
assumeConnected();
state->push(std::move(block));
state->push(std::move(chunk));
}
void finish()

View File

@ -10,7 +10,7 @@ namespace DB
class QueueBuffer : public IAccumulatingTransform
{
private:
std::queue<Block> blocks;
std::queue<Chunk> chunks;
public:
String getName() const override { return "QueueBuffer"; }
@ -19,18 +19,18 @@ public:
{
}
void consume(Block block) override
void consume(Chunk block) override
{
blocks.push(std::move(block));
chunks.push(std::move(block));
}
Block generate() override
Chunk generate() override
{
if (blocks.empty())
if (chunks.empty())
return {};
Block res = std::move(blocks.front());
blocks.pop();
auto res = std::move(chunks.front());
chunks.pop();
return res;
}
};

View File

@ -41,14 +41,14 @@ private:
UInt64 current_number = 0;
unsigned sleep_useconds;
Block generate() override
Chunk generate() override
{
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(1, current_number));
++current_number;
return getPort().getHeader().cloneWithColumns(std::move(columns));
return Chunk(std::move(columns), 1);
}
};
@ -78,10 +78,10 @@ public:
if (!output.canPush())
return Status::PortFull;
if (!current_block)
if (!current_chunk)
return Status::Async;
output.push(std::move(current_block));
output.push(std::move(current_chunk));
return Status::Async;
}
@ -91,7 +91,7 @@ public:
pool.schedule([&watch, this]
{
usleep(sleep_useconds);
current_block = generate();
current_chunk = generate();
active = false;
watch.notify();
});
@ -101,18 +101,18 @@ public:
private:
ThreadPool pool{1, 1, 0};
Block current_block;
Chunk current_chunk;
std::atomic_bool active {false};
UInt64 current_number = 0;
unsigned sleep_useconds;
Block generate()
Chunk generate()
{
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(1, current_number));
++current_number;
return getPort().getHeader().cloneWithColumns(std::move(columns));
return Chunk(std::move(columns), 1);
}
};
@ -133,10 +133,10 @@ private:
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Block block) override
void consume(Chunk chunk) override
{
size_t rows = block.rows();
size_t columns = block.columns();
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
@ -145,7 +145,7 @@ private:
{
if (column_num != 0)
writeChar('\t', out);
getPort().getHeader().getByPosition(column_num).type->serializeText(*block.getByPosition(column_num).column, row_num, out, settings);
getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}

View File

@ -38,14 +38,14 @@ private:
UInt64 current_number = 0;
unsigned sleep_useconds;
Block generate() override
Chunk generate() override
{
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(1, current_number));
++current_number;
return getPort().getHeader().cloneWithColumns(std::move(columns));
return Chunk(std::move(columns), 1);
}
};
@ -61,7 +61,7 @@ public:
String getName() const override { return "SleepyTransform"; }
protected:
void transform(Block &) override
void transform(Chunk &) override
{
usleep(sleep_useconds);
}
@ -75,19 +75,21 @@ class PrintSink : public ISink
public:
String getName() const override { return "Print"; }
explicit PrintSink(String prefix)
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
prefix(std::move(prefix)) {}
PrintSink(String prefix)
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
prefix(std::move(prefix))
{
}
private:
String prefix;
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Block block) override
void consume(Chunk chunk) override
{
size_t rows = block.rows();
size_t columns = block.columns();
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
@ -96,7 +98,7 @@ private:
{
if (column_num != 0)
writeChar('\t', out);
getPort().getHeader().getByPosition(column_num).type->serializeText(*block.getByPosition(column_num).column, row_num, out, settings);
getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}

View File

@ -28,7 +28,7 @@ class MergingSortedProcessor : public IProcessor
public:
MergingSortedProcessor(const Block & header, size_t num_inputs)
: IProcessor(InputPorts(num_inputs, header), OutputPorts{header})
, blocks(num_inputs), positions(num_inputs, 0), finished(num_inputs, false)
, chunks(num_inputs), positions(num_inputs, 0), finished(num_inputs, false)
{
}
@ -76,13 +76,13 @@ public:
if (!inputs[i].isFinished())
{
all_inputs_finished = false;
bool needed = positions[i] >= blocks[i].rows();
bool needed = positions[i] >= chunks[i].getNumRows();
if (needed)
{
inputs[i].setNeeded();
if (inputs[i].hasData())
{
blocks[i] = inputs[i].pull();
chunks[i] = inputs[i].pull();
positions[i] = 0;
}
else
@ -112,15 +112,15 @@ public:
{
using Key = std::pair<UInt64, size_t>;
std::priority_queue<Key, std::vector<Key>, std::greater<>> queue;
for (size_t i = 0; i < blocks.size(); ++i)
for (size_t i = 0; i < chunks.size(); ++i)
{
if (finished[i])
continue;
if (positions[i] >= blocks[i].rows())
if (positions[i] >= chunks[i].getNumRows())
return;
queue.push({blocks[i].getByPosition(0).column->getUInt(positions[i]), i});
queue.push({chunks[i].getColumns()[0]->getUInt(positions[i]), i});
}
auto col = ColumnUInt64::create();
@ -130,7 +130,7 @@ public:
size_t ps = queue.top().second;
queue.pop();
auto & cur_col = blocks[ps].getByPosition(0).column;
auto & cur_col = chunks[ps].getColumns()[0];
col->insertFrom(*cur_col, positions[ps]);
++positions[ps];
@ -140,15 +140,15 @@ public:
queue.push({cur_col->getUInt(positions[ps]), ps});
}
res = getOutputPort().getHeader();
res.getByPosition(0).column = std::move(col);
UInt64 num_rows = col->size();
res.setColumns(Columns({std::move(col)}), num_rows);
}
OutputPort & getOutputPort() { return outputs[0]; }
private:
Blocks blocks;
Block res;
Chunks chunks;
Chunk res;
std::vector<size_t> positions;
std::vector<bool> finished;
};
@ -170,17 +170,18 @@ private:
UInt64 step;
unsigned sleep_useconds;
Block generate() override
Chunk generate() override
{
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(1, current_number));
current_number += step;
return getPort().getHeader().cloneWithColumns(std::move(columns));
return Chunk(std::move(columns), 1);
}
};
class SleepyTransform : public ISimpleTransform
{
public:
@ -193,7 +194,7 @@ public:
String getName() const override { return "SleepyTransform"; }
protected:
void transform(Block &) override
void transform(Chunk &) override
{
usleep(sleep_useconds);
}
@ -207,19 +208,21 @@ class PrintSink : public ISink
public:
String getName() const override { return "Print"; }
explicit PrintSink(String prefix)
PrintSink(String prefix)
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
prefix(std::move(prefix)) {}
prefix(std::move(prefix))
{
}
private:
String prefix;
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;
void consume(Block block) override
void consume(Chunk chunk) override
{
size_t rows = block.rows();
size_t columns = block.columns();
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();
for (size_t row_num = 0; row_num < rows; ++row_num)
{
@ -228,7 +231,7 @@ private:
{
if (column_num != 0)
writeChar('\t', out);
getPort().getHeader().getByPosition(column_num).type->serializeText(*block.getByPosition(column_num).column, row_num, out, settings);
getPort().getHeader().getByPosition(column_num).type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}