added MultiversionMergeTree [#CLICKHOUSE-3479]

This commit is contained in:
Nikolai Kochetov 2018-01-29 20:42:19 +03:00 committed by alexey-milovidov
parent 3656aa0606
commit 66b94d481a
15 changed files with 556 additions and 119 deletions

View File

@ -34,8 +34,6 @@ Block AggregatingSortedBlockInputStream::readImpl()
/// Additional initialization.
if (next_key.empty())
{
next_key.columns.resize(description.size());
/// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i)
{
@ -88,7 +86,6 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s
if (current_key.empty()) /// The first key encountered.
{
current_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current);
key_differs = true;
}

View File

@ -27,7 +27,7 @@ void CollapsingSortedBlockInputStream::reportIncorrectData()
{
if (i != 0)
s << ", ";
s << applyVisitor(FieldVisitorToString(), (*current_key.columns[i])[current_key.row_num]);
s << applyVisitor(FieldVisitorToString(), (*(*current_key.columns)[i])[current_key.row_num]);
}
s << ").";
@ -53,10 +53,10 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
LOG_INFO(log, "All rows collapsed");
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num);
merged_columns[i]->insertFrom(*(*last_negative.columns)[i], last_negative.row_num);
if (out_row_sources_buf)
{
@ -72,7 +72,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num);
merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false);
@ -82,7 +82,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false);
@ -124,13 +124,8 @@ Block CollapsingSortedBlockInputStream::readImpl()
/// Additional initialization.
if (first_negative.empty())
{
first_negative.columns.resize(num_columns);
last_negative.columns.resize(num_columns);
last_positive.columns.resize(num_columns);
sign_column_number = header.getPositionByName(sign_column);
}
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
@ -147,12 +142,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
SortCursor current = queue.top();
if (current_key.empty())
{
current_key.columns.resize(description.size());
next_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current);
}
Int8 sign = static_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
setPrimaryKeyRef(next_key, current);

View File

@ -25,8 +25,7 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
public:
CollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_,
WriteBuffer * out_row_sources_buf_ = nullptr)
const String & sign_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
{

View File

@ -98,9 +98,6 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i);
if (current_subgroup_newest_row.empty())
current_subgroup_newest_row.columns.resize(num_columns);
}
merge(merged_columns, queue);
@ -257,14 +254,14 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
}
else
merged_columns[value_column_num]->insertFrom(
*current_subgroup_newest_row.columns[value_column_num], current_subgroup_newest_row.row_num);
*(*current_subgroup_newest_row.columns)[value_column_num], current_subgroup_newest_row.row_num);
}
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
{
if (aggregate_state_created)
current_pattern->function->add(place_for_aggregate_state.data(), &row.columns[value_column_num], row.row_num, nullptr);
current_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr);
}
}

View File

@ -53,10 +53,9 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
{
first = false;
size_t i = 0;
for (auto it = source_blocks.begin(); it != source_blocks.end(); ++it, ++i)
for (size_t i = 0; i < source_blocks.size(); ++i)
{
SharedBlockPtr & shared_block_ptr = *it;
SharedBlockPtr & shared_block_ptr = source_blocks[i];
if (shared_block_ptr.get())
continue;
@ -75,6 +74,8 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
expected_block_size = std::min(rows, max_block_size);
cursors[i] = SortCursorImpl(*shared_block_ptr, description, i);
shared_block_ptr->all_columns = cursors[i].all_columns;
shared_block_ptr->sort_columns = cursors[i].sort_columns;
has_collation |= cursors[i].has_collation;
}
@ -173,25 +174,20 @@ Block MergingSortedBlockInputStream::readImpl()
template <typename TSortCursor>
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue)
{
size_t i = 0;
size_t order = current.impl->order;
size_t size = cursors.size();
for (; i < size; ++i)
{
if (&cursors[i] == current.impl)
{
source_blocks[i] = new detail::SharedBlock(children[i]->read());
if (*source_blocks[i])
{
cursors[i].reset(*source_blocks[i]);
queue.push(TSortCursor(&cursors[i]));
}
break;
}
}
if (i == size)
if (order >= size || &cursors[order] != current.impl)
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
source_blocks[order] = new detail::SharedBlock(children[order]->read());
if (*source_blocks[order])
{
cursors[order].reset(*source_blocks[order]);
queue.push(TSortCursor(&cursors[order]));
source_blocks[order]->all_columns = cursors[order].all_columns;
source_blocks[order]->sort_columns = cursors[order].sort_columns;
}
}
template

