Merge remote-tracking branch 'upstream/master' into METR-23466

This commit is contained in:
proller 2016-11-24 22:58:40 +03:00
commit 3b335b5546
20 changed files with 71 additions and 637 deletions

View File

@ -298,7 +298,6 @@ add_library (dbms
include/DB/DataStreams/SquashingTransform.h
include/DB/DataStreams/SquashingBlockInputStream.h
include/DB/DataStreams/SquashingBlockOutputStream.h
include/DB/DataStreams/ColumnGathererStream.h
include/DB/DataTypes/IDataType.h
include/DB/DataTypes/IDataTypeDummy.h
include/DB/DataTypes/DataTypeSet.h
@ -814,7 +813,6 @@ add_library (dbms
src/DataStreams/SquashingTransform.cpp
src/DataStreams/SquashingBlockInputStream.cpp
src/DataStreams/SquashingBlockOutputStream.cpp
src/DataStreams/ColumnGathererStream.cpp
src/DataTypes/DataTypeString.cpp
src/DataTypes/DataTypeFixedString.cpp

View File

@ -35,9 +35,9 @@ struct SortColumnDescription
using SortDescription = std::vector<SortColumnDescription>;
/** Cursor allows to compare rows in different blocks (and parts).
* Cursor moves inside single block.
* It is used in priority queue.
/** Курсор, позволяющий сравнивать соответствующие строки в разных блоках.
* Курсор двигается по одному блоку.
* Для использования в priority queue.
*/
struct SortCursorImpl
{
@ -48,17 +48,14 @@ struct SortCursorImpl
size_t pos = 0;
size_t rows = 0;
/** Determines order if comparing columns are equal.
* Order is determined by number of cursor.
*
* Cursor number (always?) equals to number of merging part.
* Therefore this field can be used to determine part number of current row (see ColumnGathererStream).
/** Порядок (что сравнивается), если сравниваемые столбцы равны.
* Даёт возможность предпочитать строки из нужного курсора.
*/
size_t order;
using NeedCollationFlags = std::vector<UInt8>;
/** Should we use Collator to sort a column? */
/** Нужно ли использовать Collator для сортировки столбца */
NeedCollationFlags need_collation;
/** Есть ли хотя бы один столбец с Collator. */

View File

@ -24,8 +24,8 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
CollapsingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, MergedRowSources * out_row_sources_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_),
const String & sign_column_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
sign_column(sign_column_)
{
}
@ -62,7 +62,7 @@ private:
/// Прочитали до конца.
bool finished = false;
RowRef current_key; /// Текущий первичный ключ.
RowRef current_key; /// Текущий первичный ключ.
RowRef next_key; /// Первичный ключ следующей строки.
RowRef first_negative; /// Первая отрицательная строка для текущего первичного ключа.
@ -77,12 +77,6 @@ private:
size_t blocks_written = 0;
/// Fields specific for VERTICAL merge algorithm
size_t current_pos = 0; /// Global row number of current key
size_t first_negative_pos = 0; /// Global row number of first_negative
size_t last_positive_pos = 0; /// Global row number of last_positive
size_t last_negative_pos = 0; /// Global row number of last_negative
/** Делаем поддержку двух разных курсоров - с Collation и без.
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
*/

View File

@ -1,81 +0,0 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Storages/IStorage.h>
#include <DB/Common/PODArray.h>
namespace DB
{
struct __attribute__((__packed__)) RowSourcePart
{
/// Sequence of members is important to use RowSourcePart * as UInt8 * if flag = false
UInt8 source_id: 7;
UInt8 flag: 1;
RowSourcePart() = default;
RowSourcePart(unsigned source_id_, bool flag_ = false)
{
source_id = source_id_;
flag = flag_;
}
static constexpr size_t MAX_PARTS = 127;
};
using MergedRowSources = PODArray<RowSourcePart>;
/** Gather single stream from multiple streams according to streams mask.
* Stream mask maps row number to index of source stream.
* Streams should conatin exactly one column.
*/
class ColumnGathererStream : public IProfilingBlockInputStream
{
public:
ColumnGathererStream(const BlockInputStreams & source_streams, const MergedRowSources& pos_to_source_idx_,
size_t block_size_ = DEFAULT_BLOCK_SIZE);
String getName() const override { return "ColumnGatherer"; }
String getID() const override;
Block readImpl() override;
private:
const MergedRowSources & pos_to_source_idx;
/// Cache required fileds
struct Source
{
const IColumn * column;
size_t pos;
size_t size;
Block block;
Source(Block && block_) : block(std::move(block_))
{
update();
}
void update()
{
column = block.getByPosition(0).column.get();
size = block.rowsInFirstColumn();
pos = 0;
}
};
std::vector<Source> sources;
size_t pos_global = 0;
size_t block_size;
Logger * log = &Logger::get("ColumnGathererStream");
};
}

View File

@ -9,7 +9,6 @@
#include <DB/Core/SortDescription.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/ColumnGathererStream.h>
namespace DB
@ -57,12 +56,10 @@ inline void intrusive_ptr_release(detail::SharedBlock * ptr)
class MergingSortedBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - if isn't 0, then we can produce only first limit rows in sorted order.
/// out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each readed row (and needed flag)
MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr)
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
MergingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, size_t limit_ = 0)
: description(description_), max_block_size(max_block_size_), limit(limit_),
source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
source_blocks(inputs_.size()), cursors(inputs_.size())
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
@ -161,10 +158,6 @@ protected:
using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
QueueWithCollation queue_with_collation;
/// Used in Vertical merge algorithm to gather non-PK columns (on next step)
/// If it is not nullptr then it should be populated during execution
MergedRowSources * out_row_sources = nullptr;
/// Эти методы используются в Collapsing/Summing/Aggregating... SortedBlockInputStream-ах.

