Stream row sources from disk for vertical merge [#CLICKHOUSE-3118]

This commit is contained in:
Alexey Zatelepin 2017-07-04 15:38:53 +03:00 committed by alexey-milovidov
parent 1cd936579d
commit 89939a685a
8 changed files with 177 additions and 130 deletions

View File

@ -41,39 +41,40 @@ void CollapsingSortedBlockInputStream::reportIncorrectData()
void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows, bool last_in_stream)
{
if (count_positive != 0 || count_negative != 0)
if (count_positive == 0 && count_negative == 0)
return;
if (count_positive == count_negative && !last_is_positive)
{
if (count_positive == count_negative && !last_is_positive)
/// If all the rows in the input streams collapsed, we still want to give at least one block in the result.
if (last_in_stream && merged_rows == 0 && !blocks_written)
{
/// If all the rows in the input streams collapsed, we still want to give at least one block in the result.
if (last_in_stream && merged_rows == 0 && !blocks_written)
LOG_INFO(log, "All rows collapsed");
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num);
if (out_row_sources_buf)
{
LOG_INFO(log, "All rows collapsed");
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num);
if (out_row_sources)
{
/// true flag value means "skip row"
out_row_sources->data()[last_positive_pos].setSkipFlag(false);
out_row_sources->data()[last_negative_pos].setSkipFlag(false);
}
/// true flag value means "skip row"
current_row_sources[last_positive_pos].setSkipFlag(false);
current_row_sources[last_negative_pos].setSkipFlag(false);
}
return;
}
}
else
{
if (count_positive <= count_negative)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num);
if (out_row_sources)
out_row_sources->data()[first_negative_pos].setSkipFlag(false);
if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false);
}
if (count_positive >= count_negative)
@ -82,8 +83,8 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
if (out_row_sources)
out_row_sources->data()[last_positive_pos].setSkipFlag(false);
if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false);
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
@ -93,6 +94,11 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++count_incorrect_data;
}
}
if (out_row_sources_buf)
out_row_sources_buf->write(
reinterpret_cast<const char *>(current_row_sources.data()),
current_row_sources.size() * sizeof(RowSourcePart));
}
@ -162,10 +168,6 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
queue.pop();
/// Initially, skip all rows. On insert, unskip "corner" rows.
if (out_row_sources)
out_row_sources->emplace_back(current.impl->order, true);
if (key_differs)
{
/// We write data for the previous primary key.
@ -175,8 +177,18 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
count_negative = 0;
count_positive = 0;
current_pos = 0;
first_negative_pos = 0;
last_positive_pos = 0;
last_negative_pos = 0;
current_row_sources.resize(0);
}
/// Initially, skip all rows. On insert, unskip "corner" rows.
if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true);
if (sign == 1)
{
++count_positive;

View File

@ -23,10 +23,12 @@ namespace DB
class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
CollapsingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, MergedRowSources * out_row_sources_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_),
sign_column(sign_column_)
CollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_,
WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
{
}
@ -77,11 +79,13 @@ private:
size_t blocks_written = 0;
/// Fields specific for VERTICAL merge algorithm
size_t current_pos = 0; /// Global row number of current key
size_t first_negative_pos = 0; /// Global row number of first_negative
size_t last_positive_pos = 0; /// Global row number of last_positive
size_t last_negative_pos = 0; /// Global row number of last_negative
/// Fields specific for VERTICAL merge algorithm.
/// Row numbers are relative to the start of current primary key.
size_t current_pos = 0; /// Current row number
size_t first_negative_pos = 0; /// Row number of first_negative
size_t last_positive_pos = 0; /// Row number of last_positive
size_t last_negative_pos = 0; /// Row number of last_negative
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursors and calls to virtual functions.

View File