View File

@ -30,13 +30,15 @@ namespace ErrorCodes
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedBlock : Block
{
int refcount = 0;
struct SharedBlock : Block
{
int refcount = 0;
SharedBlock(Block && value_)
: Block(std::move(value_)) {};
};
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedBlock(Block && block) : Block(std::move(block)) {}
};
}
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
@ -77,7 +79,7 @@ public:
protected:
struct RowRef
{
ColumnRawPtrs columns;
ColumnRawPtrs * columns = nullptr;
size_t row_num;
SharedBlockPtr shared_block;
@ -91,9 +93,9 @@ protected:
/// The number and types of columns must match.
bool operator==(const RowRef & other) const
{
size_t size = columns.size();
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
if (0 != columns[i]->compareAt(row_num, other.row_num, *other.columns[i], 1))
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
@ -103,8 +105,8 @@ protected:
return !(*this == other);
}
bool empty() const { return columns.empty(); }
size_t size() const { return columns.size(); }
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
};
@ -190,9 +192,7 @@ protected:
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];
for (size_t i = 0; i < num_columns; ++i)
row_ref.columns[i] = cursor->all_columns[i];
row_ref.columns = &row_ref.shared_block->all_columns;
}
template <typename TSortCursor>
@ -200,9 +200,7 @@ protected:
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];
for (size_t i = 0; i < cursor->sort_columns_size; ++i)
row_ref.columns[i] = cursor->sort_columns[i];
row_ref.columns = &row_ref.shared_block->sort_columns;
}
private:

View File

@ -0,0 +1,183 @@
#include <Common/FieldVisitors.h>
#include <DataStreams/MultiversionSortedBlockInputStream.h>
#include <Columns/ColumnsNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
{
if constexpr (sizeof(RowSourcePart) == 1)
buffer.write(*reinterpret_cast<const char *>(&row_source));
else
buffer.write(reinterpret_cast<const char *>(&row_source), sizeof(RowSourcePart));
}
void MultiversionSortedBlockInputStream::insertGap(size_t gap_size)
{
if (out_row_sources_buf)
{
for (size_t i = 0; i < gap_size; ++i)
{
writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
current_row_sources.pop();
}
}
}
void MultiversionSortedBlockInputStream::insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns)
{
const auto & columns = row.shared_block->all_columns;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*columns[i], row.row_num);
insertGap(skip_rows);
if (out_row_sources_buf)
{
writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
current_row_sources.pop();
}
}
Block MultiversionSortedBlockInputStream::readImpl()
{
if (finished)
return {};
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
bool is_initialized = !first;
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
if (merged_columns.empty())
return {};
/// Additional initialization.
if (!is_initialized)
sign_column_number = header.getPositionByName(sign_column);
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
}
void MultiversionSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
size_t merged_rows = 0;
auto update_queue = [this, & queue](SortCursor cursor)
{
queue.pop();
if (out_row_sources_buf)
current_row_sources.emplace(cursor->order, true);
if (!cursor->isLast())
{
cursor->next();
queue.push(cursor);
}
else
{
/// We take next block from the corresponding source, if there is one.
fetchNextBlock(cursor, queue);
}
};
/// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
while (!queue.empty())
{
SortCursor current = queue.top();
RowRef next_key;
Int8 sign = static_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
setPrimaryKeyRef(next_key, current);
size_t rows_to_merge = 0;
/// Each branch either updates queue or increases rows_to_merge.
if (current_keys.empty())
{
sign_in_queue = sign;
current_keys.pushBack(next_key);
update_queue(current);
}
else
{
if (current_keys.back() == next_key)
{
update_queue(current);
/// If all the rows was collapsed, we still want to give at least one block in the result.
/// If queue is empty then don't collapse two last rows.
if (sign == sign_in_queue || (!can_collapse_all_rows && blocks_written == 0
&& merged_rows == 0 && queue.empty() && current_keys.size() == 1))
current_keys.pushBack(next_key);
else
{
current_keys.popBack();
current_keys.pushGap(2);
}
}
else
rows_to_merge = current_keys.size();
}
if (current_keys.size() > max_rows_in_queue)
rows_to_merge = std::max(rows_to_merge, current_keys.size() - max_rows_in_queue);
while (rows_to_merge)
{
if (merged_rows >= max_block_size)
{
++blocks_written;
return;
}
const auto & row = current_keys.front();
auto gap = current_keys.frontGap();
insertRow(gap, row, merged_columns);
current_keys.popFront();
++merged_rows;
--rows_to_merge;
}
}
while (!current_keys.empty())
{
const auto & row = current_keys.front();
auto gap = current_keys.frontGap();
insertRow(gap, row, merged_columns);
current_keys.popFront();
++merged_rows;
}
/// Write information about last collapsed rows.
insertGap(current_keys.frontGap());
finished = true;
}
}