View File

@ -32,21 +32,10 @@ struct MergeInfo
UInt64 total_size_bytes_compressed{};
UInt64 total_size_marks{};
std::atomic<UInt64> bytes_read_uncompressed{};
std::atomic<UInt64> bytes_written_uncompressed{};
/// Updated only for Horizontal algorithm
std::atomic<UInt64> rows_read{};
std::atomic<UInt64> bytes_written_uncompressed{};
std::atomic<UInt64> rows_written{};
/// Updated only for Vertical algorithm
/// mutually exclusive with rows_read and rows_written, updated either rows_written either columns_written
std::atomic<UInt64> columns_written{};
/// Updated in both cases
/// Number of rows for which primary key columns have been written
std::atomic<UInt64> rows_with_key_columns_read{};
std::atomic<UInt64> rows_with_key_columns_written{};
MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)
: database{database}, table{table}, result_part_name{result_part_name}
@ -63,12 +52,9 @@ struct MergeInfo
total_size_bytes_compressed(other.total_size_bytes_compressed),
total_size_marks(other.total_size_marks),
bytes_read_uncompressed(other.bytes_read_uncompressed.load(std::memory_order_relaxed)),
bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
rows_read(other.rows_read.load(std::memory_order_relaxed)),
rows_written(other.rows_written.load(std::memory_order_relaxed)),
columns_written(other.columns_written.load(std::memory_order_relaxed)),
rows_with_key_columns_read(other.rows_with_key_columns_read.load(std::memory_order_relaxed)),
rows_with_key_columns_written(other.rows_with_key_columns_written.load(std::memory_order_relaxed))
bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
rows_written(other.rows_written.load(std::memory_order_relaxed))
{
}
};

View File

@ -24,7 +24,7 @@ public:
const MarkRanges & mark_ranges_, bool use_uncompressed_cache_,
ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns,
size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_,
bool save_marks_in_cache_, bool quiet = false);
bool save_marks_in_cache_);
~MergeTreeBlockInputStream() override;

View File

