Merge pull request #272 from ludv1x/METR-23783

Consecutive optimization for vertical merge algorithm
alexey-milovidov 2016-12-18 23:41:54 +04:00 committed by GitHub
commit fc27e531c2
5 changed files with 124 additions and 63 deletions
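The gist of the change: ColumnGathererStream used to append gathered rows one at a time via insertFrom(); it now scans MergedRowSources for runs of consecutive rows that come from the same source part and copies each run in bulk with insertRangeFrom(), degenerating to a plain pointer copy when a run covers a whole block. A minimal self-contained sketch of the run-gathering idea (the one-byte RowSourcePart and the vector-based columns are simplifications for illustration, not the actual ClickHouse types):

#include <cstdint>
#include <cstdio>
#include <vector>

/// Simplified stand-in for RowSourcePart: one byte per output row,
/// the high bit is the skip flag, the low bits are the source number.
struct RowSourcePart
{
    uint8_t data;
    size_t getSourceNum() const { return data & 0x7F; }
    bool getSkipFlag() const { return data & 0x80; }
    uint8_t getData() const { return data; }
};

/// Gather values from several sources into one output column, run by run.
std::vector<int> gather(const std::vector<std::vector<int>> & sources,
                        const std::vector<RowSourcePart> & row_source)
{
    std::vector<size_t> pos(sources.size(), 0); /// per-source read cursor
    std::vector<int> result;

    for (size_t i = 0; i < row_source.size();)
    {
        size_t source_num = row_source[i].getSourceNum();
        bool skip = row_source[i].getSkipFlag();

        /// Consecutive optimization: extend the run while the (source, skip) byte repeats.
        size_t len = 1;
        while (i + len < row_source.size()
            && row_source[i + len].getData() == row_source[i].getData())
            ++len;

        const auto & src = sources[source_num];
        if (!skip) /// one bulk copy instead of len single-row inserts
            result.insert(result.end(),
                          src.begin() + pos[source_num],
                          src.begin() + pos[source_num] + len);

        pos[source_num] += len;
        i += len;
    }
    return result;
}

int main()
{
    /// Rows 0-2 come from source 0; row 3 from source 1 is skipped; rows 4-5 from source 1.
    std::vector<std::vector<int>> sources{{1, 2, 3}, {40, 50, 60}};
    std::vector<RowSourcePart> row_source{{0}, {0}, {0}, {0x81}, {1}, {1}};
    for (int x : gather(sources, row_source))
        std::printf("%d ", x); /// prints: 1 2 3 50 60
}

The real implementation additionally caps a run at the current source block boundary (source.size - source.pos), since sources are consumed block by block.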

View File

@@ -58,7 +58,7 @@ class ColumnGathererStream : public IProfilingBlockInputStream
{
public:
ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & pos_to_source_idx_, size_t block_size_ = DEFAULT_BLOCK_SIZE);
const MergedRowSources & row_source_, size_t block_size_ = DEFAULT_MERGE_BLOCK_SIZE);
String getName() const override { return "ColumnGatherer"; }
@@ -66,11 +66,13 @@ public:
Block readImpl() override;
void readSuffixImpl() override;
private:
String name;
ColumnWithTypeAndName column;
const MergedRowSources & pos_to_source_idx;
const MergedRowSources & row_source;
/// Cache required fields
struct Source
@@ -95,7 +97,7 @@ private:
std::vector<Source> sources;
size_t pos_global = 0;
size_t pos_global_start = 0;
size_t block_size;
Logger * log = &Logger::get("ColumnGathererStream");

View File

@@ -52,44 +52,21 @@ inline void intrusive_ptr_release(detail::SharedBlock * ptr)
}
/** Merges several sorted streams into one.
/** Merges several sorted streams into one sorted stream.
*/
class MergingSortedBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - if non-zero, produce only the first 'limit' rows in sorted order.
/// out_row_sources - if not nullptr, then at the end of execution it should contain the part number of each row read (and the skip flag)
MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr)
: description(description_), max_block_size(max_block_size_), limit(limit_),
source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
/** limit - if non-zero, produce only the first 'limit' rows in sorted order.
* out_row_sources - if not nullptr, then at the end of execution it should contain the part number of each row read (and the skip flag)
* quiet - don't log profiling info
*/
MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr, bool quiet_ = false);
String getName() const override { return "MergingSorted"; }
String getID() const override
{
std::stringstream res;
res << "MergingSorted(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// The order does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
String getID() const override;
protected:
struct RowRef
@@ -144,6 +121,7 @@ protected:
bool first = true;
bool has_collation = false;
bool quiet = false;
/// May be less than or equal to max_block_size. Used to 'reserve' space in columns.
size_t expected_block_size = 0;
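A note on expected_block_size: reserving the expected row count up front lets each output column allocate once instead of growing repeatedly while rows are appended. Illustrative sketch only, with std::vector standing in for a ClickHouse column:

#include <vector>

int main()
{
    size_t expected_block_size = 8192; /// rows expected in the next output block

    std::vector<int> column;
    column.reserve(expected_block_size); /// one allocation up front...
    for (size_t i = 0; i < expected_block_size; ++i)
        column.push_back(static_cast<int>(i)); /// ...no reallocation while appending
}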

View File

@@ -1,4 +1,5 @@
#include <DB/DataStreams/ColumnGathererStream.h>
#include <iomanip>
namespace DB
{
@@ -13,8 +14,8 @@ namespace ErrorCodes
}
ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & pos_to_source_idx_, size_t block_size_)
: name(column_name_), pos_to_source_idx(pos_to_source_idx_), block_size(block_size_)
const MergedRowSources & row_source_, size_t block_size_)
: name(column_name_), row_source(row_source_), block_size(block_size_)
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
@@ -63,19 +64,20 @@ Block ColumnGathererStream::readImpl()
if (children.size() == 1)
return children[0]->read();
if (pos_global >= pos_to_source_idx.size())
if (pos_global_start >= row_source.size())
return Block();
Block block_res{column.cloneEmpty()};
IColumn & column_res = *block_res.unsafeGetByPosition(0).column;
size_t pos_finish = std::min(pos_global + block_size, pos_to_source_idx.size());
column_res.reserve(pos_finish - pos_global);
size_t pos_global_finish = std::min(pos_global_start + block_size, row_source.size());
size_t curr_block_size = pos_global_finish - pos_global_start;
column_res.reserve(curr_block_size);
for (size_t pos = pos_global; pos < pos_finish; ++pos)
for (size_t pos_global = pos_global_start; pos_global < pos_global_finish;)
{
auto source_id = pos_to_source_idx[pos].getSourceNum();
bool skip = pos_to_source_idx[pos].getSkipFlag();
auto source_id = row_source[pos_global].getSourceNum();
bool skip = row_source[pos_global].getSkipFlag();
Source & source = sources[source_id];
if (source.pos >= source.size) /// Fetch new block
@@ -98,14 +100,44 @@ Block ColumnGathererStream::readImpl()
}
}
/// Consecutive optimization. TODO: precompute lengths
size_t len = 1;
size_t max_len = std::min(pos_global_finish - pos_global, source.size - source.pos); // the run must stay within the current source block
for (; len < max_len && row_source[pos_global].getData() == row_source[pos_global + len].getData(); ++len);
if (!skip)
column_res.insertFrom(*source.column, source.pos); //TODO: vectorize
++source.pos;
{
if (column_res.size() == 0 && source.pos == 0 && curr_block_size == len && source.size == len)
{
// The whole output block can be produced by copying the column pointer from the current source block
block_res.unsafeGetByPosition(0).column = source.block.getByName(name).column;
}
else
{
column_res.insertRangeFrom(*source.column, source.pos, len);
}
}
pos_global = pos_finish;
source.pos += len;
pos_global += len;
}
pos_global_start = pos_global_finish;
return block_res;
}
void ColumnGathererStream::readSuffixImpl()
{
const BlockStreamProfileInfo & profile_info = getProfileInfo();
double seconds = profile_info.total_stopwatch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Gathred column " << column.name << " " << column.type->getName()
<< " (" << static_cast<double>(profile_info.bytes) / profile_info.rows << " bytes/elem.)"
<< " in " << seconds << " sec., "
<< profile_info.rows / seconds << " rows/sec., "
<< profile_info.bytes / 1000000.0 / seconds << " MB/sec.");
}
}
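The branch in readImpl() that assigns block_res.unsafeGetByPosition(0).column directly deserves a note: when the current run starts at the beginning of a source block, covers it entirely, and would constitute the entire output block, nothing is copied at all; the output block simply shares the source block's column. A hedged sketch of that condition, with a shared_ptr-backed vector standing in for ClickHouse's ColumnPtr (gatherRun and its parameters are illustrative names, not the actual API):

#include <memory>
#include <vector>

using ColumnPtr = std::shared_ptr<std::vector<int>>; /// stand-in for DB::ColumnPtr

/// Append the run [run_start, run_start + len) of source_column to result,
/// except when the run is a whole source block that would also form the whole
/// output block: then sharing the pointer is an O(1) "copy".
ColumnPtr gatherRun(const ColumnPtr & source_column, size_t run_start, size_t len,
                    ColumnPtr result, size_t expected_output_rows)
{
    if (result->empty() && run_start == 0
        && len == source_column->size() && len == expected_output_rows)
        return source_column; /// zero-copy: reuse the source block's column

    result->insert(result->end(),
                   source_column->begin() + run_start,
                   source_column->begin() + run_start + len);
    return result;
}

This is also why MergeTreeDataMerger below creates the gatherer with DEFAULT_MERGE_BLOCK_SIZE: the shortcut can only fire when the reader (column_part_stream) and the gatherer agree on block boundaries.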

View File

@@ -15,6 +15,36 @@ namespace ErrorCodes
}
MergingSortedBlockInputStream::MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_, MergedRowSources * out_row_sources_, bool quiet_)
: description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_),
source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
String MergingSortedBlockInputStream::getID() const
{
std::stringstream res;
res << "MergingSorted(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// The order does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs & merged_columns)
{
/// Read the first blocks and initialize the queue.
@@ -312,13 +342,16 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,
void MergingSortedBlockInputStream::readSuffixImpl()
{
if (quiet)
return;
const BlockStreamProfileInfo & profile_info = getProfileInfo();
double seconds = profile_info.total_stopwatch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Merge sorted " << profile_info.blocks << " blocks, " << profile_info.rows << " rows"
<< " in " << seconds << " sec., "
<< profile_info.rows / seconds << " rows/sec., "
<< profile_info.bytes / 1000000.0 / seconds << " MiB/sec.");
<< profile_info.bytes / 1000000.0 / seconds << " MB/sec.");
}
}

View File

@@ -25,6 +25,7 @@
#include <cmath>
#include <numeric>
#include <iomanip>
namespace ProfileEvents
@@ -557,7 +558,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr);
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr, true);
break;
case MergeTreeData::MergingParams::Collapsing:
@@ -618,19 +619,25 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
/// This update is not applicable to the VERTICAL algorithm, since it requires more accurate per-column updates
/// Reservation updates are not performed yet; during the merge this may lead to higher free space requirements
if (disk_reservation && merge_alg == MergeAlgorithm::Horizontal)
if (disk_reservation)
{
Float64 relative_rows_written = std::min(1., 1. * rows_written / sum_input_rows_upper_bound);
disk_reservation->update(static_cast<size_t>((1. - relative_rows_written) * initial_reservation));
/// The same progress from merge_entry could be used for both algorithms (it should be more accurate)
/// But for now we use the inaccurate row-based estimation in the Horizontal case, for backward compatibility
Float64 progress = (merge_alg == MergeAlgorithm::Horizontal)
? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
: std::min(1., merge_entry->progress);
disk_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation));
}
}
merged_stream->readSuffix();
merged_stream.reset();
if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
MergeTreeData::DataPart::Checksums checksums_ordinary_columns;
MergeTreeData::DataPart::Checksums checksums_gathered_columns;
/// Gather ordinary columns
if (merge_alg == MergeAlgorithm::Vertical)
@@ -649,18 +656,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
const String & column_name = it_name_and_type->name;
const DataTypePtr & column_type = it_name_and_type->type;
const String offset_column_name = DataTypeNested::extractNestedTableName(column_name);
Names column_name_(1, column_name);
NamesAndTypesList column_name_and_type_(1, *it_name_and_type);
Names column_name_{column_name};
NamesAndTypesList column_name_and_type_{*it_name_and_type};
Float64 progress_before = merge_entry->progress;
bool offset_written = offset_columns_written.count(offset_column_name);
LOG_TRACE(log, "Gathering column " << column_name << " " << column_type->getName());
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
String part_path = data.getFullPath() + parts[part_num]->name + '/';
/// TODO: test performance with more accurate settings
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_name_, data, parts[part_num],
MarkRanges(1, MarkRange(0, parts[part_num]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE,
@@ -672,16 +676,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
column_part_streams[part_num] = std::move(column_part_stream);
}
ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_BLOCK_SIZE);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, true, compression_method, offset_written);
/// The block size should match the block size of column_part_stream to enable fast gathering via copying of the column pointer
ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_MERGE_BLOCK_SIZE);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written);
column_to.writePrefix();
while ((block = column_gathered_stream.read()))
{
column_to.write(block);
}
/// NOTE: a nested column contains duplicate checksums (and files)
checksums_ordinary_columns.add(column_to.writeSuffixAndGetChecksums());
column_gathered_stream.readSuffix();
checksums_gathered_columns.add(column_to.writeSuffixAndGetChecksums());
if (typeid_cast<const DataTypeArray *>(column_type.get()))
offset_columns_written.emplace(offset_column_name);
@@ -695,12 +700,23 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
}
}
merged_stream->readSuffix();
/// Print overall profiling info. NOTE: it may duplicate previous messages
{
double elapsed_seconds = merge_entry->watch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Merge sorted " << merge_entry->rows_read << " rows"
<< ", containing " << all_column_names.size() << " columns"
<< " (" << merging_column_names.size() << " merged, " << gathering_column_names.size() << " gathered)"
<< " in " << elapsed_seconds << " sec., "
<< merge_entry->rows_read / elapsed_seconds << " rows/sec., "
<< merge_entry->bytes_read_uncompressed / 1000000.0 / elapsed_seconds << " MB/sec.");
}
new_data_part->columns = all_columns;
if (merge_alg != MergeAlgorithm::Vertical)
new_data_part->checksums = to.writeSuffixAndGetChecksums();
else
new_data_part->checksums = to.writeSuffixAndGetChecksums(all_columns, &checksums_ordinary_columns);
new_data_part->checksums = to.writeSuffixAndGetChecksums(all_columns, &checksums_gathered_columns);
new_data_part->index.swap(to.getIndex());
/// For convenience, even CollapsingSortedBlockInputStream cannot return zero rows.
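For reference, the reservation update introduced above behaves as follows (a hypothetical worked example, not code from this PR): with an initial reservation of 100 GB and merge progress 0.4, the reservation shrinks to 60 GB, releasing the space already accounted for by written data:

#include <algorithm>
#include <cstdio>

int main()
{
    const double initial_reservation = 100e9; /// bytes reserved before the merge starts

    /// Horizontal merges estimate progress from rows written; Vertical merges take
    /// merge_entry->progress, which also covers the column-gathering phase.
    double rows_written = 4e6;
    double sum_input_rows_upper_bound = 10e6;
    double progress = std::min(1.0, rows_written / sum_input_rows_upper_bound); /// 0.4

    /// Keep only what the unwritten remainder of the merge may still need.
    double updated = (1.0 - progress) * initial_reservation;
    std::printf("%.0f -> %.0f bytes\n", initial_reservation, updated); /// 1e11 -> 6e10
}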