View File

@ -0,0 +1,197 @@
#pragma once
#include <common/logger_useful.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <deque>
namespace DB
{
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
/* Deque with fixed memory size. Allows pushing gaps.
* frontGap() returns the number of gaps were inserted before front.
*
* Note: empty deque may have non-zero front gap.
*/
template <typename T>
class FixedSizeDequeWithGaps
{
public:
struct ValueWithGap
{
size_t gap;
char value[sizeof(T)];
};
explicit FixedSizeDequeWithGaps(size_t size)
{
container.resize_fill(size + 1);
}
~FixedSizeDequeWithGaps()
{
auto destruct_range = [this](size_t from, size_t to)
{
for (size_t i = from; i < to; ++i)
destructValue(i);
};
if (begin <= end)
destruct_range(begin, end);
else
{
destruct_range(0, end);
destruct_range(begin, container.size());
}
}
void pushBack(const T & value)
{
constructValue(end, value);
moveRight(end);
container[end].gap = 0;
}
void pushGap(size_t count) { container[end].gap += count; }
void popBack()
{
size_t curr_gap = container[end].gap;
moveLeft(end);
destructValue(end);
container[end].gap += curr_gap;
}
void popFront()
{
destructValue(begin);
moveRight(begin);
}
T & front() { return getValue(begin); }
const T & front() const { return getValue(begin); }
const T & back() const
{
size_t ps = end;
moveLeft(ps);
return getValue(ps);
}
size_t & frontGap() { return container[begin].gap; }
const size_t & frontGap() const { return container[begin].gap; }
size_t size() const
{
if (begin <= end)
return end - begin;
return end + (container.size() - begin);
}
bool empty() const { return begin == end; }
private:
PODArray<ValueWithGap> container;
size_t gap_before_first = 0;
size_t begin = 0;
size_t end = 0;
void constructValue(size_t index, const T & value) { new (container[index].value) T(value); }
void destructValue(size_t index) { reinterpret_cast<T *>(container[index].value)->~T(); }
T & getValue(size_t index) { return *reinterpret_cast<T *>(container[index].value); }
const T & getValue(size_t index) const { return *reinterpret_cast<const T *>(container[index].value); }
void moveRight(size_t & index) const
{
++index;
if (index == container.size())
index = 0;
}
void moveLeft(size_t & index) const
{
if (index == 0)
index = container.size();
--index;
}
};
class MultiversionSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
/// Don't need version column. It's in primary key.
/// max_rows_in_queue should be about max_block_size_ if we won't store a lot of extra blocks (RowRef holds SharedBlockPtr).
MultiversionSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
, current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_)
{
}
String getName() const override { return "CollapsingSorted"; }
String getID() const override
{
std::stringstream res;
res << "MultiversionSortedBlockInputStream(inputs";
for (const auto & child : children)
res << ", " << child->getID();
res << ", description";
for (const auto & descr : description)
res << ", " << descr.getID();
res << ", sign_column, " << sign_column;
res << ", version_column, " << sign_column << ")";
return res.str();
}
protected:
/// Can return 1 more records than max_block_size.
Block readImpl() override;
private:
String sign_column;
size_t sign_column_number = 0;
Logger * log = &Logger::get("MultiversionSortedBlockInputStream");
/// Read is finished.
bool finished = false;
Int8 sign_in_queue = 0;
const size_t max_rows_in_queue;
/// Rows with the same primary key and sign.
FixedSizeDequeWithGaps<RowRef> current_keys;
size_t blocks_written = 0;
/// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys.
std::queue<RowSourcePart> current_row_sources;
const bool can_collapse_all_rows;
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Output to result row for the current primary key.
void insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns);
void insertGap(size_t gap_size);
};
}

View File