@ -55,11 +55,11 @@ namespace ErrorCodes
* Структура файлов:
* / min-date _ max-date _ min-id _ max-id _ level / - директория с куском.
* Внутри директории с куском:
* checksums.txt - содержит список файлов с их размерами и контрольными суммами.
* columns.txt - содержит список столбцов с их типами.
* checksums.txt - список файлов с их размерами и контрольными суммами.
* columns.txt - список столбцов с их типами.
* primary.idx - индексный файл.
* [Column].bin - данные столбца
* [Column].mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
* Column.bin - данные столбца
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
*
* Имеется несколько режимов работы, определяющих, что делать при мердже:
* - Ordinary - ничего дополнительно не делать;

View File

@ -9,7 +9,6 @@ namespace DB
{
class MergeListEntry;
class MergeProgressCallback;
struct ReshardingJob;
@ -121,19 +120,6 @@ public:
bool isCancelled() const { return cancelled > 0; }
public:
enum class MergeAlgorithm
{
Horizontal, /// per-row merge of all columns
Vertical /// per-row merge of PK columns, per-column gather for non-PK columns
};
private:
MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const;
private:
MergeTreeData & data;
const BackgroundProcessingPool & pool;

View File

@ -46,8 +46,6 @@ struct MergeTreeDataPartChecksums
void addFile(const String & file_name, size_t file_size, uint128 file_hash);
void add(MergeTreeDataPartChecksums && rhs_checksums);
/// Проверяет, что множество столбцов и их контрольные суммы совпадают. Если нет - бросает исключение.
/// Если have_uncompressed, для сжатых файлов сравнивает чексуммы разжатых данных. Иначе сравнивает только чексуммы файлов.
void checkEqual(const MergeTreeDataPartChecksums & rhs, bool have_uncompressed) const;

View File

@ -90,9 +90,6 @@ struct MergeTreeSettings
/// Minimal absolute delay to close, stop serving requests and not return Ok during status check.
size_t min_absolute_delay_to_close = 0;
/// Enable usage of Vertical merge algorithm.
size_t enable_vertical_merge_algorithm = 0;
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
@ -127,7 +124,6 @@ struct MergeTreeSettings
SET_SIZE_T(min_relative_delay_to_yield_leadership);
SET_SIZE_T(min_relative_delay_to_close);
SET_SIZE_T(min_absolute_delay_to_close);
SET_SIZE_T(enable_vertical_merge_algorithm);
#undef SET_SIZE_T
#undef SET_DOUBLE

View File

@ -33,7 +33,6 @@ public:
{
}
protected:
using OffsetColumns = std::set<std::string>;
@ -307,16 +306,11 @@ public:
throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums(
const NamesAndTypesList & total_column_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr)
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
{
/// Заканчиваем запись и достаем чексуммы.
MergeTreeData::DataPart::Checksums checksums;
if (additional_column_checksums)
checksums = std::move(*additional_column_checksums);
if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
{
index_stream->next();
@ -325,10 +319,10 @@ public:
index_stream = nullptr;
}
for (auto & column_stream : column_streams)
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
{
column_stream.second->finalize();
column_stream.second->addToChecksums(checksums);
it->second->finalize();
it->second->addToChecksums(checksums);
}
column_streams.clear();
@ -344,7 +338,7 @@ public:
{
/// Записываем файл с описанием столбцов.
WriteBufferFromFile out(part_path + "columns.txt", 4096);
total_column_list.writeText(out);
columns_list.writeText(out);
}
{
@ -356,11 +350,6 @@ public:
return checksums;
}
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
{
return writeSuffixAndGetChecksums(columns_list, nullptr);
}
MergeTreeData::DataPart::Index & getIndex()
{
return index_columns;

View File

@ -55,13 +55,6 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num);
if (out_row_sources)
{
/// true flag value means "skip row"
out_row_sources->data()[last_positive_pos].flag = false;
out_row_sources->data()[last_negative_pos].flag = false;
}
}
return;
}
@ -71,9 +64,6 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num);
if (out_row_sources)
out_row_sources->data()[first_negative_pos].flag = false;
}
if (count_positive >= count_negative)
@ -81,9 +71,6 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
if (out_row_sources)
out_row_sources->data()[last_positive_pos].flag = false;
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
@ -136,7 +123,7 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
size_t merged_rows = 0;
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
for (; !queue.empty(); ++current_pos)
while (!queue.empty())
{
TSortCursor current = queue.top();
@ -162,10 +149,6 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
queue.pop();
/// Initially, skip all rows. On insert, unskip "corner" rows.
if (out_row_sources)
out_row_sources->emplace_back(current.impl->order, true);
if (key_differs)
{
/// Запишем данные для предыдущего первичного ключа.
@ -183,21 +166,13 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
last_is_positive = true;
setRowRef(last_positive, current);
last_positive_pos = current_pos;
}
else if (sign == -1)
{
if (!count_negative)
{
setRowRef(first_negative, current);
first_negative_pos = current_pos;
}
if (!blocks_written && !merged_rows)
{
setRowRef(last_negative, current);
last_negative_pos = current_pos;
}
++count_negative;
last_is_positive = false;

View File

@ -1,102 +0,0 @@
#include <DB/DataStreams/ColumnGathererStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCOMPATIBLE_COLUMNS;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int EMPTY_DATA_PASSED;
extern const int RECEIVED_EMPTY_DATA;
}
ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const MergedRowSources & pos_to_source_idx_, size_t block_size_)
: pos_to_source_idx(pos_to_source_idx_), block_size(block_size_)
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
children.assign(source_streams.begin(), source_streams.end());
sources.reserve(children.size());
for (size_t i = 0; i < children.size(); i++)
{
sources.emplace_back(children[i]->read());
Block & block = sources.back().block;
if (block.columns() != 1)
throw Exception("Stream should contain exactly one column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
if (block.getByPosition(0).column->getName() != sources[0].block.getByPosition(0).column->getName())
throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
String ColumnGathererStream::getID() const
{
std::stringstream res;
res << getName() << "(";
for (size_t i = 0; i < children.size(); i++)
res << (i == 0 ? "" : ", " ) << children[i]->getID();
res << ")";
return res.str();
}
Block ColumnGathererStream::readImpl()
{
if (children.size() == 1)
return children[0]->read();
if (pos_global >= pos_to_source_idx.size())
return Block();
Block block_res = sources[0].block.cloneEmpty();
IColumn & column_res = *block_res.getByPosition(0).column;
size_t pos_finish = std::min(pos_global + block_size, pos_to_source_idx.size());
column_res.reserve(pos_finish - pos_global);
for (size_t pos = pos_global; pos < pos_finish; ++pos)
{
auto source_id = pos_to_source_idx[pos].source_id;
bool skip = pos_to_source_idx[pos].flag;
Source & source = sources[source_id];
if (source.pos >= source.size) /// Fetch new block
{
try
{
source.block = children[source_id]->read();
source.update();
}
catch (Exception & e)
{
e.addMessage("Cannot fetch required block. Stream " + children[source_id]->getID() + ", part " + toString(source_id));
throw;
}
if (0 == source.size)
{
throw Exception("Fetched block is empty. Stream " + children[source_id]->getID() + ", part " + toString(source_id),
ErrorCodes::RECEIVED_EMPTY_DATA);
}
}
if (!skip)
column_res.insertFrom(*source.column, source.pos); //TODO: vectorize
++source.pos;
}
pos_global = pos_finish;
return block_res;
}
}

View File

@ -195,10 +195,13 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
return;
}
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
size_t source_num = current.impl->order;
size_t source_num = 0;
size_t size = cursors.size();
for (; source_num < size; ++source_num)
if (&cursors[source_num] == current.impl)
break;
if (source_num >= cursors.size())
if (source_num == size)
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < num_columns; ++i)
@ -221,9 +224,6 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
finished = true;
}
if (out_row_sources)
out_row_sources->resize_fill(out_row_sources->size() + merged_rows, RowSourcePart(source_num));
// std::cerr << "fetching next block\n";
total_merged_rows += merged_rows;
@ -236,12 +236,6 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
if (out_row_sources)
{
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
out_row_sources->emplace_back(current.impl->order);
}
if (!current->isLast())
{
// std::cerr << "moving to next row\n";

View File

@ -21,7 +21,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, ///
const MarkRanges & mark_ranges_, bool use_uncompressed_cache_,
ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns,
size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_,
bool save_marks_in_cache_, bool quiet)
bool save_marks_in_cache_)
:
path(path_), block_size(block_size_),
storage(storage_), owned_data_part(owned_data_part_),
@ -97,7 +97,6 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, ///
total_rows += range.end - range.begin;
total_rows *= storage.index_granularity;
if (!quiet)
LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << owned_data_part->name
<< ", approx. " << total_rows
<< (all_mark_ranges.size() > 1

View File

@ -16,13 +16,11 @@
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/ColumnGathererStream.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Common/Increment.h>
#include <DB/Common/interpolate.h>
#include <cmath>
#include <numeric>
namespace ProfileEvents
@ -44,10 +42,6 @@ namespace ErrorCodes
extern const int ABORTED;
}
using MergeAlgorithm = MergeTreeDataMerger::MergeAlgorithm;
namespace
{
@ -280,157 +274,8 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(
return parts_from_partition;
}
static void extractOrdinaryAndKeyColumns(const NamesAndTypesList & all_columns, ExpressionActionsPtr primary_key_expressions,
NamesAndTypesList & ordinary_column_names_and_types, Names & ordinary_column_names,
NamesAndTypesList & key_column_names_and_types, Names & key_column_names
)
{
Names key_columns_dup = primary_key_expressions->getRequiredColumns();
std::set<String> key_columns(key_columns_dup.cbegin(), key_columns_dup.cend());
for (auto & column : all_columns)
{
auto it = std::find(key_columns.cbegin(), key_columns.cend(), column.name);
if (key_columns.end() == it)
{
ordinary_column_names_and_types.emplace_back(column);
ordinary_column_names.emplace_back(column.name);
}
else
{
key_column_names_and_types.emplace_back(column);
key_column_names.emplace_back(column.name);
}
}
}
/* Allow to compute more accurate progress statistics */
class ColumnSizeEstimator
{
std::unordered_map<String, size_t> map;
public:
/// Stores approximate size of columns in bytes
/// Exact values are not required since it used for relative values estimation (progress).
size_t sum_total = 0;
size_t sum_index_columns = 0;
size_t sum_ordinary_columns = 0;
ColumnSizeEstimator(MergeTreeData::DataPartsVector & parts, const Names & key_columns, const Names & ordinary_columns)
{
if (parts.empty())
return;
for (const auto & name_and_type : parts.front()->columns)
map[name_and_type.name] = 0;
for (const auto & part : parts)
{
for (const auto & name_and_type : parts.front()->columns)
map.at(name_and_type.name) += part->getColumnSize(name_and_type.name);
}
for (const auto & name : key_columns)
sum_index_columns += map.at(name);
for (const auto & name : ordinary_columns)
sum_ordinary_columns += map.at(name);
sum_total = sum_index_columns + sum_ordinary_columns;
}
/// Approximate size of num_rows column elements if column contains num_total_rows elements
Float64 columnSize(const String & column, size_t num_rows, size_t num_total_rows) const
{
return static_cast<Float64>(map.at(column)) / num_total_rows * num_rows;
}
/// Relative size of num_rows column elements (in comparison with overall size of all columns) if column contains num_total_rows elements
Float64 columnProgress(const String & column, size_t num_rows, size_t num_total_rows) const
{
return columnSize(column, num_rows, num_total_rows) / sum_total;
}
/// Like columnSize, but takes into account only PK columns
Float64 keyColumnsSize(size_t num_rows, size_t num_total_rows) const
{
return static_cast<Float64>(sum_index_columns) / num_total_rows * num_rows;
}
/// Like columnProgress, but takes into account only PK columns
Float64 keyColumnsProgress(size_t num_rows, size_t num_total_rows) const
{
return keyColumnsSize(num_rows, num_total_rows) / sum_total;
}
};
class MergeProgressCallback : public ProgressCallback
{
public:
MergeProgressCallback(MergeList::Entry & merge_entry_) : merge_entry(merge_entry_) {}
MergeProgressCallback(MergeList::Entry & merge_entry_, MergeTreeDataMerger::MergeAlgorithm merge_alg_, size_t num_total_rows,
const ColumnSizeEstimator & column_sizes)
: merge_entry(merge_entry_), merge_alg(merge_alg_)
{
if (merge_alg == MergeAlgorithm::Horizontal)
average_elem_progress = 1.0 / num_total_rows;
else
average_elem_progress = column_sizes.keyColumnsProgress(1, num_total_rows);
}
MergeList::Entry & merge_entry;
const MergeAlgorithm merge_alg{MergeAlgorithm::Vertical};
Float64 average_elem_progress;
void operator() (const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
merge_entry->bytes_read_uncompressed += value.bytes;
merge_entry->rows_with_key_columns_read += value.rows;
if (merge_alg == MergeAlgorithm::Horizontal)
{
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
merge_entry->rows_read += value.rows;
merge_entry->progress = average_elem_progress * merge_entry->rows_read;
}
else
{
merge_entry->progress = average_elem_progress * merge_entry->rows_with_key_columns_read;
}
};
};
class MergeProgressCallbackVerticalStep : public MergeProgressCallback
{
public:
MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact,
const ColumnSizeEstimator & column_sizes, const String & column_name)
: MergeProgressCallback(merge_entry_), initial_progress(merge_entry->progress)
{
average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact);
}
Float64 initial_progress;
/// NOTE: not thread safe (to be copyable). It is OK in current single thread use case
size_t rows_read_internal{0};
void operator() (const Progress & value)
{
merge_entry->bytes_read_uncompressed += value.bytes;
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
rows_read_internal += value.rows;
Float64 local_progress = average_elem_progress * rows_read_internal;
merge_entry->progress = initial_progress + local_progress;
};
};
/// parts should be sorted.
/// parts должны быть отсортированы.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart(
MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation)
@ -461,58 +306,49 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
part->accumulateColumnSizes(merged_column_to_size);
}
Names all_column_names = data.getColumnNamesList();
NamesAndTypesList all_column_names_and_types = data.getColumnsList();
SortDescription sort_desc = data.getSortDescription();
NamesAndTypesList ordinary_column_names_and_types, key_column_names_and_types;
Names ordinary_column_names, key_column_names;
extractOrdinaryAndKeyColumns(all_column_names_and_types, data.getPrimaryExpression(),
ordinary_column_names_and_types, ordinary_column_names,
key_column_names_and_types, key_column_names
);
Names column_names = data.getColumnNamesList();
NamesAndTypesList column_names_and_types = data.getColumnsList();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
ActiveDataPartSet::parsePartName(merged_name, *new_data_part);
new_data_part->name = "tmp_" + merged_name;
new_data_part->is_temp = true;
size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity;
MergedRowSources merged_rows_sources;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, merged_rows_sources);
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
MergedRowSources * merged_rows_sources_ptr = (merge_alg == MergeAlgorithm::Vertical)
? &merged_rows_sources : nullptr;
Names & main_column_names = (merge_alg == MergeAlgorithm::Vertical)
? key_column_names : all_column_names;
NamesAndTypesList & main_column_names_and_types = (merge_alg == MergeAlgorithm::Vertical)
? key_column_names_and_types : all_column_names_and_types;
ColumnSizeEstimator column_sizes(parts, key_column_names, ordinary_column_names);
/** Читаем из всех кусков, сливаем и пишем в новый.
* Попутно вычисляем выражение для сортировки.
*/
BlockInputStreams src_streams;
size_t sum_rows_approx = 0;
const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
for (size_t i = 0; i < parts.size(); ++i)
{
MarkRanges ranges{{0, parts[i]->size}};
String part_path = data.getFullPath() + parts[i]->name + '/';
auto input = std::make_unique<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, main_column_names, data, parts[i],
MarkRanges(1, MarkRange(0, parts[i]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_names, data,
parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback(MergeProgressCallback{merge_entry, merge_alg, sum_input_rows_upper_bound, column_sizes});
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
{
const auto new_rows_read = merge_entry->rows_read += value.rows;
merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
merge_entry->bytes_read_uncompressed += value.bytes;
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
});
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
else
src_streams.emplace_back(std::move(input));
sum_rows_approx += parts[i]->size * data.index_granularity;
}
/// Порядок потоков важен: при совпадении ключа элементы идут в порядке номера потока-источника.
@ -524,32 +360,32 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr);
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, merged_rows_sources_ptr);
src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Replacing:
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Graphite:
merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE,
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE,
data.merging_params.graphite_params, time_of_merge);
break;
@ -568,7 +404,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
static_cast<double>(merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream to{
data, new_part_tmp_path, main_column_names_and_types, compression_method, merged_column_to_size, aio_threshold};
data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold};
merged_stream->readPrefix();
to.writePrefix();
@ -582,86 +418,19 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
rows_written += block.rows();
to.write(block);
if (merge_alg == MergeAlgorithm::Horizontal)
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
merge_entry->rows_with_key_columns_written = merged_stream->getProfileInfo().rows;
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
/// This update is unactual for VERTICAL algorithm sicne it requires more accurate per-column updates
/// Reservation updates is not performed yet, during the merge it may lead to higher free space requirements
if (disk_reservation && merge_alg == MergeAlgorithm::Horizontal)
{
Float64 relative_rows_written = std::min(1., 1. * rows_written / sum_input_rows_upper_bound);
disk_reservation->update(static_cast<size_t>((1. - relative_rows_written) * initial_reservation));
}
if (disk_reservation)
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
}
if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
MergeTreeData::DataPart::Checksums checksums_ordinary_columns;
/// Gather ordinary columns
if (merge_alg == MergeAlgorithm::Vertical)
{
size_t sum_input_rows_exact = merge_entry->rows_with_key_columns_read;
merge_entry->columns_written = key_column_names.size();
merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact);
BlockInputStreams column_part_streams(parts.size());
auto it_name_and_type = ordinary_column_names_and_types.cbegin();
for (size_t column_num = 0; column_num < ordinary_column_names.size(); ++column_num)
{
const String & column_name = ordinary_column_names[column_num];
Names column_name_(1, column_name);
NamesAndTypesList column_name_and_type_(1, *it_name_and_type++);
Float64 progress_before = merge_entry->progress;
LOG_TRACE(log, "Gathering column " << column_name << " " << column_name_and_type_.front().type->getName());
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
String part_path = data.getFullPath() + parts[part_num]->name + '/';
/// TODO: test perfomance with more accurate settings
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_name_, data, parts[part_num],
MarkRanges(1, MarkRange(0, parts[part_num]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE,
false, true);
column_part_stream->setProgressCallback(
MergeProgressCallbackVerticalStep{merge_entry, sum_input_rows_exact, column_sizes, column_name});
column_part_streams[part_num] = std::move(column_part_stream);
}
ColumnGathererStream column_gathered_stream(column_part_streams, merged_rows_sources, DEFAULT_BLOCK_SIZE);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, true, compression_method);
column_to.writePrefix();
while ((block = column_gathered_stream.read()))
{
column_to.write(block);
}
/// NOTE: nested column contains duplicates checksums (and files)
checksums_ordinary_columns.add(column_to.writeSuffixAndGetChecksums());
merge_entry->columns_written = key_column_names.size() + column_num;
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact);
if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
}
}
merged_stream->readSuffix();
new_data_part->columns = all_column_names_and_types;
if (merge_alg != MergeAlgorithm::Vertical)
new_data_part->checksums = to.writeSuffixAndGetChecksums();
else
new_data_part->checksums = to.writeSuffixAndGetChecksums(all_column_names_and_types, &checksums_ordinary_columns);
new_data_part->columns = column_names_and_types;
new_data_part->checksums = to.writeSuffixAndGetChecksums();
new_data_part->index.swap(to.getIndex());
/// Для удобства, даже CollapsingSortedBlockInputStream не может выдать ноль строк.
@ -677,43 +446,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
}
MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const
{
if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0)
return MergeAlgorithm::Horizontal;
bool is_supported_storage =
data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
data.merging_params.mode == MergeTreeData::MergingParams::Collapsing;
bool enough_ordinary_cols = data.getColumnNamesList().size() > data.getSortDescription().size();
bool enough_total_rows = sum_rows_upper_bound >= DEFAULT_MERGE_BLOCK_SIZE;
bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;
auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
if (merge_alg == MergeAlgorithm::Vertical)
{
try
{
rows_sources_to_alloc.reserve(sum_rows_upper_bound);
}
catch (...)
{
/// Not enough memory for VERTICAL merge algorithm, make sense for very large tables
merge_alg = MergeAlgorithm::Horizontal;
}
}
return merge_alg;
}
MergeTreeData::DataPartPtr MergeTreeDataMerger::renameMergedTemporaryPart(
MergeTreeData::DataPartsVector & parts,
MergeTreeData::MutableDataPartPtr & new_data_part,

View File

@ -222,14 +222,6 @@ void MergeTreeDataPartChecksums::addFile(const String & file_name, size_t file_s
files[file_name] = Checksum(file_size, file_hash);
}
void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums)
{
for (auto & checksum : rhs_checksums.files)
files[std::move(checksum.first)] = std::move(checksum.second);
rhs_checksums.files.clear();
}
/// Контрольная сумма от множества контрольных сумм .bin файлов.
void MergeTreeDataPartChecksums::summaryDataChecksum(SipHash & hash) const
{

View File

@ -691,7 +691,7 @@ BlockInputStreams StorageLog::read(
max_block_size,
column_names,
*this,
0, marksCount() ? std::numeric_limits<size_t>::max() : 0,
0, std::numeric_limits<size_t>::max(),
settings.max_read_buffer_size));
}
else

