Update ColumnGathererStream

Nikolai Kochetov 2021-09-29 20:45:01 +03:00
parent 378ebb3f3a
commit 68f8b9d235
8 changed files with 168 additions and 125 deletions

View File

@@ -18,97 +18,151 @@ namespace ErrorCodes
 }
 
 ColumnGathererStream::ColumnGathererStream(
-    const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
-    size_t block_preferred_size_)
-    : column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_)
-    , block_preferred_size(block_preferred_size_), log(&Poco::Logger::get("ColumnGathererStream"))
+    size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_)
+    : sources(num_inputs), row_sources_buf(row_sources_buf_)
+    , block_preferred_size(block_preferred_size_)
 {
-    if (source_streams.empty())
+    if (num_inputs == 0)
         throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
+}
 
-    children.assign(source_streams.begin(), source_streams.end());
-
-    for (size_t i = 0; i < children.size(); ++i)
+void ColumnGathererStream::initialize(Inputs inputs)
+{
+    for (size_t i = 0; i < inputs.size(); ++i)
     {
-        const Block & header = children[i]->getHeader();
-
-        /// Sometimes MergeTreeReader injects additional column with partitioning key
-        if (header.columns() > 2)
-            throw Exception(
-                "Block should have 1 or 2 columns, but contains " + toString(header.columns()),
-                ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
-
-        if (i == 0)
+        if (inputs[i].chunk)
         {
-            column.name = column_name;
-            column.type = header.getByName(column_name).type;
-            column.column = column.type->createColumn();
+            sources[i].update(inputs[i].chunk.detachColumns().at(0));
+            if (!result_column)
+                result_column = sources[i].column->cloneEmpty();
         }
-        else if (header.getByName(column_name).column->getName() != column.column->getName())
-            throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
     }
 }
 
-Block ColumnGathererStream::readImpl()
+IMergingAlgorithm::Status ColumnGathererStream::merge()
 {
-    /// Special case: single source and there are no skipped rows
-    if (children.size() == 1 && row_sources_buf.eof() && !source_to_fully_copy)
-        return children[0]->read();
+    /// Nothing to read after initialize.
+    if (!result_column)
+        return Status(Chunk(), true);
+
+    if (source_to_fully_copy) /// Was set on a previous iteration
+    {
+        Chunk res;
+        res.addColumn(source_to_fully_copy->column);
+        merged_rows += source_to_fully_copy->size;
+        source_to_fully_copy->pos = source_to_fully_copy->size;
+        source_to_fully_copy = nullptr;
+        return Status(std::move(res));
+    }
+
+    /// Note: looks like this should never happen because row_sources_buf cannot just skip row info.
+    if (sources.size() == 1 && row_sources_buf.eof())
+    {
+        if (sources.front().pos < sources.front().size)
+        {
+            next_required_source = 0;
+            Chunk res;
+            merged_rows += sources.front().column->size();
+            merged_bytes += sources.front().column->allocatedBytes();
+            res.addColumn(std::move(sources.front().column));
+            sources.front().pos = sources.front().size = 0;
+            return Status(std::move(res));
+        }
 
-    if (!source_to_fully_copy && row_sources_buf.eof())
-        return Block();
+        if (next_required_source == -1)
+            return Status(Chunk(), true);
 
-    MutableColumnPtr output_column = column.column->cloneEmpty();
-    output_block = Block{column.cloneEmpty()};
-    /// Surprisingly this call may directly change output_block, bypassing
+        next_required_source = 0;
+        return Status(next_required_source);
+    }
+
+    if (next_required_source != -1 && sources[next_required_source].size == 0)
+        throw Exception("Cannot fetch required block. Source " + toString(next_required_source), ErrorCodes::RECEIVED_EMPTY_DATA);
+
+    /// Surprisingly this call may directly change some internal state of ColumnGathererStream.
     /// output_column. See ColumnGathererStream::gather.
-    output_column->gather(*this);
-    if (!output_column->empty())
-        output_block.getByPosition(0).column = std::move(output_column);
+    result_column->gather(*this);
 
-    return output_block;
+    if (next_required_source != -1)
+        return Status(next_required_source);
+
+    if (source_to_fully_copy && result_column->empty())
+    {
+        Chunk res;
+        merged_rows += source_to_fully_copy->column->size();
+        merged_bytes += source_to_fully_copy->column->allocatedBytes();
+        res.addColumn(source_to_fully_copy->column);
+        source_to_fully_copy->pos = source_to_fully_copy->size;
+        source_to_fully_copy = nullptr;
+        return Status(std::move(res));
+    }
+
+    auto col = result_column->cloneEmpty();
+    result_column.swap(col);
+
+    Chunk res;
+    merged_rows += col->size();
+    merged_bytes += col->allocatedBytes();
+    res.addColumn(std::move(col));
+    return Status(std::move(res), row_sources_buf.eof());
 }
 
-void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
+void ColumnGathererStream::consume(Input & input, size_t source_num)
 {
-    try
-    {
-        source.block = children[source_num]->read();
-        source.update(column_name);
-    }
-    catch (Exception & e)
-    {
-        e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num));
-        throw;
-    }
+    auto & source = sources[source_num];
+    if (input.chunk)
+        source.update(input.chunk.getColumns().at(0));
 
     if (0 == source.size)
     {
-        throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num),
+        throw Exception("Fetched block is empty. Source " + toString(source_num),
             ErrorCodes::RECEIVED_EMPTY_DATA);
     }
 }
 
-void ColumnGathererStream::readSuffixImpl()
+ColumnGathererTransform::ColumnGathererTransform(
+    const Block & header,
+    size_t num_inputs,
+    ReadBuffer & row_sources_buf_,
+    size_t block_preferred_size_)
+    : IMergingTransform<ColumnGathererStream>(
+        num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
+        num_inputs, row_sources_buf_, block_preferred_size_)
+    , log(&Poco::Logger::get("ColumnGathererStream"))
 {
-    const BlockStreamProfileInfo & profile_info = getProfileInfo();
+    if (header.columns() != 1)
+        throw Exception(
+            "Header should have 1 column, but contains " + toString(header.columns()),
+            ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
+}
+
+void ColumnGathererTransform::work()
+{
+    Stopwatch stopwatch;
+    IMergingTransform<ColumnGathererStream>::work();
+    elapsed_ns += stopwatch.elapsedNanoseconds();
+}
+
+void ColumnGathererTransform::onFinish()
+{
+    auto merged_rows = algorithm.getMergedRows();
+    auto merged_bytes = algorithm.getMergedBytes();
 
     /// Don't print info for small parts (< 10M rows)
-    if (profile_info.rows < 10000000)
+    if (merged_rows < 10000000)
         return;
 
-    double seconds = profile_info.total_stopwatch.elapsedSeconds();
+    double seconds = static_cast<double>(elapsed_ns) / 1000000000ULL;
+    const auto & column_name = getOutputPort().getHeader().getByPosition(0).name;
 
     if (!seconds)
         LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
-            column_name, static_cast<double>(profile_info.bytes) / profile_info.rows);
+            column_name, static_cast<double>(merged_bytes) / merged_rows);
     else
         LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
-            column_name, static_cast<double>(profile_info.bytes) / profile_info.rows, seconds,
-            profile_info.rows / seconds, ReadableSize(profile_info.bytes / seconds));
+            column_name, static_cast<double>(merged_bytes) / merged_rows, seconds,
+            merged_rows / seconds, ReadableSize(merged_bytes / seconds));
 }
 
 }
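
The new merge()/consume() pair implements the IMergingAlgorithm pull contract: each call to merge() either emits a Chunk, names the input it needs refilled (a Status carrying a non-negative required_source), or signals that it is finished. A minimal, self-contained toy of that contract — stand-in types only, not ClickHouse code; the real definitions live in IMergingAlgorithm.h:

    #include <cstdio>
    #include <vector>

    struct Chunk { std::vector<int> rows; };   // stand-in for DB::Chunk

    struct Status                              // echoes IMergingAlgorithm::Status
    {
        Chunk chunk;
        bool is_finished = false;
        int required_source = -1;              // >= 0: feed this input, then call merge() again
    };

    struct ToyGatherer
    {
        std::vector<int> mask;                 // mask[i] = source of row i (cf. row_sources_buf)
        size_t pos = 0;
        std::vector<std::vector<int>> pending; // buffered rows per source

        void consume(Chunk chunk, size_t source_num) { pending[source_num] = std::move(chunk.rows); }

        Status merge()
        {
            Chunk out;
            for (; pos < mask.size(); ++pos)
            {
                auto & buf = pending[static_cast<size_t>(mask[pos])];
                if (buf.empty())
                    return Status{std::move(out), false, mask[pos]};  // suspend: need a refill
                out.rows.push_back(buf.front());
                buf.erase(buf.begin());        // O(n), fine for a toy
            }
            return Status{std::move(out), true, -1};                  // mask exhausted
        }
    };

    int main()
    {
        ToyGatherer g{{0, 1, 1, 0}, 0, {{}, {}}};
        std::vector<std::vector<int>> parts{{10, 11}, {20, 21}};
        for (Status s = g.merge(); ; s = g.merge())
        {
            for (int v : s.chunk.rows)
                std::printf("%d ", v);         // prints 10 20 21 11 across iterations
            if (s.is_finished)
                break;
            g.consume(Chunk{std::move(parts[static_cast<size_t>(s.required_source)])},
                      static_cast<size_t>(s.required_source));
        }
        std::printf("\n");
    }

Returning required_source instead of pulling from children directly (as the old readImpl() did) is what lets the executor suspend the transform without parking a thread on a blocking read.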

View File

@@ -1,8 +1,9 @@
 #pragma once
 
-#include <DataStreams/IBlockInputStream.h>
 #include <IO/ReadBuffer.h>
 #include <Common/PODArray.h>
+#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
+#include <Processors/Merges/IMergingTransform.h>
 
 namespace Poco { class Logger; }
@@ -53,77 +54,91 @@ using MergedRowSources = PODArray<RowSourcePart>;
 
 /** Gather single stream from multiple streams according to streams mask.
  * Stream mask maps row number to index of source stream.
  * Streams should contain exactly one column.
  */
-class ColumnGathererStream : public IBlockInputStream
+class ColumnGathererStream final : public IMergingAlgorithm
 {
 public:
-    ColumnGathererStream(
-        const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
-        size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
+    ColumnGathererStream(size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
 
     String getName() const override { return "ColumnGatherer"; }
 
-    Block readImpl() override;
-    void readSuffixImpl() override;
-
-    Block getHeader() const override { return children.at(0)->getHeader(); }
+    void initialize(Inputs inputs) override;
+    void consume(Input & input, size_t source_num) override;
+    Status merge() override;
 
     /// for use in implementations of IColumn::gather()
     template <typename Column>
    void gather(Column & column_res);
 
+    UInt64 getMergedRows() const { return merged_rows; }
+    UInt64 getMergedBytes() const { return merged_bytes; }
+
 private:
     /// Cache required fields
     struct Source
     {
-        const IColumn * column = nullptr;
+        ColumnPtr column;
         size_t pos = 0;
         size_t size = 0;
-        Block block;
 
-        void update(const String & name)
+        void update(ColumnPtr column_)
         {
-            column = block.getByName(name).column.get();
-            size = block.rows();
+            column = std::move(column_);
+            size = column->size();
             pos = 0;
         }
     };
 
-    void fetchNewBlock(Source & source, size_t source_num);
-
-    String column_name;
-    ColumnWithTypeAndName column;
+    MutableColumnPtr result_column;
 
     std::vector<Source> sources;
     ReadBuffer & row_sources_buf;
 
-    size_t block_preferred_size;
+    const size_t block_preferred_size;
 
     Source * source_to_fully_copy = nullptr;
-    Block output_block;
 
     ssize_t next_required_source = -1;
+    size_t cur_block_preferred_size;
+
+    UInt64 merged_rows = 0;
+    UInt64 merged_bytes = 0;
 };
 
+class ColumnGathererTransform final : public IMergingTransform<ColumnGathererStream>
+{
+public:
+    ColumnGathererTransform(
+        const Block & header,
+        size_t num_inputs,
+        ReadBuffer & row_sources_buf_,
+        size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
+
+    String getName() const override { return "ColumnGathererTransform"; }
+
+    void work() override;
+
+protected:
+    void onFinish() override;
+
+    UInt64 elapsed_ns = 0;
+    Poco::Logger * log;
+};
+
 template <typename Column>
 void ColumnGathererStream::gather(Column & column_res)
 {
-    if (source_to_fully_copy) /// Was set on a previous iteration
-    {
-        output_block.getByPosition(0).column = source_to_fully_copy->block.getByName(column_name).column;
-        source_to_fully_copy->pos = source_to_fully_copy->size;
-        source_to_fully_copy = nullptr;
-        return;
-    }
-
     row_sources_buf.nextIfAtEnd();
     RowSourcePart * row_source_pos = reinterpret_cast<RowSourcePart *>(row_sources_buf.position());
     RowSourcePart * row_sources_end = reinterpret_cast<RowSourcePart *>(row_sources_buf.buffer().end());
 
-    size_t cur_block_preferred_size = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size);
-    column_res.reserve(cur_block_preferred_size);
+    if (next_required_source == -1)
+    {
+        /// Start new column.
+        cur_block_preferred_size = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size);
+        column_res.reserve(cur_block_preferred_size);
+    }
 
-    size_t cur_size = 0;
+    size_t cur_size = column_res.size();
+    next_required_source = -1;
 
     while (row_source_pos < row_sources_end && cur_size < cur_block_preferred_size)
     {
@@ -131,13 +146,15 @@ void ColumnGathererStream::gather(Column & column_res)
         size_t source_num = row_source.getSourceNum();
         Source & source = sources[source_num];
         bool source_skip = row_source.getSkipFlag();
-        ++row_source_pos;
 
         if (source.pos >= source.size) /// Fetch new block from source_num part
         {
-            fetchNewBlock(source, source_num);
+            next_required_source = source_num;
+            return;
         }
 
+        ++row_source_pos;
+
         /// Consecutive optimization. TODO: precompute lengths
         size_t len = 1;
         size_t max_len = std::min(static_cast<size_t>(row_sources_end - row_source_pos), source.size - source.pos); // interval should be in the same block
@@ -156,14 +173,7 @@ void ColumnGathererStream::gather(Column & column_res)
         {
             /// If current block already contains data, return it.
             /// Whole column from current source will be returned on next read() iteration.
-            if (cur_size > 0)
-            {
-                source_to_fully_copy = &source;
-                return;
-            }
-
-            output_block.getByPosition(0).column = source.block.getByName(column_name).column;
-            source.pos += len;
+            source_to_fully_copy = &source;
             return;
         }
         else if (len == 1)
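
The gather() template above is the second half of a double dispatch: IColumn::gather(ColumnGathererStream &) is virtual, and each concrete column class calls straight back into this template with its own static type, so the per-row copy loop is instantiated per column type with no virtual calls inside it. A self-contained sketch of the shape (toy types, not ClickHouse code; in ClickHouse the callback is one line, e.g. ColumnVector<T>::gather just does gatherer.gather(*this)):

    #include <cstdio>
    #include <vector>

    struct Gatherer;

    struct IColumn
    {
        virtual ~IColumn() = default;
        virtual void gather(Gatherer & g) = 0;   // single virtual hop...
    };

    template <typename T>
    struct ColumnVector : IColumn
    {
        std::vector<T> data;
        void gather(Gatherer & g) override;      // ...lands here with a concrete type
    };

    struct Gatherer
    {
        template <typename Column>
        void gather(Column & column_res)         // monomorphic hot loop, as in the real gather()
        {
            for (int i = 0; i < 3; ++i)
                column_res.data.push_back(i);    // stand-in for copying row ranges from sources
        }
    };

    template <typename T>
    void ColumnVector<T>::gather(Gatherer & g) { g.gather(*this); }  // the double dispatch

    int main()
    {
        ColumnVector<int> col;
        IColumn & abstract = col;
        Gatherer g;
        abstract.gather(g);                      // virtual -> template round trip
        std::printf("%zu rows gathered\n", col.data.size());
    }

This is also why merge() re-checks next_required_source after result_column->gather(*this): the template may have suspended mid-column and must be resumed once the requested source is refilled.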

View File

@@ -2,14 +2,13 @@
 
 #include <Storages/MergeTree/MergeTreeIndexGranularity.h>
 #include <Storages/MergeTree/MergeTreeData.h>
-#include <DataStreams/IBlockOutputStream.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
 #include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
 
 namespace DB
 {
 
-class IMergedBlockOutputStream : public IBlockOutputStream
+class IMergedBlockOutputStream
 {
 public:
     IMergedBlockOutputStream(
@@ -35,7 +34,6 @@ protected:
         NamesAndTypesList & columns,
         MergeTreeData::DataPart::Checksums & checksums);
 
-protected:
     const MergeTreeData & storage;
     StorageMetadataPtr metadata_snapshot;

View File

@@ -238,9 +238,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
 
     global_ctx->merged_stream->readPrefix();
 
-    /// TODO: const
-    const_cast<MergedBlockOutputStream&>(*global_ctx->to).writePrefix();
-
     global_ctx->rows_written = 0;
     ctx->initial_reservation = global_ctx->space_reservation ? global_ctx->space_reservation->getSize() : 0;
@@ -421,8 +418,6 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
         global_ctx->to->getIndexGranularity());
 
     ctx->column_elems_written = 0;
-
-    ctx->column_to->writePrefix();
 }

View File

@@ -51,11 +51,6 @@ void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IColumn::Permutation * permutation)
     writeImpl(block, permutation);
 }
 
-void MergedBlockOutputStream::writeSuffix()
-{
-    throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
-}
-
 void MergedBlockOutputStream::writeSuffixAndFinalizePart(
     MergeTreeData::MutableDataPartPtr & new_part,
     bool sync,

View File

@@ -21,18 +21,16 @@ public:
         CompressionCodecPtr default_codec_,
         bool blocks_are_granules_size = false);
 
-    Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
+    Block getHeader() const { return metadata_snapshot->getSampleBlock(); }
 
     /// If the data is pre-sorted.
-    void write(const Block & block) override;
+    void write(const Block & block);
 
     /** If the data is not sorted, but we have previously calculated the permutation, that will sort it.
      * This method is used to save RAM, since you do not need to keep two blocks at once - the original one and the sorted one.
      */
     void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation);
 
-    void writeSuffix() override;
-
     /// Finalize writing part and fill inner structures
     /// If part is new and contains projections, they should be added before invoking this method.
     void writeSuffixAndFinalizePart(
@@ -53,7 +51,6 @@ private:
         MergeTreeData::DataPart::Checksums & checksums,
         bool sync);
 
-private:
     NamesAndTypesList columns_list;
     IMergeTreeDataPart::MinMaxIndex minmax_idx;
     size_t rows_count = 0;

View File

@@ -53,11 +53,6 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
     writer->write(block, nullptr);
 }
 
-void MergedColumnOnlyOutputStream::writeSuffix()
-{
-    throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED);
-}
-
 MergeTreeData::DataPart::Checksums
 MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums(
     MergeTreeData::MutableDataPartPtr & new_part,

View File

@@ -23,9 +23,8 @@ public:
         const MergeTreeIndexGranularity & index_granularity = {},
         const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);
 
-    Block getHeader() const override { return header; }
-    void write(const Block & block) override;
-    void writeSuffix() override;
+    Block getHeader() const { return header; }
+    void write(const Block & block);
 
     MergeTreeData::DataPart::Checksums
     writeSuffixAndGetChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums, bool sync = false);
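
For context on how the new transform is driven: a vertical merge unites one single-column pipe per source part and puts ColumnGathererTransform on top. A sketch under stated assumptions — createGatherPipe is a hypothetical helper, not a function from this commit; the include paths reflect where these files live at this revision; the real call site is MergeTask::VerticalMergeStage:

    #include <DataStreams/ColumnGathererStream.h>
    #include <Processors/Pipe.h>

    #include <memory>

    using namespace DB;

    /// Hypothetical helper: `column_part_pipes` holds one pipe per part, all sharing
    /// the same single-column header; `row_sources_buf` is the serialized row mask
    /// written by the horizontal stage of the merge.
    Pipe createGatherPipe(Pipes column_part_pipes, ReadBuffer & row_sources_buf)
    {
        auto pipe = Pipe::unitePipes(std::move(column_part_pipes));

        /// One input port per part; block_preferred_size is left at its default.
        pipe.addTransform(std::make_shared<ColumnGathererTransform>(
            pipe.getHeader(), pipe.numOutputPorts(), row_sources_buf));

        return pipe;
    }

If the united header ever carried more than one column, the ColumnGathererTransform constructor above throws INCORRECT_NUMBER_OF_COLUMNS.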