@ -26,7 +26,7 @@ void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns,
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*selected_row.columns[i], selected_row.row_num);
merged_columns[i]->insertFrom(*(*selected_row.columns)[i], selected_row.row_num);
}
@ -52,8 +52,6 @@ Block ReplacingSortedBlockInputStream::readImpl()
/// Additional initialization.
if (selected_row.empty())
{
selected_row.columns.resize(num_columns);
if (!version_column.empty())
version_column_number = header.getPositionByName(version_column);
}
@ -73,12 +71,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
SortCursor current = queue.top();
if (current_key.empty())
{
current_key.columns.resize(description.size());
next_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current);
}
UInt64 version = version_column_number != -1
? current->all_columns[version_column_number]->get64(current->pos)

View File

@ -131,7 +131,6 @@ Block SummingSortedBlockInputStream::readImpl()
if (current_row.empty())
{
current_row.resize(num_columns);
next_key.columns.resize(description.size());
/// name of nested structure -> the column numbers that refer to it.
std::unordered_map<std::string, std::vector<size_t>> discovered_maps;
@ -323,8 +322,7 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
bool key_differs;
if (current_key.empty()) /// The first key encountered.
{
current_key.columns.resize(description.size());
{
setPrimaryKeyRef(current_key, current);
key_differs = true;
}

View File

@ -272,11 +272,28 @@ void MergeTreeData::initPartitionKey()
void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const
{
if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::Multiversion)
throw Exception("Sign column for MergeTree cannot be specified in modes except Collapsing or Multiversion.",
ErrorCodes::LOGICAL_ERROR);
if (!version_column.empty() && mode != MergingParams::Replacing && mode != MergingParams::Multiversion)
throw Exception("Version column for MergeTree cannot be specified in modes except Replacing or Multiversion.",
ErrorCodes::LOGICAL_ERROR);
if (!columns_to_sum.empty() && mode != MergingParams::Summing)
throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.",
ErrorCodes::LOGICAL_ERROR);
/// Check that if the sign column is needed, it exists and is of type Int8.
if (mode == MergingParams::Collapsing)
auto check_sign_column = [this, & columns](bool is_optional, const std::string & storage)
{
if (sign_column.empty())
throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR);
{
if (is_optional)
return;
throw Exception("Logical error: Sign column for storage " + storage + " is empty", ErrorCodes::LOGICAL_ERROR);
}
bool miss_column = true;
for (const auto & column : columns)
@ -284,59 +301,63 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
if (column.name == sign_column)
{
if (!typeid_cast<const DataTypeInt8 *>(column.type.get()))
throw Exception("Sign column (" + sign_column + ")"
" for storage CollapsingMergeTree must have type Int8."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
throw Exception("Sign column (" + sign_column + ") for storage " + storage + " must have type Int8."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
miss_column = false;
break;
}
}
if (miss_column)
throw Exception("Sign column " + sign_column + " does not exist in table declaration.");
}
else if (!sign_column.empty())
throw Exception("Sign column for MergeTree cannot be specified in all modes except Collapsing.", ErrorCodes::LOGICAL_ERROR);
};
/// If colums_to_sum are set, then check that such columns exist.
if (!columns_to_sum.empty())
/// that if the version_column column is needed, it exists and is of unsigned integer type.
auto check_version_column = [this, & columns](bool is_optional, const std::string & storage)
{
if (mode != MergingParams::Summing)
throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.",
ErrorCodes::LOGICAL_ERROR);
if (version_column.empty())
{
if (is_optional)
return;
for (const auto & column_to_sum : columns_to_sum)
if (columns.end() == std::find_if(columns.begin(), columns.end(),
[&](const NameAndTypePair & name_and_type) { return column_to_sum == Nested::extractTableName(name_and_type.name); }))
throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.");
}
/// Check that version_column column is set only for Replacing mode and is of unsigned integer type.
if (!version_column.empty())
{
if (mode != MergingParams::Replacing)
throw Exception("Version column for MergeTree cannot be specified in all modes except Replacing.",
ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: Version column for storage " + storage + " is empty", ErrorCodes::LOGICAL_ERROR);
}
bool miss_column = true;
for (const auto & column : columns)
{
if (column.name == version_column)
{
if (!typeid_cast<const DataTypeUInt8 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt16 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt32 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt64 *>(column.type.get())
&& !typeid_cast<const DataTypeDate *>(column.type.get())
&& !typeid_cast<const DataTypeDateTime *>(column.type.get()))
if (!column.type->isUnsignedInteger() && !column.type->isDateOrDateTime())
throw Exception("Version column (" + version_column + ")"
" for storage ReplacingMergeTree must have type of UInt family or Date or DateTime."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
" for storage " + storage + " must have type of UInt family or Date or DateTime."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
miss_column = false;
break;
}
}
if (miss_column)
throw Exception("Version column " + version_column + " does not exist in table declaration.");
};
if (mode == MergingParams::Collapsing)
check_sign_column(false, "CollapsingMergeTree");
if (mode == MergingParams::Summing)
{
/// If columns_to_sum are set, then check that such columns exist.
for (const auto & column_to_sum : columns_to_sum)
if (columns.end() == std::find_if(columns.begin(), columns.end(),
[&](const NameAndTypePair & name_and_type) { return column_to_sum == Nested::extractTableName(name_and_type.name); }))
throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.");
}
if (mode == MergingParams::Replacing)
check_version_column(true, "ReplacingMergeTree");
if (mode == MergingParams::Multiversion)
{
check_sign_column(false, "MultiversionMergeTree");
check_version_column(false, "MultiversionMergeTree");
}
/// TODO Checks for Graphite mode.
@ -354,6 +375,7 @@ String MergeTreeData::MergingParams::getModeName() const
case Unsorted: return "Unsorted";
case Replacing: return "Replacing";
case Graphite: return "Graphite";
case Multiversion: return "Multiversion";
default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString<int>(mode), ErrorCodes::LOGICAL_ERROR);