@ -11,13 +11,16 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int INCOMPATIBLE_COLUMNS;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int EMPTY_DATA_PASSED;
extern const int RECEIVED_EMPTY_DATA;
}
ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & row_source_, size_t block_preferred_size_)
: name(column_name_), row_source(row_source_), block_preferred_size(block_preferred_size_), log(&Logger::get("ColumnGathererStream"))
ColumnGathererStream::ColumnGathererStream(
const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
size_t block_preferred_size_)
: name(column_name_), row_sources_buf(row_sources_buf_)
, block_preferred_size(block_preferred_size_), log(&Logger::get("ColumnGathererStream"))
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
@ -49,8 +52,14 @@ void ColumnGathererStream::init()
Block & block = sources.back().block;
/// Sometimes MergeTreeReader injects additional column with partitioning key
if (block.columns() > 2 || !block.has(name))
throw Exception("Block should have 1 or 2 columns and contain column with requested name", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
if (block.columns() > 2)
throw Exception(
"Block should have 1 or 2 columns, but contains " + toString(block.columns()),
ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
if (!block.has(name))
throw Exception(
"Not found column `" + name + "' in block.",
ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
if (i == 0)
{
@ -68,22 +77,19 @@ void ColumnGathererStream::init()
Block ColumnGathererStream::readImpl()
{
/// Special case: single source and there are no skipped rows
if (children.size() == 1 && row_source.size() == 0)
if (children.size() == 1 && row_sources_buf.eof())
return children[0]->read();
/// Initialize first source blocks
if (sources.empty())
init();
if (pos_global_start >= row_source.size())
if (row_sources_buf.eof())
return Block();
block_res = Block{column.cloneEmpty()};
IColumn & column_res = *block_res.getByPosition(0).column;
column_res.gather(*this);
return std::move(block_res);
output_block = Block{column.cloneEmpty()};
output_block.getByPosition(0).column->gather(*this);
return std::move(output_block);
}

View File

@ -14,22 +14,21 @@ namespace DB
/// Tiny struct, stores number of a Part from which current row was fetched, and insertion flag.
struct RowSourcePart
{
UInt8 data = 0;
RowSourcePart() = default;
RowSourcePart(size_t source_num, bool flag = false)
RowSourcePart(size_t source_num, bool skip_flag = false)
{
static_assert(sizeof(*this) == 1, "Size of RowSourcePart is too big due to compiler settings");
setSourceNum(source_num);
setSkipFlag(flag);
setSkipFlag(skip_flag);
}
/// Data is equal to getSourceNum() if flag is false
UInt8 getData() const { return data; }
size_t getSourceNum() const { return data & MASK_NUMBER; }
/// In CollapsingMergeTree case flag means "skip this rows"
bool getSkipFlag() const { return (data & MASK_FLAG) != 0; }
bool getSkipFlag() const { return (data & MASK_FLAG) != 0; }
void setSourceNum(size_t source_num)
{
@ -44,9 +43,6 @@ struct RowSourcePart
static constexpr size_t MAX_PARTS = 0x7F;
static constexpr UInt8 MASK_NUMBER = 0x7F;
static constexpr UInt8 MASK_FLAG = 0x80;
private:
UInt8 data;
};
using MergedRowSources = PODArray<RowSourcePart>;
@ -59,8 +55,9 @@ using MergedRowSources = PODArray<RowSourcePart>;
class ColumnGathererStream : public IProfilingBlockInputStream
{
public:
ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & row_source_, size_t block_preferred_size_ = DEFAULT_MERGE_BLOCK_SIZE);
ColumnGathererStream(
const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
String getName() const override { return "ColumnGatherer"; }
@ -75,11 +72,7 @@ public:
void gather(Column & column_res);
private:
String name;
ColumnWithTypeAndName column;
const MergedRowSources & row_source;
/// Cache required fileds
/// Cache required fields
struct Source
{
const IColumn * column;
@ -103,12 +96,16 @@ private:
void init();
void fetchNewBlock(Source & source, size_t source_num);
std::vector<Source> sources;
String name;
ColumnWithTypeAndName column;
std::vector<Source> sources;
ReadBuffer & row_sources_buf;
size_t pos_global_start = 0;
size_t block_preferred_size;
Block block_res;
Source * source_to_fully_copy = nullptr;
Block output_block;
Poco::Logger * log;
};
@ -116,56 +113,74 @@ private:
template<typename Column>
void ColumnGathererStream::gather(Column & column_res)
{
size_t global_size = row_source.size();
size_t curr_block_preferred_size = std::min(global_size - pos_global_start, block_preferred_size);
column_res.reserve(curr_block_preferred_size);
size_t pos_global = pos_global_start;
while (pos_global < global_size && column_res.size() < curr_block_preferred_size)
if (source_to_fully_copy) /// Was set on a previous iteration
{
auto source_data = row_source[pos_global].getData();
bool source_skip = row_source[pos_global].getSkipFlag();
auto source_num = row_source[pos_global].getSourceNum();
output_block.getByPosition(0).column = source_to_fully_copy->block.getByName(name).column;
source_to_fully_copy->pos = source_to_fully_copy->size;
source_to_fully_copy = nullptr;
return;
}
row_sources_buf.nextIfAtEnd();
RowSourcePart * row_source_pos = reinterpret_cast<RowSourcePart *>(row_sources_buf.position());
RowSourcePart * row_sources_end = reinterpret_cast<RowSourcePart *>(row_sources_buf.buffer().end());
size_t cur_block_preferred_size = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size);
column_res.reserve(cur_block_preferred_size);
size_t cur_size = 0;
while (row_source_pos < row_sources_end && cur_size < cur_block_preferred_size)
{
RowSourcePart row_source = *row_source_pos;
size_t source_num = row_source.getSourceNum();
Source & source = sources[source_num];
bool source_skip = row_source.getSkipFlag();
++row_source_pos;
if (source.pos >= source.size) /// Fetch new block from source_num part
{
fetchNewBlock(source, source_num);
}
/// Consecutive optimization. TODO: precompute lens
/// Consecutive optimization. TODO: precompute lengths
size_t len = 1;
size_t max_len = std::min(global_size - pos_global, source.size - source.pos); // interval should be in the same block
for (; len < max_len && source_data == row_source[pos_global + len].getData(); ++len);
size_t max_len = std::min(static_cast<size_t>(row_sources_end - row_source_pos), source.size - source.pos); // interval should be in the same block
while (len < max_len && row_source_pos->data == row_source.data)
{
++len;
++row_source_pos;
}
row_sources_buf.position() = reinterpret_cast<char *>(row_source_pos);
if (!source_skip)
{
/// Whole block could be produced via copying pointer from current block
if (source.pos == 0 && source.size == len)
{
/// If current block already contains data, return it. We will be here again on next read() iteration.
if (column_res.size() != 0)
break;
/// If current block already contains data, return it.
/// Whole column from current source will be returned on next read() iteration.
if (cur_size > 0)
{
source_to_fully_copy = &source;
return;
}
block_res.getByPosition(0).column = source.block.getByName(name).column;
output_block.getByPosition(0).column = source.block.getByName(name).column;
source.pos += len;
pos_global += len;
break;
return;
}
else if (len == 1)
{
column_res.insertFrom(*source.column, source.pos);
}
else
{
column_res.insertRangeFrom(*source.column, source.pos, len);
}
cur_size += len;
}
source.pos += len;
pos_global += len;
}
pos_global_start = pos_global;
}
}

View File

@ -15,14 +15,13 @@ namespace ErrorCodes
}
MergingSortedBlockInputStream::MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_, MergedRowSources * out_row_sources_, bool quiet_)
: description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_),
source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
MergingSortedBlockInputStream::MergingSortedBlockInputStream(
BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_)
: description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
, source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
if (out_row_sources)
out_row_sources->clear();
}
String MergingSortedBlockInputStream::getID() const
@ -253,8 +252,12 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
finished = true;
}
if (out_row_sources)
out_row_sources->resize_fill(out_row_sources->size() + merged_rows, RowSourcePart(source_num));
if (out_row_sources_buf)
{
RowSourcePart row_source(source_num);
for (size_t i = 0; i < merged_rows; ++i)
out_row_sources_buf->write(row_source.data);
}
// std::cerr << "fetching next block\n";
@ -268,10 +271,11 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
if (out_row_sources)
if (out_row_sources_buf)
{
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
out_row_sources->emplace_back(current.impl->order);
RowSourcePart row_source(current.impl->order);
out_row_sources_buf->write(row_source.data);
}
if (!current->isLast())