View File

@ -24,10 +24,7 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name)
{ "bytes_read_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_read", std::make_shared<DataTypeUInt64>() },
{ "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_written", std::make_shared<DataTypeUInt64>() },
{ "columns_written", std::make_shared<DataTypeUInt64>() },
{ "rows_with_key_columns_read", std::make_shared<DataTypeUInt64>() },
{ "rows_with_key_columns_written", std::make_shared<DataTypeUInt64>() }
{ "rows_written", std::make_shared<DataTypeUInt64>() }
}
{
}
@ -61,16 +58,13 @@ BlockInputStreams StorageSystemMerges::read(
ColumnWithTypeAndName col_rows_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_read"};
ColumnWithTypeAndName col_bytes_written_uncompressed{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_written_uncompressed"};
ColumnWithTypeAndName col_rows_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_written"};
ColumnWithTypeAndName col_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "columns_written"};
ColumnWithTypeAndName col_rows_with_key_columns_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_read"};
ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_written"};
for (const auto & merge : context.getMergeList().get())
{
col_database.column->insert(merge.database);
col_table.column->insert(merge.table);
col_elapsed.column->insert(merge.watch.elapsedSeconds());
col_progress.column->insert(std::min(1., merge.progress)); /// little cheat
col_progress.column->insert(merge.progress);
col_num_parts.column->insert(merge.num_parts);
col_result_part_name.column->insert(merge.result_part_name);
col_total_size_bytes_compressed.column->insert(merge.total_size_bytes_compressed);
@ -79,9 +73,6 @@ BlockInputStreams StorageSystemMerges::read(
col_rows_read.column->insert(merge.rows_read.load(std::memory_order_relaxed));
col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed));
col_rows_written.column->insert(merge.rows_written.load(std::memory_order_relaxed));
col_columns_written.column->insert(merge.columns_written.load(std::memory_order_relaxed));
col_rows_with_key_columns_read.column->insert(merge.rows_with_key_columns_read.load(std::memory_order_relaxed));
col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed));
}
Block block{
@ -96,10 +87,7 @@ BlockInputStreams StorageSystemMerges::read(
col_bytes_read_uncompressed,
col_rows_read,
col_bytes_written_uncompressed,
col_rows_written,
col_columns_written,
col_rows_with_key_columns_read,
col_rows_with_key_columns_written
col_rows_written
};
return BlockInputStreams{1, std::make_shared<OneBlockInputStream>(block)};