View File

@ -235,24 +235,25 @@ public:
/// Merging mode. See above.
enum Mode
{
Ordinary = 0, /// Enum values are saved. Do not change them.
Collapsing = 1,
Summing = 2,
Aggregating = 3,
Unsorted = 4,
Replacing = 5,
Graphite = 6,
Ordinary = 0, /// Enum values are saved. Do not change them.
Collapsing = 1,
Summing = 2,
Aggregating = 3,
Unsorted = 4,
Replacing = 5,
Graphite = 6,
Multiversion = 7,
};
Mode mode;
/// For collapsing mode.
/// For Collapsing and Multiversion mode.
String sign_column;
/// For Summing mode. If empty - columns_to_sum is determined automatically.
Names columns_to_sum;
/// For Replacing mode. Can be empty.
/// For Replacing and Multiversion mode. Can be empty for Replacing.
String version_column;
/// For Graphite mode.
@ -300,7 +301,8 @@ public:
return merging_params.mode == MergingParams::Collapsing
|| merging_params.mode == MergingParams::Summing
|| merging_params.mode == MergingParams::Aggregating
|| merging_params.mode == MergingParams::Replacing;
|| merging_params.mode == MergingParams::Replacing
|| merging_params.mode == MergingParams::Multiversion;
}
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const;

View File

@ -13,6 +13,7 @@
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <DataStreams/AggregatingSortedBlockInputStream.h>
#include <DataStreams/MultiversionSortedBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
@ -339,6 +340,10 @@ static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_colu
if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
key_columns.emplace(merging_params.version_column);
/// Force sign column for Multiversion mode. Version is already in primary key.
if (merging_params.mode == MergeTreeData::MergingParams::Multiversion)
key_columns.emplace(merging_params.sign_column);
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
for (auto & column : all_columns)
@ -626,6 +631,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
data.merging_params.graphite_params, time_of_merge);
break;
case MergeTreeData::MergingParams::Multiversion:
merged_stream = std::make_unique<MultiversionSortedBlockInputStream>(
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, false, rows_sources_write_buf.get());
break;
case MergeTreeData::MergingParams::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break;
@ -792,7 +802,8 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
bool is_supported_storage =
data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
data.merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
data.merging_params.mode == MergeTreeData::MergingParams::Replacing;
data.merging_params.mode == MergeTreeData::MergingParams::Replacing ||
data.merging_params.mode == MergeTreeData::MergingParams::Multiversion;
bool enough_ordinary_cols = gathering_columns.size() >= data.context.getMergeTreeSettings().vertical_merge_algorithm_min_columns_to_activate;

View File