View File

@ -62,8 +62,9 @@ public:
* out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each readed row (and needed flag)
* quiet - don't log profiling info
*/
MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr, bool quiet_ = false);
MergingSortedBlockInputStream(
BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false);
String getName() const override { return "MergingSorted"; }
@ -146,7 +147,7 @@ protected:
/// Used in Vertical merge algorithm to gather non-PK columns (on next step)
/// If it is not nullptr then it should be populated during execution
MergedRowSources * out_row_sources = nullptr;
WriteBuffer * out_row_sources_buf;
/// These methods are used in Collapsing/Summing/Aggregating... SortedBlockInputStream-s.

View File

@ -18,6 +18,8 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBufferFromFile.h>
#include <DataTypes/DataTypeNested.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
@ -516,15 +518,23 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity;
MergedRowSources merged_rows_sources;
MergedRowSources * merged_rows_sources_ptr = &merged_rows_sources;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, gathering_columns, merged_rows_sources, deduplicate);
MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, gathering_columns, deduplicate);
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
if (merge_alg != MergeAlgorithm::Vertical)
String rows_sources_file_path;
std::unique_ptr<WriteBuffer> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBuffer> rows_sources_write_buf;
if (merge_alg == MergeAlgorithm::Vertical)
{
Poco::File(new_part_tmp_path).createDirectories();
rows_sources_file_path = new_part_tmp_path + "rows_sources";
rows_sources_uncompressed_write_buf = std::make_unique<WriteBufferFromFile>(rows_sources_file_path);
rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf);
}
else
{
merged_rows_sources_ptr = nullptr;
merging_columns = all_columns;
merging_column_names = all_column_names;
gathering_columns.clear();
@ -564,12 +574,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr, true);
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, rows_sources_write_buf.get(), true);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, merged_rows_sources_ptr);
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get());
break;
case MergeTreeData::MergingParams::Summing:
@ -658,6 +668,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
auto it_name_and_type = gathering_columns.cbegin();
rows_sources_write_buf->next();
rows_sources_uncompressed_write_buf->next();
CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0);
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
column_num < gathering_column_names_size;
++column_num, ++it_name_and_type)
@ -681,7 +695,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
column_part_streams[part_num] = std::move(column_part_stream);
}
ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_BLOCK_SIZE);
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written);
size_t column_elems_written = 0;
@ -710,6 +725,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
}
Poco::File(rows_sources_file_path).remove();
}
/// Print overall profiling info. NOTE: it may duplicates previous messages
@ -746,7 +763,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound,
const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc, bool deduplicate) const
const NamesAndTypesList & gathering_columns, bool deduplicate) const
{
if (deduplicate)
return MergeAlgorithm::Horizontal;
@ -766,19 +783,6 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
if (merge_alg == MergeAlgorithm::Vertical)
{
try
{
rows_sources_to_alloc.reserve(sum_rows_upper_bound);
}
catch (...)
{
/// Not enough memory for VERTICAL merge algorithm, make sense for very large tables
merge_alg = MergeAlgorithm::Horizontal;
}
}
return merge_alg;
}

View File

@ -136,8 +136,9 @@ public:
private:
MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc, bool deduplicate) const;
MergeAlgorithm chooseMergeAlgorithm(
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, bool deduplicate) const;
private:
MergeTreeData & data;