@ -38,6 +38,7 @@ namespace std
#include <DataStreams/SummingSortedBlockInputStream.h>
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <DataStreams/AggregatingSortedBlockInputStream.h>
#include <DataStreams/MultiversionSortedBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
@ -783,7 +784,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
BlockInputStreams res;
if (to_merge.size() == 1)
{
if (!data.merging_params.sign_column.empty())
if (data.merging_params.mode == MergeTreeData::MergingParams::Collapsing)
{
ExpressionActionsPtr sign_filter_expression;
String sign_filter_column;
@ -806,7 +807,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
break;
case MergeTreeData::MergingParams::Collapsing:
merged = std::make_shared<CollapsingFinalBlockInputStream>(to_merge, data.getSortDescription(), data.merging_params.sign_column);
merged = std::make_shared<CollapsingFinalBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column);
break;
case MergeTreeData::MergingParams::Summing:
@ -823,6 +825,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
data.getSortDescription(), data.merging_params.version_column, max_block_size);
break;
case MergeTreeData::MergingParams::Multiversion: /// TODO Make MultiversionFinalBlockInputStream
merged = std::make_shared<MultiversionSortedBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column, max_block_size, true);
break;
case MergeTreeData::MergingParams::Unsorted:
throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);

View File

@ -368,6 +368,8 @@ static StoragePtr create(const StorageFactory::Arguments & args)
merging_params.mode = MergeTreeData::MergingParams::Replacing;
else if (name_part == "Graphite")
merging_params.mode = MergeTreeData::MergingParams::Graphite;
else if (name_part == "Multiversion")
merging_params.mode = MergeTreeData::MergingParams::Multiversion;
else if (!name_part.empty())
throw Exception(
"Unknown storage " + args.engine_name + getMergeTreeVerboseHelp(is_extended_storage_def),
@ -424,6 +426,12 @@ static StoragePtr create(const StorageFactory::Arguments & args)
case MergeTreeData::MergingParams::Graphite:
add_mandatory_param("'config_element_for_graphite_schema'");
break;
case MergeTreeData::MergingParams::Multiversion:
{
add_mandatory_param("sign column");
add_mandatory_param("version");
break;
}
}
ASTs & engine_args = args.engine_args;
@ -536,6 +544,26 @@ static StoragePtr create(const StorageFactory::Arguments & args)
engine_args.pop_back();
setGraphitePatternsFromConfig(args.context, graphite_config_name, merging_params.graphite_params);
}
else if (merging_params.mode == MergeTreeData::MergingParams::Multiversion)
{
if (auto ast = typeid_cast<const ASTIdentifier *>(engine_args.back().get()))
merging_params.version_column = ast->name;
else
throw Exception(
"Version column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
engine_args.pop_back();
if (auto ast = typeid_cast<const ASTIdentifier *>(engine_args.back().get()))
merging_params.sign_column = ast->name;
else
throw Exception(
"Sign column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
engine_args.pop_back();
}
String date_column_name;
ASTPtr partition_expr_list;
@ -585,6 +613,23 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ErrorCodes::BAD_ARGUMENTS);
}
/// MultiversionMergeTree must have version column in primary key. Add it to primary key implicitly otherwise.
if (merging_params.mode == MergeTreeData::MergingParams::Multiversion)
{
auto ast_primary_expr_list = typeid_cast<ASTExpressionList *>(primary_expr_list.get());
bool has_version_column_in_pk = false;
for (const auto & expr : ast_primary_expr_list->children)
if (auto ast = typeid_cast<const ASTIdentifier *>(expr.get()))
if (ast->name == merging_params.version_column)
has_version_column_in_pk = true;
if (!has_version_column_in_pk)
{
auto version = std::make_shared<ASTIdentifier>(StringRange(), merging_params.version_column);
ast_primary_expr_list->children.push_back(version);
}
}
if (replicated)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, args.attach, args.data_path, args.database_name, args.table_name,
@ -610,6 +655,7 @@ void registerStorageMergeTree(StorageFactory & factory)
factory.registerStorage("AggregatingMergeTree", create);
factory.registerStorage("SummingMergeTree", create);
factory.registerStorage("GraphiteMergeTree", create);
factory.registerStorage("MultiversionMergeTree", create);
factory.registerStorage("ReplicatedMergeTree", create);
factory.registerStorage("ReplicatedCollapsingMergeTree", create);
@ -617,6 +663,7 @@ void registerStorageMergeTree(StorageFactory & factory)
factory.registerStorage("ReplicatedAggregatingMergeTree", create);
factory.registerStorage("ReplicatedSummingMergeTree", create);
factory.registerStorage("ReplicatedGraphiteMergeTree", create);
factory.registerStorage("ReplicatedMultiversionMergeTree", create);
}
}