Merge branch 'master' into patch-8

Amy Krishnevsky 2018-02-13 11:37:05 +03:00 committed by GitHub
commit fb92d76698
62 changed files with 1701 additions and 245 deletions

View File

@ -1,7 +1,7 @@
# ClickHouse release 1.1.54343, 2018-02-05
* Added macros support for defining cluster names in distributed DDL queries and constructors of Distributed tables: `CREATE TABLE distr ON CLUSTER '{cluster}' (...) ENGINE = Distributed('{cluster}', 'db', 'table')`.
-* Now the index is used for conditions like `expr IN (subquery)`.
+* Now the table index is used for conditions like `expr IN (subquery)`.
* Improved processing of duplicates when inserting to Replicated tables, so they no longer slow down execution of the replication queue.
# ClickHouse release 1.1.54342, 2018-01-22

View File

@ -65,15 +65,15 @@ public:
bool hasEqualOffsets(const ColumnArray & other) const;
/** More efficient methods of manipulation */
-IColumn & getData() { return *data->assumeMutable(); }
+IColumn & getData() { return data->assumeMutableRef(); }
const IColumn & getData() const { return *data; }
-IColumn & getOffsetsColumn() { return *offsets->assumeMutable(); }
+IColumn & getOffsetsColumn() { return offsets->assumeMutableRef(); }
const IColumn & getOffsetsColumn() const { return *offsets; }
Offsets & ALWAYS_INLINE getOffsets()
{
-return static_cast<ColumnOffsets &>(*offsets->assumeMutable()).getData();
+return static_cast<ColumnOffsets &>(offsets->assumeMutableRef()).getData();
}
const Offsets & ALWAYS_INLINE getOffsets() const
@ -81,11 +81,9 @@ public:
return static_cast<const ColumnOffsets &>(*offsets).getData();
}
-//MutableColumnPtr getDataMutablePtr() { return data->assumeMutable(); }
const ColumnPtr & getDataPtr() const { return data; }
ColumnPtr & getDataPtr() { return data; }
-//MutableColumnPtr getOffsetsMutablePtr() { return offsets->assumeMutable(); }
const ColumnPtr & getOffsetsPtr() const { return offsets; }
ColumnPtr & getOffsetsPtr() { return offsets; }

View File

@ -133,9 +133,9 @@ public:
const char * deserializeAndInsertFromArena(const char * pos) override
{
-MutableColumnPtr mutable_data = data->assumeMutable();
-auto res = mutable_data->deserializeAndInsertFromArena(pos);
-mutable_data->popBack(1);
+auto & mutable_data = data->assumeMutableRef();
+auto res = mutable_data.deserializeAndInsertFromArena(pos);
+mutable_data.popBack(1);
++s;
return res;
}
@ -191,7 +191,7 @@ public:
/// Not part of the common interface.
-IColumn & getDataColumn() { return *data->assumeMutable(); }
+IColumn & getDataColumn() { return data->assumeMutableRef(); }
const IColumn & getDataColumn() const { return *data; }
//MutableColumnPtr getDataColumnMutablePtr() { return data; }
const ColumnPtr & getDataColumnPtr() const { return data; }

View File

@ -121,13 +121,13 @@ std::vector<MutableColumnPtr> ColumnFunction::scatter(IColumn::ColumnIndex num_c
void ColumnFunction::insertDefault()
{
for (auto & column : captured_columns)
-column.column->assumeMutable()->insertDefault();
+column.column->assumeMutableRef().insertDefault();
++size_;
}
void ColumnFunction::popBack(size_t n)
{
for (auto & column : captured_columns)
-column.column->assumeMutable()->popBack(n);
+column.column->assumeMutableRef().popBack(n);
size_ -= n;
}

View File

@ -81,17 +81,16 @@ public:
/// Return the column that represents values.
-IColumn & getNestedColumn() { return *nested_column->assumeMutable(); }
+IColumn & getNestedColumn() { return nested_column->assumeMutableRef(); }
const IColumn & getNestedColumn() const { return *nested_column; }
-//ColumnPtr & getNestedColumnPtr() { return nested_column->assumeMutable(); }
const ColumnPtr & getNestedColumnPtr() const { return nested_column; }
/// Return the column that represents the byte map.
//ColumnPtr & getNullMapColumnPtr() { return null_map; }
const ColumnPtr & getNullMapColumnPtr() const { return null_map; }
-ColumnUInt8 & getNullMapColumn() { return static_cast<ColumnUInt8 &>(*null_map->assumeMutable()); }
+ColumnUInt8 & getNullMapColumn() { return static_cast<ColumnUInt8 &>(null_map->assumeMutableRef()); }
const ColumnUInt8 & getNullMapColumn() const { return static_cast<const ColumnUInt8 &>(*null_map); }
NullMap & getNullMapData() { return getNullMapColumn().getData(); }

View File

@ -81,7 +81,7 @@ void ColumnTuple::insert(const Field & x)
throw Exception("Cannot insert value of different size into tuple", ErrorCodes::CANNOT_INSERT_VALUE_OF_DIFFERENT_SIZE_INTO_TUPLE);
for (size_t i = 0; i < tuple_size; ++i)
-columns[i]->assumeMutable()->insert(tuple[i]);
+columns[i]->assumeMutableRef().insert(tuple[i]);
}
void ColumnTuple::insertFrom(const IColumn & src_, size_t n)
@ -93,19 +93,19 @@ void ColumnTuple::insertFrom(const IColumn & src_, size_t n)
throw Exception("Cannot insert value of different size into tuple", ErrorCodes::CANNOT_INSERT_VALUE_OF_DIFFERENT_SIZE_INTO_TUPLE);
for (size_t i = 0; i < tuple_size; ++i)
-columns[i]->assumeMutable()->insertFrom(*src.columns[i], n);
+columns[i]->assumeMutableRef().insertFrom(*src.columns[i], n);
}
void ColumnTuple::insertDefault()
{
for (auto & column : columns)
-column->assumeMutable()->insertDefault();
+column->assumeMutableRef().insertDefault();
}
void ColumnTuple::popBack(size_t n)
{
for (auto & column : columns)
-column->assumeMutable()->popBack(n);
+column->assumeMutableRef().popBack(n);
}
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
@ -120,7 +120,7 @@ StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char con
const char * ColumnTuple::deserializeAndInsertFromArena(const char * pos)
{
for (auto & column : columns)
-pos = column->assumeMutable()->deserializeAndInsertFromArena(pos);
+pos = column->assumeMutableRef().deserializeAndInsertFromArena(pos);
return pos;
}
@ -135,7 +135,7 @@ void ColumnTuple::insertRangeFrom(const IColumn & src, size_t start, size_t leng
{
const size_t tuple_size = columns.size();
for (size_t i = 0; i < tuple_size; ++i)
-columns[i]->assumeMutable()->insertRangeFrom(
+columns[i]->assumeMutableRef().insertRangeFrom(
*static_cast<const ColumnTuple &>(src).columns[i],
start, length);
}

View File

@ -65,13 +65,11 @@ public:
size_t tupleSize() const { return columns.size(); }
const IColumn & getColumn(size_t idx) const { return *columns[idx]; }
-IColumn & getColumn(size_t idx) { return *columns[idx]->assumeMutable(); }
+IColumn & getColumn(size_t idx) { return columns[idx]->assumeMutableRef(); }
const Columns & getColumns() const { return columns; }
const ColumnPtr & getColumnPtr(size_t idx) const { return columns[idx]; }
-//ColumnPtr & getColumnPtr(size_t idx) { return columns[idx]; }
-//MutableColumnPtr getColumnMutablePtr(size_t idx) { return columns[idx]->assumeMutable(); }
};

View File

@ -172,6 +172,11 @@ public:
{
return const_cast<COWPtr*>(this)->getPtr();
}
+Derived & assumeMutableRef() const
+{
+    return const_cast<Derived &>(*derived());
+}
};
@ -235,6 +240,6 @@ public:
* 3. Store subobjects as immutable ptrs. Implement copy-constructor to do shallow copy.
* But reimplement 'mutate' method, so it will call 'mutate' of all subobjects (do deep mutate).
* It will guarantee, that mutable object have all subobjects unshared.
-* From non-const method, you can modify subobjects with 'assumeMutable' method.
+* From non-const method, you can modify subobjects with 'assumeMutableRef' method.
* Drawback: it's more complex than other solutions.
*/
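
Every call-site change in this commit is the same mechanical rewrite: `*ptr->assumeMutable()` becomes `ptr->assumeMutableRef()`. A minimal, self-contained sketch of why the reference form is preferable (a simplified stand-in: the real COWPtr uses intrusive refcounting and a cloning `mutate`, not `shared_ptr`):

#include <iostream>
#include <memory>
#include <vector>

// Hypothetical simplified COW column; only the two accessors matter here.
struct Column : std::enable_shared_from_this<Column>
{
    std::vector<int> data;

    // Old pattern: materialize a mutable smart pointer (refcount traffic,
    // and every call site has to dereference the temporary).
    std::shared_ptr<Column> assumeMutable() const
    {
        return std::const_pointer_cast<Column>(shared_from_this());
    }

    // New pattern: a plain reference. Valid because the caller already owns
    // the object; cheaper, and call sites read as obj.method().
    Column & assumeMutableRef() const
    {
        return const_cast<Column &>(*this);
    }
};

int main()
{
    std::shared_ptr<const Column> offsets = std::make_shared<Column>();
    (*offsets->assumeMutable()).data.push_back(1);  // style before this commit
    offsets->assumeMutableRef().data.push_back(2);  // style after this commit
    std::cout << offsets->data.size() << '\n';      // prints 2
}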

View File

@ -34,8 +34,6 @@ Block AggregatingSortedBlockInputStream::readImpl()
/// Additional initialization.
if (next_key.empty())
{
-next_key.columns.resize(description.size());
/// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i)
{
@ -88,7 +86,6 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s
if (current_key.empty()) /// The first key encountered.
{
-current_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current);
key_differs = true;
}

View File

@ -27,7 +27,7 @@ void CollapsingSortedBlockInputStream::reportIncorrectData()
{
if (i != 0)
s << ", ";
-s << applyVisitor(FieldVisitorToString(), (*current_key.columns[i])[current_key.row_num]);
+s << applyVisitor(FieldVisitorToString(), (*(*current_key.columns)[i])[current_key.row_num]);
}
s << ").";
@ -53,10 +53,10 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
LOG_INFO(log, "All rows collapsed");
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
-merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
+merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
-merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num);
+merged_columns[i]->insertFrom(*(*last_negative.columns)[i], last_negative.row_num);
if (out_row_sources_buf)
{
@ -72,7 +72,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
-merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num);
+merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false);
@ -82,7 +82,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
-merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
+merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false);
@ -124,13 +124,8 @@ Block CollapsingSortedBlockInputStream::readImpl()
/// Additional initialization.
if (first_negative.empty())
{
-first_negative.columns.resize(num_columns);
-last_negative.columns.resize(num_columns);
-last_positive.columns.resize(num_columns);
sign_column_number = header.getPositionByName(sign_column);
}
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
@ -147,12 +142,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
SortCursor current = queue.top();
if (current_key.empty())
{
-current_key.columns.resize(description.size());
-next_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current);
}
Int8 sign = static_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
setPrimaryKeyRef(next_key, current);

View File

@ -25,8 +25,7 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
public:
CollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_,
-const String & sign_column_, size_t max_block_size_,
-WriteBuffer * out_row_sources_buf_ = nullptr)
+const String & sign_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
{

View File

@ -98,9 +98,6 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i);
-if (current_subgroup_newest_row.empty())
-current_subgroup_newest_row.columns.resize(num_columns);
}
merge(merged_columns, queue);
@ -257,14 +254,14 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
}
else
merged_columns[value_column_num]->insertFrom(
-*current_subgroup_newest_row.columns[value_column_num], current_subgroup_newest_row.row_num);
+*(*current_subgroup_newest_row.columns)[value_column_num], current_subgroup_newest_row.row_num);
}
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
{
if (aggregate_state_created)
-current_pattern->function->add(place_for_aggregate_state.data(), &row.columns[value_column_num], row.row_num, nullptr);
+current_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr);
}
}

View File

@ -53,10 +53,9 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
{
first = false;
-size_t i = 0;
-for (auto it = source_blocks.begin(); it != source_blocks.end(); ++it, ++i)
+for (size_t i = 0; i < source_blocks.size(); ++i)
{
-SharedBlockPtr & shared_block_ptr = *it;
+SharedBlockPtr & shared_block_ptr = source_blocks[i];
if (shared_block_ptr.get())
continue;
@ -75,6 +74,8 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
expected_block_size = std::min(rows, max_block_size);
cursors[i] = SortCursorImpl(*shared_block_ptr, description, i);
+shared_block_ptr->all_columns = cursors[i].all_columns;
+shared_block_ptr->sort_columns = cursors[i].sort_columns;
has_collation |= cursors[i].has_collation;
}
@ -173,25 +174,20 @@ Block MergingSortedBlockInputStream::readImpl()
template <typename TSortCursor>
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue)
{
-size_t i = 0;
+size_t order = current.impl->order;
size_t size = cursors.size();
-for (; i < size; ++i)
-{
-if (&cursors[i] == current.impl)
-{
-source_blocks[i] = new detail::SharedBlock(children[i]->read());
-if (*source_blocks[i])
-{
-cursors[i].reset(*source_blocks[i]);
-queue.push(TSortCursor(&cursors[i]));
-}
-break;
-}
-}
-if (i == size)
+if (order >= size || &cursors[order] != current.impl)
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
+source_blocks[order] = new detail::SharedBlock(children[order]->read());
+if (*source_blocks[order])
+{
+cursors[order].reset(*source_blocks[order]);
+queue.push(TSortCursor(&cursors[order]));
+source_blocks[order]->all_columns = cursors[order].all_columns;
+source_blocks[order]->sort_columns = cursors[order].sort_columns;
+}
}
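
A reading note on why indexing by `order` is safe (inferred from `init()` above, not stated in the commit):

// init() constructs each cursor with its source index as the third argument:
//     cursors[i] = SortCursorImpl(*shared_block_ptr, description, i);
// so cursor->order is exactly the index of its source stream, and the old
// linear search always terminated at i == order. The rewrite replaces an
// O(cursors.size()) scan per exhausted block with one O(1) lookup; the
// remaining check merely guards against an inconsistent cursor.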
template

View File

@ -30,13 +30,15 @@ namespace ErrorCodes
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
-struct SharedBlock : Block
-{
-int refcount = 0;
+struct SharedBlock : Block
+{
+int refcount = 0;
-SharedBlock(Block && value_)
-: Block(std::move(value_)) {};
-};
+ColumnRawPtrs all_columns;
+ColumnRawPtrs sort_columns;
+SharedBlock(Block && block) : Block(std::move(block)) {}
+};
}
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
@ -77,7 +79,7 @@ public:
protected:
struct RowRef
{
-ColumnRawPtrs columns;
+ColumnRawPtrs * columns = nullptr;
size_t row_num;
SharedBlockPtr shared_block;
@ -91,9 +93,9 @@ protected:
/// The number and types of columns must match.
bool operator==(const RowRef & other) const
{
-size_t size = columns.size();
+size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
-if (0 != columns[i]->compareAt(row_num, other.row_num, *other.columns[i], 1))
+if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
@ -103,8 +105,8 @@ protected:
return !(*this == other);
}
-bool empty() const { return columns.empty(); }
-size_t size() const { return columns.size(); }
+bool empty() const { return columns == nullptr; }
+size_t size() const { return empty() ? 0 : columns->size(); }
};
@ -190,9 +192,7 @@ protected:
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];
-for (size_t i = 0; i < num_columns; ++i)
-row_ref.columns[i] = cursor->all_columns[i];
+row_ref.columns = &row_ref.shared_block->all_columns;
}
template <typename TSortCursor>
@ -200,9 +200,7 @@ protected:
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];
-for (size_t i = 0; i < cursor->sort_columns_size; ++i)
-row_ref.columns[i] = cursor->sort_columns[i];
+row_ref.columns = &row_ref.shared_block->sort_columns;
}
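
The effect of the `RowRef` change, sketched (my reading of this diff):

// Old: every setRowRef()/setPrimaryKeyRef() copied one raw pointer per column:
//     for (size_t i = 0; i < num_columns; ++i)
//         row_ref.columns[i] = cursor->all_columns[i];       // O(num_columns) per row
// New: the pointer arrays live once per block inside SharedBlock, and a RowRef
// stores a single pointer to them while shared_block keeps the block alive:
//     row_ref.columns = &row_ref.shared_block->all_columns;  // O(1) per row
// Accordingly, empty() changes from columns.empty() to columns == nullptr.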
private:

View File

@ -26,7 +26,7 @@ void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns,
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
-merged_columns[i]->insertFrom(*selected_row.columns[i], selected_row.row_num);
+merged_columns[i]->insertFrom(*(*selected_row.columns)[i], selected_row.row_num);
}
@ -52,8 +52,6 @@ Block ReplacingSortedBlockInputStream::readImpl()
/// Additional initialization.
if (selected_row.empty())
{
-selected_row.columns.resize(num_columns);
if (!version_column.empty())
version_column_number = header.getPositionByName(version_column);
}
@ -73,12 +71,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
SortCursor current = queue.top();
if (current_key.empty())
{
-current_key.columns.resize(description.size());
-next_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current);
}
UInt64 version = version_column_number != -1
? current->all_columns[version_column_number]->get64(current->pos)

View File

@ -131,7 +131,6 @@ Block SummingSortedBlockInputStream::readImpl()
if (current_row.empty())
{
current_row.resize(num_columns);
-next_key.columns.resize(description.size());
/// name of nested structure -> the column numbers that refer to it.
std::unordered_map<std::string, std::vector<size_t>> discovered_maps;
@ -323,8 +322,7 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
bool key_differs;
if (current_key.empty()) /// The first key encountered.
-{
-current_key.columns.resize(description.size());
+{
setPrimaryKeyRef(current_key, current);
key_differs = true;
}

View File

@ -0,0 +1,185 @@
#include <Common/FieldVisitors.h>
#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <Columns/ColumnsNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
{
if constexpr (sizeof(RowSourcePart) == 1)
buffer.write(*reinterpret_cast<const char *>(&row_source));
else
buffer.write(reinterpret_cast<const char *>(&row_source), sizeof(RowSourcePart));
}
void VersionedCollapsingSortedBlockInputStream::insertGap(size_t gap_size)
{
if (out_row_sources_buf)
{
for (size_t i = 0; i < gap_size; ++i)
{
writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
current_row_sources.pop();
}
}
}
void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns)
{
const auto & columns = row.shared_block->all_columns;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*columns[i], row.row_num);
insertGap(skip_rows);
if (out_row_sources_buf)
{
current_row_sources.front().setSkipFlag(false);
writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
current_row_sources.pop();
}
}
Block VersionedCollapsingSortedBlockInputStream::readImpl()
{
if (finished)
return {};
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
bool is_initialized = !first;
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::NOT_IMPLEMENTED);
if (merged_columns.empty())
return {};
/// Additional initialization.
if (!is_initialized)
sign_column_number = header.getPositionByName(sign_column);
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
}
void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
size_t merged_rows = 0;
auto update_queue = [this, & queue](SortCursor & cursor)
{
queue.pop();
if (out_row_sources_buf)
current_row_sources.emplace(cursor->order, true);
if (!cursor->isLast())
{
cursor->next();
queue.push(cursor);
}
else
{
/// We take next block from the corresponding source, if there is one.
fetchNextBlock(cursor, queue);
}
};
/// Take rows in the correct order and put them into `merged_columns` while the number of rows does not exceed `max_block_size`
while (!queue.empty())
{
SortCursor current = queue.top();
RowRef next_key;
Int8 sign = static_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
setPrimaryKeyRef(next_key, current);
size_t rows_to_merge = 0;
/// Each branch either updates queue or increases rows_to_merge.
if (current_keys.empty())
{
sign_in_queue = sign;
current_keys.pushBack(next_key);
update_queue(current);
}
else
{
if (current_keys.back() == next_key)
{
update_queue(current);
/// If all the rows were collapsed, we still want to return at least one block as the result.
/// If the queue is empty, don't collapse the two last rows.
if (sign == sign_in_queue || (!can_collapse_all_rows && blocks_written == 0
&& merged_rows == 0 && queue.empty() && current_keys.size() == 1))
current_keys.pushBack(next_key);
else
{
current_keys.popBack();
current_keys.pushGap(2);
}
}
else
rows_to_merge = current_keys.size();
}
if (current_keys.size() > max_rows_in_queue)
rows_to_merge = std::max(rows_to_merge, current_keys.size() - max_rows_in_queue);
while (rows_to_merge)
{
const auto & row = current_keys.front();
auto gap = current_keys.frontGap();
insertRow(gap, row, merged_columns);
current_keys.popFront();
++merged_rows;
--rows_to_merge;
if (merged_rows >= max_block_size)
{
++blocks_written;
return;
}
}
}
while (!current_keys.empty())
{
const auto & row = current_keys.front();
auto gap = current_keys.frontGap();
insertRow(gap, row, merged_columns);
current_keys.popFront();
++merged_rows;
}
/// Write information about last collapsed rows.
insertGap(current_keys.frontGap());
finished = true;
}
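
To make the collapsing rule concrete, a hypothetical trace (invented data) of three rows arriving with the same primary key:

// Sign sequence for one key: +1, -1, +1
// 1. sign = +1, current_keys empty -> pushBack(row1); sign_in_queue = +1
// 2. sign = -1, key matches back() -> sign differs, so popBack() drops row1
//    and pushGap(2) records the collapsed (+1, -1) pair as two gaps.
// 3. sign = +1, current_keys empty -> pushBack(row3); sign_in_queue = +1
// Only row3 reaches merged_columns; the two gaps are later written through
// insertGap() as skipped rows in out_row_sources_buf (for vertical merge).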
}

View File

@ -0,0 +1,235 @@
#pragma once
#include <common/logger_useful.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <deque>
namespace DB
{
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
/* Deque with fixed memory size. Allows pushing gaps.
* frontGap() returns the number of gaps that were inserted before the front element.
*
* This structure could be implemented via std::deque, but this fixed-size version differs:
* - it uses a fixed amount of memory, allocated in the constructor; no further allocations are performed;
* - gaps are not stored as separate values in the queue, which is more memory efficient;
* - it maintains the gap invariant: after an element is removed, its gaps move into the neighboring cell.
*
* Note: an empty deque may have a non-zero front gap.
*/
template <typename T>
class FixedSizeDequeWithGaps
{
public:
struct ValueWithGap
{
/// The number of gaps before the current element. The number of gaps after the last element is stored in the end cell.
size_t gap;
/// Store char[] instead of T in order to make ValueWithGap POD.
/// Call placement constructors after push and destructors after pop.
char value[sizeof(T)];
};
explicit FixedSizeDequeWithGaps(size_t size)
{
container.resize_fill(size + 1);
}
~FixedSizeDequeWithGaps()
{
auto destruct_range = [this](size_t from, size_t to)
{
for (size_t i = from; i < to; ++i)
destructValue(i);
};
if (begin <= end)
destruct_range(begin, end);
else
{
destruct_range(0, end);
destruct_range(begin, container.size());
}
}
void pushBack(const T & value)
{
checkEnoughSpaceToInsert();
constructValue(end, value);
moveRight(end);
container[end].gap = 0;
}
void pushGap(size_t count) { container[end].gap += count; }
void popBack()
{
checkHasValuesToRemove();
size_t curr_gap = container[end].gap;
moveLeft(end);
destructValue(end);
container[end].gap += curr_gap;
}
void popFront()
{
checkHasValuesToRemove();
destructValue(begin);
moveRight(begin);
}
T & front()
{
checkHasValuesToGet();
return getValue(begin);
}
const T & front() const
{
checkHasValuesToGet();
return getValue(begin);
}
const T & back() const
{
size_t ps = end;
moveLeft(ps);
return getValue(ps);
}
size_t & frontGap() { return container[begin].gap; }
const size_t & frontGap() const { return container[begin].gap; }
size_t size() const
{
if (begin <= end)
return end - begin;
return end + (container.size() - begin);
}
bool empty() const { return begin == end; }
private:
PODArray<ValueWithGap> container;
size_t gap_before_first = 0;
size_t begin = 0;
size_t end = 0;
void constructValue(size_t index, const T & value) { new (container[index].value) T(value); }
void destructValue(size_t index) { reinterpret_cast<T *>(container[index].value)->~T(); }
T & getValue(size_t index) { return *reinterpret_cast<T *>(container[index].value); }
const T & getValue(size_t index) const { return *reinterpret_cast<const T *>(container[index].value); }
void moveRight(size_t & index) const
{
++index;
if (index == container.size())
index = 0;
}
void moveLeft(size_t & index) const
{
if (index == 0)
index = container.size();
--index;
}
void checkEnoughSpaceToInsert() const
{
if (size() + 1 == container.size())
throw Exception("Not enough space to insert into FixedSizeDequeWithGaps with capacity "
+ toString(container.size() - 1), ErrorCodes::LOGICAL_ERROR);
}
void checkHasValuesToRemove() const
{
if (empty())
throw Exception("Cannot remove from empty FixedSizeDequeWithGaps", ErrorCodes::LOGICAL_ERROR);
}
void checkHasValuesToGet() const
{
if (empty())
throw Exception("Cannot get value from empty FixedSizeDequeWithGaps", ErrorCodes::LOGICAL_ERROR);
}
};
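
A minimal usage sketch for `FixedSizeDequeWithGaps`, driven the way `merge` drives it (plain ints stand in for `RowRef`s; assumes the class above):

void fixedSizeDequeWithGapsExample()
{
    FixedSizeDequeWithGaps<int> keys(4);  // capacity 4; the 5th pushBack throws

    keys.pushBack(1);   // a surviving row
    keys.pushGap(2);    // two rows collapsed away between the survivors
    keys.pushBack(3);   // the next surviving row

    size_t gap = keys.frontGap();  // 0 rows collapsed before '1'
    keys.popFront();               // emit 1
    gap = keys.frontGap();         // 2 rows collapsed before '3'
    keys.popFront();               // emit 3
    (void)gap;                     // merge() passes this count to insertRow()
}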
class VersionedCollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
/// Don't need version column. It's in primary key.
/// max_rows_in_queue should be about max_block_size_ if we don't want to store a lot of extra blocks (each RowRef holds a SharedBlockPtr).
VersionedCollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
, current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_)
{
}
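
A worked instance of the sizing arithmetic above (assuming `max_block_size_` is the usual DEFAULT_MERGE_BLOCK_SIZE of 8192):

// max_rows_in_queue = min(max(3, 8192), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2
//                   = min(8192, 8192) - 2 = 8190
// current_keys capacity = max_rows_in_queue + 1 = 8191
// merge() starts flushing once current_keys.size() exceeds max_rows_in_queue,
// so the deque always has one spare slot for the key pushed before that check.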
String getName() const override { return "VersionedCollapsingSorted"; }
String getID() const override
{
std::stringstream res;
res << "VersionedCollapsingSortedBlockInputStream(inputs";
for (const auto & child : children)
res << ", " << child->getID();
res << ", description";
for (const auto & descr : description)
res << ", " << descr.getID();
res << ", sign_column, " << sign_column;
res << ", version_column, " << sign_column << ")";
return res.str();
}
protected:
/// Can return one more record than max_block_size.
Block readImpl() override;
private:
String sign_column;
size_t sign_column_number = 0;
Logger * log = &Logger::get("VersionedCollapsingSortedBlockInputStream");
/// Read is finished.
bool finished = false;
Int8 sign_in_queue = 0;
const size_t max_rows_in_queue;
/// Rows with the same primary key and sign.
FixedSizeDequeWithGaps<RowRef> current_keys;
size_t blocks_written = 0;
/// Sources of rows for the VERTICAL merge algorithm. The size equals (the number of rows + the number of gaps) in current_keys.
std::queue<RowSourcePart> current_row_sources;
const bool can_collapse_all_rows;
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Writes the row for the current primary key into the result.
void insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns);
void insertGap(size_t gap_size);
};
}

View File

@ -61,6 +61,13 @@ void FunctionArray::executeImpl(Block & block, const ColumnNumbers & arguments,
{
size_t num_elements = arguments.size();
+if (num_elements == 0)
+{
+/// We should return constant empty array.
+block.getByPosition(result).column = block.getByPosition(result).type->createColumnConstWithDefaultValue(block.rows());
+return;
+}
const DataTypePtr & return_type = block.getByPosition(result).type;
const DataTypePtr & elem_type = static_cast<const DataTypeArray &>(*return_type).getNestedType();
@ -91,8 +98,7 @@ void FunctionArray::executeImpl(Block & block, const ColumnNumbers & arguments,
columns[i] = columns_holder[i].get();
}
-/** Create and fill the result array.
-*/
+/// Create and fill the result array.
auto out = ColumnArray::create(elem_type->createColumn());
IColumn & out_data = out->getData();

View File

@ -10,8 +10,9 @@ set (INTERNAL_LINKER_EXECUTABLE "clickhouse-lld" CACHE STRING "")
# Disabling asan reporting for these tools
if (CMAKE_BUILD_TYPE_UC STREQUAL "ASAN")
-set(INTERNAL_COMPILER_EXECUTABLE "env ASAN_OPTIONS=detect_leaks=0 ${INTERNAL_COMPILER_EXECUTABLE}")
-set(INTERNAL_LINKER_EXECUTABLE "env ASAN_OPTIONS=detect_leaks=0 ${INTERNAL_LINKER_EXECUTABLE}")
+set(INTERNAL_COMPILER_ENV "env ASAN_OPTIONS=detect_leaks=0" CACHE STRING "")
+else ()
+set(INTERNAL_COMPILER_ENV "" CACHE STRING "")
endif ()
set (INTERNAL_COMPILER_NO_WARNING OFF CACHE INTERNAL "")
@ -39,7 +40,7 @@ string (REPLACE ${ClickHouse_SOURCE_DIR} ${INTERNAL_COMPILER_HEADERS} INTERNAL_B
string (REPLACE ${ClickHouse_SOURCE_DIR} ${INTERNAL_COMPILER_HEADERS} INTERNAL_Poco_Foundation_INCLUDE_DIR ${Poco_Foundation_INCLUDE_DIR})
string (REPLACE ${ClickHouse_SOURCE_DIR} ${INTERNAL_COMPILER_HEADERS} INTERNAL_Poco_Util_INCLUDE_DIR ${Poco_Util_INCLUDE_DIR})
-message (STATUS "Using internal compiler: headers=${INTERNAL_COMPILER_HEADERS} : ${INTERNAL_COMPILER_EXECUTABLE} ${INTERNAL_COMPILER_FLAGS}; ${INTERNAL_LINKER_EXECUTABLE}")
+message (STATUS "Using internal compiler: headers=${INTERNAL_COMPILER_HEADERS} : ${INTERNAL_COMPILER_ENV} ${INTERNAL_COMPILER_EXECUTABLE} ${INTERNAL_COMPILER_FLAGS}; ${INTERNAL_LINKER_EXECUTABLE}")
set (CONFIG_COMPILE ${ClickHouse_BINARY_DIR}/dbms/src/Interpreters/config_compile.h)
configure_file (${ClickHouse_SOURCE_DIR}/dbms/src/Interpreters/config_compile.h.in ${CONFIG_COMPILE})

View File

@ -203,7 +203,6 @@ void Compiler::compile(
std::string prefix = path + "/" + file_name;
std::string cpp_file_path = prefix + ".cpp";
-std::string o_file_path = prefix + ".o";
std::string so_file_path = prefix + ".so";
std::string so_tmp_file_path = prefix + ".so.tmp";
@ -219,8 +218,12 @@ void Compiler::compile(
/// Slightly inconvenient.
command <<
"("
-INTERNAL_COMPILER_EXECUTABLE
+INTERNAL_COMPILER_ENV
+" " INTERNAL_COMPILER_EXECUTABLE
" " INTERNAL_COMPILER_FLAGS
+/// It is hard to call an ld program correctly by hand: it is easy to omit critical flags, which might lead to
+/// unhandled exceptions. Therefore we pass the path to LLVM's lld directly to clang.
+" -fuse-ld=/usr/bin/" INTERNAL_LINKER_EXECUTABLE
#if INTERNAL_COMPILER_CUSTOM_ROOT
@ -247,13 +250,7 @@ void Compiler::compile(
" -I " INTERNAL_Boost_INCLUDE_DIRS
" -I " INTERNAL_COMPILER_HEADERS "/libs/libcommon/include/"
" " << additional_compiler_flags <<
" -c -o " << o_file_path << " " << cpp_file_path
<< " 2>&1"
") && ("
INTERNAL_LINKER_EXECUTABLE
" -shared"
" -o " << so_tmp_file_path
<< " " << o_file_path
" -shared -o " << so_tmp_file_path << " " << cpp_file_path
<< " 2>&1"
") || echo Return code: $?";
@ -270,7 +267,6 @@ void Compiler::compile(
/// If there was an error before, the file with the code remains for viewing.
Poco::File(cpp_file_path).remove();
-Poco::File(o_file_path).remove();
Poco::File(so_tmp_file_path).renameTo(so_file_path);
SharedLibraryPtr lib(new SharedLibrary(so_file_path));
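
The shape of the generated shell command before and after, sketched with hypothetical file names (the compiler executable name and flags are abbreviated; the `env` prefix is emitted only for ASAN builds):

// Before: two subshells, compile then link, driving the linker by hand:
//   (clickhouse-clang <flags> -c -o q.o q.cpp 2>&1) &&
//   (clickhouse-lld -shared -o q.so.tmp q.o 2>&1)
// After: a single clang invocation that compiles and links, delegating to lld:
//   (env ASAN_OPTIONS=detect_leaks=0 clickhouse-clang <flags>
//    -fuse-ld=/usr/bin/clickhouse-lld -shared -o q.so.tmp q.cpp 2>&1)
// The intermediate .o file disappears, which is why o_file_path and its
// Poco::File(o_file_path).remove() call are deleted above.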

View File

@ -1097,15 +1097,9 @@ private:
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context)
{
+/// Remove FORMAT <fmt> and INTO OUTFILE <file> if exists
ASTPtr query_ptr = query_ptr_->clone();
-/// Remove FORMAT ... INTO OUTFILE if exists
-if (dynamic_cast<const ASTQueryWithOutput *>(query_ptr_.get()))
-{
-auto query_with_output = dynamic_cast<ASTQueryWithOutput *>(query_ptr.get());
-query_with_output->out_file = nullptr;
-query_with_output->format = nullptr;
-}
+ASTQueryWithOutput::resetOutputASTIfExist(*query_ptr);
auto query = dynamic_cast<ASTQueryWithOnCluster *>(query_ptr.get());
if (!query)

View File

@ -587,6 +587,7 @@ BlockIO InterpreterCreateQuery::execute()
{
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_ptr);
checkAccess(create);
+ASTQueryWithOutput::resetOutputASTIfExist(create);
/// CREATE|ATTACH DATABASE
if (!create.database.empty() && create.table.empty())

View File

@ -6,8 +6,12 @@
#endif
#cmakedefine PATH_SHARE "@PATH_SHARE@"
#cmakedefine INTERNAL_COMPILER_FLAGS "@INTERNAL_COMPILER_FLAGS@"
-#cmakedefine INTERNAL_COMPILER_EXECUTABLE "@INTERNAL_COMPILER_EXECUTABLE@"
#cmakedefine INTERNAL_LINKER_EXECUTABLE "@INTERNAL_LINKER_EXECUTABLE@"
+#cmakedefine INTERNAL_COMPILER_EXECUTABLE "@INTERNAL_COMPILER_EXECUTABLE@"
+#cmakedefine INTERNAL_COMPILER_ENV "@INTERNAL_COMPILER_ENV@"
+#ifndef INTERNAL_COMPILER_ENV
+#define INTERNAL_COMPILER_ENV ""
+#endif
#cmakedefine INTERNAL_COMPILER_HEADERS "@INTERNAL_COMPILER_HEADERS@"
#cmakedefine INTERNAL_COMPILER_HEADERS_ROOT "@INTERNAL_COMPILER_HEADERS_ROOT@"
#cmakedefine01 INTERNAL_COMPILER_CUSTOM_ROOT
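
Why the `#ifndef` fallback is needed (a note assuming standard `configure_file` semantics): an empty `INTERNAL_COMPILER_ENV` is falsy to CMake, so the `#cmakedefine` line is emitted commented out, and the macro would be undefined exactly where Compiler.cpp string-concatenates it:

/* What configure_file emits for an empty INTERNAL_COMPILER_ENV: */
/* #undef INTERNAL_COMPILER_ENV */
#ifndef INTERNAL_COMPILER_ENV
#define INTERNAL_COMPILER_ENV ""  /* keeps "(" INTERNAL_COMPILER_ENV " " ... well-formed */
#endif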

View File

@ -12,18 +12,45 @@ int main(int, char **)
Logger::root().setChannel(channel);
Logger::root().setLevel("trace");
/// Check exception handling and catching
try
{
Compiler compiler(".", 1);
auto lib = compiler.getOrCount("xxx", 1, "", []() -> std::string
auto lib = compiler.getOrCount("catch_me_if_you_can", 0, "", []() -> std::string
{
return "void f() __attribute__((__visibility__(\"default\"))); void f() {}";
return
"#include <iostream>\n"
"void f() __attribute__((__visibility__(\"default\")));\n"
"void f()"
"{"
"try { throw std::runtime_error(\"Catch me if you can\"); }"
"catch (const std::runtime_error & e) { std::cout << \"Caught in .so: \" << e.what() << std::endl; throw; }\n"
"}"
;
}, [](SharedLibraryPtr&){});
+auto f = lib->template get<void (*)()>("_Z1fv");
+try
+{
+f();
+}
+catch (const std::exception & e)
+{
+std::cout << "Caught in main(): " << e.what() << "\n";
+return 0;
+}
+catch (...)
+{
+std::cout << "Unknown exception\n";
+return -1;
+}
}
-catch (const DB::Exception & e)
+catch (...)
{
-std::cerr << e.displayText() << std::endl;
+std::cerr << getCurrentExceptionMessage(true) << "\n";
return -1;
}
return 0;

View File

@ -36,4 +36,17 @@ void ASTQueryWithOutput::formatImpl(const FormatSettings & s, FormatState & stat
}
}
+bool ASTQueryWithOutput::resetOutputASTIfExist(IAST & ast)
+{
+if (auto ast_with_output = dynamic_cast<ASTQueryWithOutput *>(&ast))
+{
+ast_with_output->format.reset();
+ast_with_output->out_file.reset();
+return true;
+}
+return false;
+}
}
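
A usage sketch of the new helper (hypothetical query text; mirrors the call sites in executeDDLQueryOnCluster and InterpreterCreateQuery::execute above):

// ASTPtr query_ptr = /* parsed from */ "CREATE TABLE t ON CLUSTER c (x UInt8)"
//                                      " ENGINE = Log FORMAT TSV";
// ASTQueryWithOutput::resetOutputASTIfExist(*query_ptr);   // returns true
// query_ptr now formats without FORMAT / INTO OUTFILE, so the statement can
// be shipped verbatim to the other cluster nodes; the call returns false for
// ASTs that do not derive from ASTQueryWithOutput.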

View File

@ -19,6 +19,9 @@ public:
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const final;
+/// Remove 'FORMAT <fmt> and INTO OUTFILE <file>' if exists
+static bool resetOutputASTIfExist(IAST & ast);
protected:
/// NOTE: call this helper at the end of the clone() method of descendant class.
void cloneOutputOptions(ASTQueryWithOutput & cloned) const;

View File

@ -313,12 +313,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");
-bool try_listen = false;
+bool listen_try = config().getUInt("listen_try", false);
if (listen_hosts.empty())
{
listen_hosts.emplace_back("::1");
listen_hosts.emplace_back("127.0.0.1");
-try_listen = true;
+listen_try = true;
}
auto make_socket_address = [&](const std::string & host, UInt16 port)
@ -452,7 +452,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
catch (const Poco::Net::NetException & e)
{
-if (try_listen && (e.code() == POCO_EPROTONOSUPPORT || e.code() == POCO_EADDRNOTAVAIL))
+if (listen_try && (e.code() == POCO_EPROTONOSUPPORT || e.code() == POCO_EADDRNOTAVAIL))
LOG_ERROR(log, "Listen [" << listen_host << "]: " << e.what() << ": " << e.message()
<< " If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "

View File

@ -72,6 +72,8 @@
<listen_host>::1</listen_host>
<listen_host>127.0.0.1</listen_host>
-->
+<!-- Don't exit if IPv6 or IPv4 networking is unavailable while a listen_host with that protocol is specified -->
+<!-- <listen_try>0</listen_try> -->
<max_connections>4096</max_connections>
<keep_alive_timeout>3</keep_alive_timeout>

View File

@ -142,9 +142,9 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
/// Have to call resize(0) instead of cloneEmpty to preserve the structure.
/// (To keep offsets possibly shared between different arrays.)
-static_cast<ColumnArray &>(*column_array->assumeMutable()).getOffsets().resize(0);
+static_cast<ColumnArray &>(column_array->assumeMutableRef()).getOffsets().resize(0);
/// This is OK as long as multidimensional arrays are not stored in MergeTree.
-static_cast<ColumnArray &>(*column_array->assumeMutable()).getDataPtr() = column_array->getDataPtr()->cloneEmpty();
+static_cast<ColumnArray &>(column_array->assumeMutableRef()).getDataPtr() = column_array->getDataPtr()->cloneEmpty();
}
else
col.column = col.column->cloneEmpty();
@ -235,14 +235,14 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
{
/*
If this filter is PREWHERE 0, the MergeTree stream can be marked as done,
and this task can be cleared.
If we don't mark this task finished here, readImpl could
jump into an endless loop.
Error scenario:
select * from table where isNull(NOT_NULLABLE_COLUMN) AND OTHER PRED;
and the isNull pred is promoted to PREWHERE.
(though it is difficult to reproduce)
*/
task->current_range_reader.reset();
task->mark_ranges.clear();
res.clear();

View File

@ -85,6 +85,7 @@ MergeTreeData::MergeTreeData(
const ColumnDefaults & column_defaults_,
Context & context_,
const ASTPtr & primary_expr_ast_,
+const ASTPtr & secondary_sort_expr_ast_,
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
@ -100,6 +101,7 @@ MergeTreeData::MergeTreeData(
merging_params(merging_params_),
settings(settings_),
primary_expr_ast(primary_expr_ast_),
+secondary_sort_expr_ast(secondary_sort_expr_ast_),
partition_expr_ast(partition_expr_ast_),
require_part_metadata(require_part_metadata_),
database_name(database_), table_name(table_),
@ -194,19 +196,27 @@ void MergeTreeData::initPrimaryKey()
if (!primary_expr_ast)
return;
-/// Initialize description of sorting.
-sort_descr.clear();
-sort_descr.reserve(primary_expr_ast->children.size());
-for (const ASTPtr & ast : primary_expr_ast->children)
+auto addSortDescription = [](SortDescription & descr, const ASTPtr & expr_ast)
{
-String name = ast->getColumnName();
-sort_descr.emplace_back(name, 1, 1);
-}
+descr.reserve(descr.size() + expr_ast->children.size());
+for (const ASTPtr & ast : expr_ast->children)
+{
+String name = ast->getColumnName();
+descr.emplace_back(name, 1, 1);
+}
+};
+/// Initialize description of sorting for primary key.
+primary_sort_descr.clear();
+addSortDescription(primary_sort_descr, primary_expr_ast);
primary_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumnsList()).getActions(false);
-ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumnsList()).getActions(true);
-primary_key_sample = projected_expr->getSampleBlock();
+{
+ExpressionActionsPtr projected_expr =
+ExpressionAnalyzer(primary_expr_ast, context, nullptr, getColumnsList()).getActions(true);
+primary_key_sample = projected_expr->getSampleBlock();
+}
size_t primary_key_size = primary_key_sample.columns();
@ -219,6 +229,20 @@ void MergeTreeData::initPrimaryKey()
primary_key_data_types.resize(primary_key_size);
for (size_t i = 0; i < primary_key_size; ++i)
primary_key_data_types[i] = primary_key_sample.getByPosition(i).type;
+sort_descr = primary_sort_descr;
+if (secondary_sort_expr_ast)
+{
+addSortDescription(sort_descr, secondary_sort_expr_ast);
+secondary_sort_expr = ExpressionAnalyzer(secondary_sort_expr_ast, context, nullptr, getColumnsList()).getActions(false);
+ExpressionActionsPtr projected_expr =
+ExpressionAnalyzer(secondary_sort_expr_ast, context, nullptr, getColumnsList()).getActions(true);
+auto secondary_key_sample = projected_expr->getSampleBlock();
+for (size_t i = 0; i < secondary_key_sample.columns(); ++i)
+checkForAllowedKeyColumns(secondary_key_sample.getByPosition(i), "Secondary");
+}
}
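
How the two sort descriptions relate after this change, sketched for a hypothetical table:

// Hypothetical: primary key (CounterID, Date), secondary sort key (EventTime).
//   primary_sort_descr = { CounterID, Date }             // prefix; drives primary.idx
//   sort_descr         = { CounterID, Date, EventTime }  // full on-disk ordering
// Index loading and PK-based pruning keep using primary_sort_descr (see the
// MergeTreeDataPart and MergeTreeDataSelectExecutor hunks below), while writes
// and merges order rows by the full sort_descr.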
@ -272,11 +296,28 @@ void MergeTreeData::initPartitionKey()
void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const
{
+if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing)
+throw Exception("Sign column for MergeTree cannot be specified in modes except Collapsing or VersionedCollapsing.",
+ErrorCodes::LOGICAL_ERROR);
+if (!version_column.empty() && mode != MergingParams::Replacing && mode != MergingParams::VersionedCollapsing)
+throw Exception("Version column for MergeTree cannot be specified in modes except Replacing or VersionedCollapsing.",
+ErrorCodes::LOGICAL_ERROR);
+if (!columns_to_sum.empty() && mode != MergingParams::Summing)
+throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.",
+ErrorCodes::LOGICAL_ERROR);
/// Check that if the sign column is needed, it exists and is of type Int8.
-if (mode == MergingParams::Collapsing)
+auto check_sign_column = [this, & columns](bool is_optional, const std::string & storage)
{
if (sign_column.empty())
throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR);
{
if (is_optional)
return;
throw Exception("Logical error: Sign column for storage " + storage + " is empty", ErrorCodes::LOGICAL_ERROR);
}
bool miss_column = true;
for (const auto & column : columns)
@ -284,59 +325,69 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
if (column.name == sign_column)
{
if (!typeid_cast<const DataTypeInt8 *>(column.type.get()))
throw Exception("Sign column (" + sign_column + ")"
" for storage CollapsingMergeTree must have type Int8."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
throw Exception("Sign column (" + sign_column + ") for storage " + storage + " must have type Int8."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
miss_column = false;
break;
}
}
if (miss_column)
throw Exception("Sign column " + sign_column + " does not exist in table declaration.");
}
-else if (!sign_column.empty())
-throw Exception("Sign column for MergeTree cannot be specified in all modes except Collapsing.", ErrorCodes::LOGICAL_ERROR);
+};
-/// If colums_to_sum are set, then check that such columns exist.
-if (!columns_to_sum.empty())
+/// Check that if the version_column is needed, it exists and is of unsigned integer type.
+auto check_version_column = [this, & columns](bool is_optional, const std::string & storage)
{
-if (mode != MergingParams::Summing)
-throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.",
-ErrorCodes::LOGICAL_ERROR);
+if (version_column.empty())
+{
+if (is_optional)
+return;
-for (const auto & column_to_sum : columns_to_sum)
-if (columns.end() == std::find_if(columns.begin(), columns.end(),
-[&](const NameAndTypePair & name_and_type) { return column_to_sum == Nested::extractTableName(name_and_type.name); }))
-throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.");
-}
-/// Check that version_column column is set only for Replacing mode and is of unsigned integer type.
-if (!version_column.empty())
-{
-if (mode != MergingParams::Replacing)
-throw Exception("Version column for MergeTree cannot be specified in all modes except Replacing.",
-ErrorCodes::LOGICAL_ERROR);
+throw Exception("Logical error: Version column for storage " + storage + " is empty", ErrorCodes::LOGICAL_ERROR);
+}
bool miss_column = true;
for (const auto & column : columns)
{
if (column.name == version_column)
{
-if (!typeid_cast<const DataTypeUInt8 *>(column.type.get())
-&& !typeid_cast<const DataTypeUInt16 *>(column.type.get())
-&& !typeid_cast<const DataTypeUInt32 *>(column.type.get())
-&& !typeid_cast<const DataTypeUInt64 *>(column.type.get())
-&& !typeid_cast<const DataTypeDate *>(column.type.get())
-&& !typeid_cast<const DataTypeDateTime *>(column.type.get()))
+if (!column.type->isUnsignedInteger() && !column.type->isDateOrDateTime())
throw Exception("Version column (" + version_column + ")"
-" for storage ReplacingMergeTree must have type of UInt family or Date or DateTime."
-" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
+" for storage " + storage + " must have type of UInt family or Date or DateTime."
+" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
miss_column = false;
break;
}
}
if (miss_column)
throw Exception("Version column " + version_column + " does not exist in table declaration.");
+};
+if (mode == MergingParams::Collapsing)
+check_sign_column(false, "CollapsingMergeTree");
+if (mode == MergingParams::Summing)
+{
+/// If columns_to_sum are set, then check that such columns exist.
+for (const auto & column_to_sum : columns_to_sum)
+{
+auto check_column_to_sum_exists = [& column_to_sum](const NameAndTypePair & name_and_type)
+{
+return column_to_sum == Nested::extractTableName(name_and_type.name);
+};
+if (columns.end() == std::find_if(columns.begin(), columns.end(), check_column_to_sum_exists))
+throw Exception(
+"Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.");
+}
+}
+if (mode == MergingParams::Replacing)
+check_version_column(true, "ReplacingMergeTree");
+if (mode == MergingParams::VersionedCollapsing)
+{
+check_sign_column(false, "VersionedCollapsingMergeTree");
+check_version_column(false, "VersionedCollapsingMergeTree");
+}
/// TODO Checks for Graphite mode.
@ -354,6 +405,7 @@ String MergeTreeData::MergingParams::getModeName() const
case Unsorted: return "Unsorted";
case Replacing: return "Replacing";
case Graphite: return "Graphite";
+case VersionedCollapsing: return "VersionedCollapsing";
default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString<int>(mode), ErrorCodes::LOGICAL_ERROR);
@ -826,18 +878,25 @@ void MergeTreeData::checkAlter(const AlterCommands & commands)
columns_alter_forbidden.insert(col);
}
-if (primary_expr)
+auto processSortingColumns =
+[&columns_alter_forbidden, &columns_alter_metadata_only] (const ExpressionActionsPtr & expression)
{
-for (const ExpressionAction & action : primary_expr->getActions())
+for (const ExpressionAction & action : expression->getActions())
{
auto action_columns = action.getNeededColumns();
columns_alter_forbidden.insert(action_columns.begin(), action_columns.end());
}
-for (const String & col : primary_expr->getRequiredColumns())
+for (const String & col : expression->getRequiredColumns())
columns_alter_metadata_only.insert(col);
}
+};
+if (primary_expr)
+processSortingColumns(primary_expr);
+/// We don't process sampling_expression separately because it must be among the primary key columns.
+if (secondary_sort_expr)
+processSortingColumns(secondary_sort_expr);
if (!merging_params.sign_column.empty())
columns_alter_forbidden.insert(merging_params.sign_column);
@ -1084,6 +1143,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
size_t new_primary_key_file_size{};
MergeTreeDataPartChecksum::uint128 new_primary_key_hash{};
+/// TODO: Check the order of secondary sorting key columns.
if (new_primary_key.get() != primary_expr_ast.get())
{
ExpressionActionsPtr new_primary_expr = ExpressionAnalyzer(new_primary_key, context, nullptr, new_columns).getActions(true);
@ -2190,7 +2250,7 @@ bool MergeTreeData::isPrimaryKeyColumn(const ASTPtr &node) const
{
String column_name = node->getColumnName();
-for (const auto & column : sort_descr)
+for (const auto & column : primary_sort_descr)
if (column_name == column.column_name)
return true;

View File

@ -235,24 +235,25 @@ public:
/// Merging mode. See above.
enum Mode
{
-Ordinary = 0, /// Enum values are saved. Do not change them.
-Collapsing = 1,
-Summing = 2,
-Aggregating = 3,
-Unsorted = 4,
-Replacing = 5,
-Graphite = 6,
+Ordinary = 0, /// Enum values are saved. Do not change them.
+Collapsing = 1,
+Summing = 2,
+Aggregating = 3,
+Unsorted = 4,
+Replacing = 5,
+Graphite = 6,
+VersionedCollapsing = 7,
};
Mode mode;
-/// For collapsing mode.
+/// For Collapsing and VersionedCollapsing mode.
String sign_column;
/// For Summing mode. If empty - columns_to_sum is determined automatically.
Names columns_to_sum;
-/// For Replacing mode. Can be empty.
+/// For Replacing and VersionedCollapsing mode. Can be empty for Replacing.
String version_column;
/// For Graphite mode.
@ -280,6 +281,7 @@ public:
const ColumnDefaults & column_defaults_,
Context & context_,
const ASTPtr & primary_expr_ast_,
+const ASTPtr & secondary_sort_expr_ast_,
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
@ -300,7 +302,8 @@ public:
return merging_params.mode == MergingParams::Collapsing
|| merging_params.mode == MergingParams::Summing
|| merging_params.mode == MergingParams::Aggregating
-|| merging_params.mode == MergingParams::Replacing;
+|| merging_params.mode == MergingParams::Replacing
+|| merging_params.mode == MergingParams::VersionedCollapsing;
}
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const;
@ -440,6 +443,8 @@ public:
}
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
+ExpressionActionsPtr getSecondarySortExpression() const { return secondary_sort_expr; } /// may return nullptr
+SortDescription getPrimarySortDescription() const { return primary_sort_descr; }
SortDescription getSortDescription() const { return sort_descr; }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
@ -515,6 +520,7 @@ public:
const MergeTreeSettings settings;
ASTPtr primary_expr_ast;
+ASTPtr secondary_sort_expr_ast;
Block primary_key_sample;
DataTypes primary_key_data_types;
@ -544,6 +550,11 @@ private:
bool require_part_metadata;
ExpressionActionsPtr primary_expr;
+/// Additional expression for sorting (of rows with the same primary keys).
+ExpressionActionsPtr secondary_sort_expr;
+/// Sort description for primary key. Is the prefix of sort_descr.
+SortDescription primary_sort_descr;
+/// Sort description for primary key + secondary sorting columns.
SortDescription sort_descr;
String database_name;

View File

@ -13,6 +13,7 @@
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <DataStreams/AggregatingSortedBlockInputStream.h>
+#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
@ -322,14 +323,20 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(
/// PK columns are sorted and merged, ordinary columns are gathered using info from merge step
-static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_columns, ExpressionActionsPtr primary_key_expressions,
+static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_columns,
+const ExpressionActionsPtr & primary_key_expressions, const ExpressionActionsPtr & secondary_key_expressions,
const MergeTreeData::MergingParams & merging_params,
NamesAndTypesList & gathering_columns, Names & gathering_column_names,
NamesAndTypesList & merging_columns, Names & merging_column_names
)
{
-Names key_columns_dup = primary_key_expressions->getRequiredColumns();
-std::set<String> key_columns(key_columns_dup.cbegin(), key_columns_dup.cend());
+Names primary_key_columns_dup = primary_key_expressions->getRequiredColumns();
+std::set<String> key_columns(primary_key_columns_dup.cbegin(), primary_key_columns_dup.cend());
+if (secondary_key_expressions)
+{
+Names secondary_key_columns_dup = secondary_key_expressions->getRequiredColumns();
+key_columns.insert(secondary_key_columns_dup.begin(), secondary_key_columns_dup.end());
+}
/// Force sign column for Collapsing mode
if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
@ -339,6 +346,10 @@ static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_colu
if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
key_columns.emplace(merging_params.version_column);
+/// Force sign column for VersionedCollapsing mode. Version is already in primary key.
+if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
+key_columns.emplace(merging_params.sign_column);
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
for (auto & column : all_columns)
@ -530,8 +541,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
NamesAndTypesList gathering_columns, merging_columns;
Names gathering_column_names, merging_column_names;
-extractMergingAndGatheringColumns(all_columns, data.getPrimaryExpression(), data.merging_params,
-gathering_columns, gathering_column_names, merging_columns, merging_column_names);
+extractMergingAndGatheringColumns(all_columns, data.getPrimaryExpression(), data.getSecondarySortExpression()
+, data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, future_part.name, future_part.part_info);
@ -626,6 +637,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
data.merging_params.graphite_params, time_of_merge);
break;
+case MergeTreeData::MergingParams::VersionedCollapsing:
+merged_stream = std::make_unique<VersionedCollapsingSortedBlockInputStream>(
+src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, false, rows_sources_write_buf.get());
+break;
case MergeTreeData::MergingParams::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break;
@ -792,7 +808,8 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
bool is_supported_storage =
data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
data.merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
-data.merging_params.mode == MergeTreeData::MergingParams::Replacing;
+data.merging_params.mode == MergeTreeData::MergingParams::Replacing ||
+data.merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
bool enough_ordinary_cols = gathering_columns.size() >= data.context.getMergeTreeSettings().vertical_merge_algorithm_min_columns_to_activate;

View File

@ -644,7 +644,7 @@ void MergeTreeDataPart::loadIndex()
.getSize() / MERGE_TREE_MARK_SIZE;
}
-size_t key_size = storage.sort_descr.size();
+size_t key_size = storage.primary_sort_descr.size();
if (key_size)
{
@ -820,7 +820,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
if (!checksums.empty())
{
-if (!storage.sort_descr.empty() && !checksums.files.count("primary.idx"))
+if (!storage.primary_sort_descr.empty() && !checksums.files.count("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
if (require_part_metadata)
@ -870,7 +870,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
};
/// Check that the primary key index is not empty.
-if (!storage.sort_descr.empty())
+if (!storage.primary_sort_descr.empty())
check_file_not_empty(path + "primary.idx");
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)

View File

@ -38,6 +38,7 @@ namespace std
#include <DataStreams/SummingSortedBlockInputStream.h>
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <DataStreams/AggregatingSortedBlockInputStream.h>
+#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
@ -195,7 +196,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
processed_stage = QueryProcessingStage::FetchColumns;
const Settings & settings = context.getSettingsRef();
-SortDescription sort_descr = data.getSortDescription();
+SortDescription sort_descr = data.getPrimarySortDescription();
PKCondition key_condition(query_info, context, available_real_and_virtual_columns, sort_descr,
data.getPrimaryExpression());
@ -783,7 +784,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
BlockInputStreams res;
if (to_merge.size() == 1)
{
-if (!data.merging_params.sign_column.empty())
+if (data.merging_params.mode == MergeTreeData::MergingParams::Collapsing)
{
ExpressionActionsPtr sign_filter_expression;
String sign_filter_column;
@ -806,12 +807,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
break;
case MergeTreeData::MergingParams::Collapsing:
-merged = std::make_shared<CollapsingFinalBlockInputStream>(to_merge, data.getSortDescription(), data.merging_params.sign_column);
+merged = std::make_shared<CollapsingFinalBlockInputStream>(
+to_merge, data.getSortDescription(), data.merging_params.sign_column);
break;
case MergeTreeData::MergingParams::Summing:
merged = std::make_shared<SummingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
break;
case MergeTreeData::MergingParams::Aggregating:
@ -820,7 +822,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream
merged = std::make_shared<ReplacingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.version_column, max_block_size);
data.getSortDescription(), data.merging_params.version_column, max_block_size);
break;
case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column, max_block_size, true);
break;
case MergeTreeData::MergingParams::Unsorted:

View File

@ -174,7 +174,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// If some columns must be calculated for sorting, do it here.
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
{
data.getPrimaryExpression()->execute(block);
auto secondary_sort_expr = data.getSecondarySortExpression();
if (secondary_sort_expr)
secondary_sort_expr->execute(block);
}
SortDescription sort_descr = data.getSortDescription();
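A hedged illustration of what the secondary sort expression changes, assuming the VersionedCollapsing wiring in this commit (names are hypothetical): the version column is appended to the sort order used when writing parts, while the primary index keeps only the ORDER BY key.
CREATE TABLE example (d Date, v String, sign Int8, version UInt64)
    ENGINE = VersionedCollapsingMergeTree(sign, version) ORDER BY (d, v);
-- Parts should come out sorted by (d, v, version); primary.idx is built from (d, v) only.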

View File

@ -39,7 +39,7 @@ MergeTreeWhereOptimizer::MergeTreeWhereOptimizer(
const MergeTreeData & data,
const Names & column_names,
Logger * log)
: primary_key_columns{ext::map<std::unordered_set>(data.getSortDescription(),
: primary_key_columns{ext::map<std::unordered_set>(data.getPrimarySortDescription(),
[] (const SortColumnDescription & col) { return col.column_name; })},
table_columns{ext::map<std::unordered_set>(data.getColumnsList(),
[] (const NameAndTypePair & col) { return col.name; })},

View File

@ -371,7 +371,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
/// The set of offset columns that have already been written, so that shared offsets of nested structure columns are not written multiple times
OffsetColumns offset_columns;
auto sort_description = storage.getSortDescription();
auto sort_description = storage.getPrimarySortDescription();
/// Here we will add the columns related to the Primary Key, then write the index.
std::vector<ColumnWithTypeAndName> primary_columns(sort_description.size());

View File

@ -213,9 +213,9 @@ MergeTrees are different in two ways:
- they may be replicated or non-replicated;
- they may do different actions on merge: nothing; sign collapse; sum; apply aggregate functions.
So we have 12 combinations:
MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, GraphiteMergeTree
ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedGraphiteMergeTree
So we have 14 combinations:
MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, GraphiteMergeTree, VersionedCollapsingMergeTree
ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedGraphiteMergeTree, ReplicatedVersionedCollapsingMergeTree
In most cases, you need MergeTree or ReplicatedMergeTree.
@ -254,6 +254,8 @@ For the Summing mode, the optional last parameter is a list of columns to sum wh
If this parameter is omitted, the storage will sum all numeric columns except columns participating in the primary key.
For the Replacing mode, the optional last parameter is the name of a 'version' column. While merging, for all rows with the same primary key, only one row is selected: the last row, if the version column was not specified, or the last row with the maximum version value, if specified.
For the VersionedCollapsing mode, the last 2 parameters are the name of a sign column and the name of a 'version' column. The version column must be part of the primary key. While merging, a pair of rows with the same primary key and opposite sign values may collapse.
)";
if (is_extended_syntax)
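For illustration, a hedged sketch of the two accepted syntaxes for this mode (table and column names are hypothetical; the argument order follows the test cases later in this commit):
CREATE TABLE example (d Date, k String, sign Int8, version UInt8)
    ENGINE = VersionedCollapsingMergeTree(d, (k, version), 8192, sign, version);
CREATE TABLE example2 (d Date, k String, sign Int8, version UInt8)
    ENGINE = VersionedCollapsingMergeTree(sign, version)
    PARTITION BY toYYYYMM(d) ORDER BY (k, version);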
@ -368,6 +370,8 @@ static StoragePtr create(const StorageFactory::Arguments & args)
merging_params.mode = MergeTreeData::MergingParams::Replacing;
else if (name_part == "Graphite")
merging_params.mode = MergeTreeData::MergingParams::Graphite;
else if (name_part == "VersionedCollapsing")
merging_params.mode = MergeTreeData::MergingParams::VersionedCollapsing;
else if (!name_part.empty())
throw Exception(
"Unknown storage " + args.engine_name + getMergeTreeVerboseHelp(is_extended_storage_def),
@ -424,6 +428,12 @@ static StoragePtr create(const StorageFactory::Arguments & args)
case MergeTreeData::MergingParams::Graphite:
add_mandatory_param("'config_element_for_graphite_schema'");
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
{
add_mandatory_param("sign column");
add_mandatory_param("version");
break;
}
}
ASTs & engine_args = args.engine_args;
@ -482,6 +492,8 @@ static StoragePtr create(const StorageFactory::Arguments & args)
engine_args.erase(engine_args.begin(), engine_args.begin() + 2);
}
ASTPtr secondary_sorting_expr_list;
if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
{
if (auto ast = typeid_cast<const ASTIdentifier *>(engine_args.back().get()))
@ -536,6 +548,30 @@ static StoragePtr create(const StorageFactory::Arguments & args)
engine_args.pop_back();
setGraphitePatternsFromConfig(args.context, graphite_config_name, merging_params.graphite_params);
}
else if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
{
if (auto ast = typeid_cast<const ASTIdentifier *>(engine_args.back().get()))
{
merging_params.version_column = ast->name;
secondary_sorting_expr_list = std::make_shared<ASTExpressionList>();
secondary_sorting_expr_list->children.push_back(engine_args.back());
}
else
throw Exception(
"Version column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
engine_args.pop_back();
if (auto ast = typeid_cast<const ASTIdentifier *>(engine_args.back().get()))
merging_params.sign_column = ast->name;
else
throw Exception(
"Sign column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
engine_args.pop_back();
}
String date_column_name;
ASTPtr partition_expr_list;
@ -589,14 +625,14 @@ static StoragePtr create(const StorageFactory::Arguments & args)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, args.attach, args.data_path, args.database_name, args.table_name,
args.columns, args.materialized_columns, args.alias_columns, args.column_defaults,
args.context, primary_expr_list, date_column_name, partition_expr_list,
args.context, primary_expr_list, secondary_sorting_expr_list, date_column_name, partition_expr_list,
sampling_expression, merging_params, storage_settings,
args.has_force_restore_data_flag);
else
return StorageMergeTree::create(
args.data_path, args.database_name, args.table_name,
args.columns, args.materialized_columns, args.alias_columns, args.column_defaults, args.attach,
args.context, primary_expr_list, date_column_name, partition_expr_list,
args.context, primary_expr_list, secondary_sorting_expr_list, date_column_name, partition_expr_list,
sampling_expression, merging_params, storage_settings,
args.has_force_restore_data_flag);
}
@ -610,6 +646,7 @@ void registerStorageMergeTree(StorageFactory & factory)
factory.registerStorage("AggregatingMergeTree", create);
factory.registerStorage("SummingMergeTree", create);
factory.registerStorage("GraphiteMergeTree", create);
factory.registerStorage("VersionedCollapsingMergeTree", create);
factory.registerStorage("ReplicatedMergeTree", create);
factory.registerStorage("ReplicatedCollapsingMergeTree", create);
@ -617,6 +654,7 @@ void registerStorageMergeTree(StorageFactory & factory)
factory.registerStorage("ReplicatedAggregatingMergeTree", create);
factory.registerStorage("ReplicatedSummingMergeTree", create);
factory.registerStorage("ReplicatedGraphiteMergeTree", create);
factory.registerStorage("ReplicatedVersionedCollapsingMergeTree", create);
}
}

View File

@ -190,6 +190,12 @@ StoragePtr StorageMaterializedView::getTargetTable() const
return global_context.getTable(target_database_name, target_table_name);
}
bool StorageMaterializedView::checkTableCanBeDropped() const
{
/// Don't drop the target table if it was created manually via the 'TO inner_table' clause
return has_inner_table ? getTargetTable()->checkTableCanBeDropped() : true;
}
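A hedged SQL sketch of the two cases this check distinguishes (all names are hypothetical):
CREATE TABLE dst (x UInt32) ENGINE = MergeTree ORDER BY x;
CREATE MATERIALIZED VIEW mv TO dst AS SELECT x FROM src;
-- has_inner_table is false: dropping mv never drops dst, so the check passes unconditionally.
CREATE MATERIALIZED VIEW mv2 ENGINE = MergeTree ORDER BY x AS SELECT x FROM src;
-- has_inner_table is true: dropping mv2 also drops its inner table, so that table's own drop check is consulted.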
void registerStorageMaterializedView(StorageFactory & factory)
{

View File

@ -33,6 +33,7 @@ public:
void drop() override;
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
void shutdown() override;
bool checkTableCanBeDropped() const override;
BlockInputStreams read(
const Names & column_names,

View File

@ -43,6 +43,7 @@ StorageMergeTree::StorageMergeTree(
bool attach,
Context & context_,
const ASTPtr & primary_expr_ast_,
const ASTPtr & secondary_sorting_expr_list_,
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
@ -55,7 +56,7 @@ StorageMergeTree::StorageMergeTree(
data(database_name, table_name,
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name, partition_expr_ast_,
context_, primary_expr_ast_, secondary_sorting_expr_list_, date_column_name, partition_expr_ast_,
sampling_expression_, merging_params_,
settings_, false, attach),
reader(data), writer(data), merger(data, context.getBackgroundPool()),

View File

@ -140,6 +140,7 @@ protected:
bool attach,
Context & context_,
const ASTPtr & primary_expr_ast_,
const ASTPtr & secondary_sorting_expr_list_,
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.

View File

@ -173,6 +173,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const ColumnDefaults & column_defaults_,
Context & context_,
const ASTPtr & primary_expr_ast_,
const ASTPtr & secondary_sorting_expr_list_,
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
@ -187,7 +188,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
data(database_name, table_name,
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name, partition_expr_ast_,
context_, primary_expr_ast_, secondary_sorting_expr_list_, date_column_name, partition_expr_ast_,
sampling_expression_, merging_params_,
settings_, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); }),

View File

@ -449,6 +449,7 @@ protected:
const ColumnDefaults & column_defaults_,
Context & context_,
const ASTPtr & primary_expr_ast_,
const ASTPtr & secondary_sorting_expr_list_,
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,

View File

@ -185,18 +185,17 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
{
ASTs & args_func = typeid_cast<ASTFunction &>(*ast_function).children;
const char * err = "Table function 'remote' requires from 2 to 5 parameters: "
"addresses pattern, name of remote database, name of remote table, [username, [password]].";
if (args_func.size() != 1)
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() < 2 || args.size() > 5)
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const size_t max_args = is_cluster_function ? 3 : 5;
if (args.size() < 2 || args.size() > max_args)
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String description;
String cluster_name;
String cluster_description;
String remote_database;
String remote_table;
String username;
@ -216,7 +215,18 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
return safeGet<const String &>(lit->value);
};
description = getStringLiteral(*args[arg_num], "Hosts pattern");
if (is_cluster_function)
{
ASTPtr ast_name = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
cluster_name = static_cast<const ASTLiteral &>(*ast_name).value.safeGet<const String &>();
}
else
{
if (auto ast_cluster = typeid_cast<const ASTIdentifier *>(args[arg_num].get()))
cluster_name = ast_cluster->name;
else
cluster_description = getStringLiteral(*args[arg_num], "Hosts pattern");
}
++arg_num;
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
@ -233,29 +243,33 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
else
{
if (arg_num >= args.size())
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
remote_table = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>();
++arg_num;
}
if (arg_num < args.size())
/// Username and password parameters are prohibited in the cluster version of the function
if (!is_cluster_function)
{
username = getStringLiteral(*args[arg_num], "Username");
++arg_num;
}
else
username = "default";
if (arg_num < args.size())
{
username = getStringLiteral(*args[arg_num], "Username");
++arg_num;
}
else
username = "default";
if (arg_num < args.size())
{
password = getStringLiteral(*args[arg_num], "Password");
++arg_num;
if (arg_num < args.size())
{
password = getStringLiteral(*args[arg_num], "Password");
++arg_num;
}
}
if (arg_num < args.size())
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
/// ExpressionAnalyzer will be created in InterpreterSelectQuery and will encounter these `Identifier`s while processing the query.
/// We need to mark them as database or table names, because by default they are treated as column names.
@ -263,18 +277,27 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
if (ASTIdentifier * id = typeid_cast<ASTIdentifier *>(arg.get()))
id->kind = ASTIdentifier::Table;
size_t max_addresses = context.getSettingsRef().table_function_remote_max_addresses;
ClusterPtr cluster;
if (!cluster_name.empty())
{
/// Use an existing cluster from the main config
cluster = context.getCluster(cluster_name);
}
else
{
/// Create a new cluster from scratch
size_t max_addresses = context.getSettingsRef().table_function_remote_max_addresses;
std::vector<String> shards = parseDescription(cluster_description, 0, cluster_description.size(), ',', max_addresses);
std::vector<std::vector<String>> names;
std::vector<String> shards = parseDescription(description, 0, description.size(), ',', max_addresses);
std::vector<std::vector<String>> names;
for (size_t i = 0; i < shards.size(); ++i)
names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|', max_addresses));
for (size_t i = 0; i < shards.size(); ++i)
names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|', max_addresses));
if (names.empty())
throw Exception("Shard list is empty after parsing first argument", ErrorCodes::BAD_ARGUMENTS);
if (names.empty())
throw Exception("Shard list is empty after parsing first argument", ErrorCodes::BAD_ARGUMENTS);
auto cluster = std::make_shared<Cluster>(context.getSettings(), names, username, password, context.getTCPPort());
cluster = std::make_shared<Cluster>(context.getSettings(), names, username, password, context.getTCPPort());
}
auto res = StorageDistributed::createWithOwnCluster(
getName(),
@ -288,9 +311,23 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
}
TableFunctionRemote::TableFunctionRemote(const std::string & name_)
: name(name_)
{
is_cluster_function = name == "cluster";
std::stringstream ss;
ss << "Table function '" << name + "' requires from 2 to " << (is_cluster_function ? 3 : 5) << " parameters"
<< ": <addresses pattern or cluster name>, <name of remote database>, <name of remote table>"
<< (is_cluster_function ? "" : ", [username, [password]].");
help_message = ss.str();
}
void registerTableFunctionRemote(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionRemote>();
factory.registerFunction("remote", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote"); });
factory.registerFunction("cluster", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("cluster"); });
}
}
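A hedged usage sketch of the two registered functions (hosts and cluster name are placeholders; per the checks above, the cluster form takes at most 3 arguments and no credentials):
SELECT * FROM remote('host1,host2', default, hits, 'default', '');
-- ',' separates shards and '|' separates replicas in the address pattern.
SELECT * FROM cluster('test_shard_localhost', system, one);
-- reuses a cluster already defined in the server config instead of building one from scratch.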

View File

@ -11,13 +11,23 @@ namespace DB
* For example
* SELECT count() FROM remote('example01-01-1', merge, hits) - go to `example01-01-1`, in the merge database, the hits table.
* An expression that generates a set of shards and replicas can also be specified as the host name - see below.
* Also, there is a cluster version of the function: cluster('existing_cluster_name', 'db', 'table')
*/
class TableFunctionRemote : public ITableFunction
{
public:
static constexpr auto name = "remote";
explicit TableFunctionRemote(const std::string & name_ = "remote");
std::string getName() const override { return name; }
StoragePtr execute(const ASTPtr & ast_function, const Context & context) const override;
private:
std::string name;
bool is_cluster_function;
std::string help_message;
};
}

View File

@ -0,0 +1,22 @@
<test>
<name>complex_array_creation</name>
<type>once</type>
<stop_conditions>
<any_of>
<average_speed_not_changing_for_ms>10000</average_speed_not_changing_for_ms>
<total_time_ms>1000</total_time_ms>
</any_of>
</stop_conditions>
<metrics>
<max_rows_per_second />
</metrics>
<main_metric>
<max_rows_per_second />
</main_metric>
<query>SELECT count() FROM system.numbers WHERE NOT ignore([[number], [number]])</query>
<query>SELECT count() FROM system.numbers WHERE NOT ignore([[], [number]])</query>
</test>

View File

@ -1,23 +1,27 @@
DROP TABLE IF EXISTS test.merge_tree;
DROP TABLE IF EXISTS test.collapsing_merge_tree;
DROP TABLE IF EXISTS test.versioned_collapsing_merge_tree;
DROP TABLE IF EXISTS test.summing_merge_tree;
DROP TABLE IF EXISTS test.summing_merge_tree_with_list_of_columns_to_sum;
DROP TABLE IF EXISTS test.aggregating_merge_tree;
DROP TABLE IF EXISTS test.merge_tree_with_sampling;
DROP TABLE IF EXISTS test.collapsing_merge_tree_with_sampling;
DROP TABLE IF EXISTS test.versioned_collapsing_merge_tree_with_sampling;
DROP TABLE IF EXISTS test.summing_merge_tree_with_sampling;
DROP TABLE IF EXISTS test.summing_merge_tree_with_sampling_with_list_of_columns_to_sum;
DROP TABLE IF EXISTS test.aggregating_merge_tree_with_sampling;
DROP TABLE IF EXISTS test.replicated_merge_tree;
DROP TABLE IF EXISTS test.replicated_collapsing_merge_tree;
DROP TABLE IF EXISTS test.replicated_versioned_collapsing_merge_tree;
DROP TABLE IF EXISTS test.replicated_summing_merge_tree;
DROP TABLE IF EXISTS test.replicated_summing_merge_tree_with_list_of_columns_to_sum;
DROP TABLE IF EXISTS test.replicated_aggregating_merge_tree;
DROP TABLE IF EXISTS test.replicated_merge_tree_with_sampling;
DROP TABLE IF EXISTS test.replicated_collapsing_merge_tree_with_sampling;
DROP TABLE IF EXISTS test.replicated_versioned_collapsing_merge_tree_with_sampling;
DROP TABLE IF EXISTS test.replicated_summing_merge_tree_with_sampling;
DROP TABLE IF EXISTS test.replicated_summing_merge_tree_with_sampling_with_list_of_columns_to_sum;
DROP TABLE IF EXISTS test.replicated_aggregating_merge_tree_with_sampling;
@ -27,6 +31,8 @@ CREATE TABLE test.merge_tree
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = MergeTree(d, (a, b), 111);
CREATE TABLE test.collapsing_merge_tree
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = CollapsingMergeTree(d, (a, b), 111, y);
CREATE TABLE test.versioned_collapsing_merge_tree
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = VersionedCollapsingMergeTree(d, (a, b), 111, y, b);
CREATE TABLE test.summing_merge_tree
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = SummingMergeTree(d, (a, b), 111);
CREATE TABLE test.summing_merge_tree_with_list_of_columns_to_sum
@ -38,6 +44,8 @@ CREATE TABLE test.merge_tree_with_sampling
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = MergeTree(d, sipHash64(a) + b, (a, sipHash64(a) + b), 111);
CREATE TABLE test.collapsing_merge_tree_with_sampling
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = CollapsingMergeTree(d, sipHash64(a) + b, (a, sipHash64(a) + b), 111, y);
CREATE TABLE test.versioned_collapsing_merge_tree_with_sampling
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = VersionedCollapsingMergeTree(d, sipHash64(a) + b, (a, sipHash64(a) + b, b), 111, y, b);
CREATE TABLE test.summing_merge_tree_with_sampling
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = SummingMergeTree(d, sipHash64(a) + b, (a, sipHash64(a) + b), 111);
CREATE TABLE test.summing_merge_tree_with_sampling_with_list_of_columns_to_sum
@ -49,6 +57,8 @@ CREATE TABLE test.replicated_merge_tree
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01/replicated_merge_tree/', 'r1', d, (a, b), 111);
CREATE TABLE test.replicated_collapsing_merge_tree
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/test/01/replicated_collapsing_merge_tree/', 'r1', d, (a, b), 111, y);
CREATE TABLE test.replicated_versioned_collapsing_merge_tree
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = ReplicatedVersionedCollapsingMergeTree('/clickhouse/tables/test/01/replicated_versioned_collapsing_merge_tree/', 'r1', d, (a, b), 111, y, b);
CREATE TABLE test.replicated_summing_merge_tree
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/01/replicated_summing_merge_tree/', 'r1', d, (a, b), 111);
CREATE TABLE test.replicated_summing_merge_tree_with_list_of_columns_to_sum
@ -60,6 +70,8 @@ CREATE TABLE test.replicated_merge_tree_with_sampling
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01/replicated_merge_tree_with_sampling/', 'r1', d, sipHash64(a) + b, (a, sipHash64(a) + b), 111);
CREATE TABLE test.replicated_collapsing_merge_tree_with_sampling
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/test/01/replicated_collapsing_merge_tree_with_sampling/', 'r1', d, sipHash64(a) + b, (a, sipHash64(a) + b), 111, y);
CREATE TABLE test.replicated_versioned_collapsing_merge_tree_with_sampling
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = ReplicatedVersionedCollapsingMergeTree('/clickhouse/tables/test/01/replicated_versioned_collapsing_merge_tree_with_sampling/', 'r1', d, sipHash64(a) + b, (a, sipHash64(a) + b, b), 111, y, b);
CREATE TABLE test.replicated_summing_merge_tree_with_sampling
(d Date, a String, b UInt8, x String, y Int8, z UInt32) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/01/replicated_summing_merge_tree_with_sampling/', 'r1', d, sipHash64(a) + b, (a, sipHash64(a) + b), 111);
CREATE TABLE test.replicated_summing_merge_tree_with_sampling_with_list_of_columns_to_sum
@ -70,24 +82,28 @@ CREATE TABLE test.replicated_aggregating_merge_tree_with_sampling
INSERT INTO test.merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.collapsing_merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.versioned_collapsing_merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.summing_merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.summing_merge_tree_with_list_of_columns_to_sum VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.aggregating_merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.collapsing_merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.versioned_collapsing_merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.summing_merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.summing_merge_tree_with_sampling_with_list_of_columns_to_sum VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.aggregating_merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_collapsing_merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_versioned_collapsing_merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_summing_merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_summing_merge_tree_with_list_of_columns_to_sum VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_aggregating_merge_tree VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_collapsing_merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_versioned_collapsing_merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_summing_merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_summing_merge_tree_with_sampling_with_list_of_columns_to_sum VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
INSERT INTO test.replicated_aggregating_merge_tree_with_sampling VALUES ('2000-01-01', 'Hello, world!', 123, 'xxx yyy', -123, 123456789);
@ -95,24 +111,28 @@ INSERT INTO test.replicated_aggregating_merge_tree_with_sampling VALUES ('2000-0
DROP TABLE test.merge_tree;
DROP TABLE test.collapsing_merge_tree;
DROP TABLE test.versioned_collapsing_merge_tree;
DROP TABLE test.summing_merge_tree;
DROP TABLE test.summing_merge_tree_with_list_of_columns_to_sum;
DROP TABLE test.aggregating_merge_tree;
DROP TABLE test.merge_tree_with_sampling;
DROP TABLE test.collapsing_merge_tree_with_sampling;
DROP TABLE test.versioned_collapsing_merge_tree_with_sampling;
DROP TABLE test.summing_merge_tree_with_sampling;
DROP TABLE test.summing_merge_tree_with_sampling_with_list_of_columns_to_sum;
DROP TABLE test.aggregating_merge_tree_with_sampling;
DROP TABLE test.replicated_merge_tree;
DROP TABLE test.replicated_collapsing_merge_tree;
DROP TABLE test.replicated_versioned_collapsing_merge_tree;
DROP TABLE test.replicated_summing_merge_tree;
DROP TABLE test.replicated_summing_merge_tree_with_list_of_columns_to_sum;
DROP TABLE test.replicated_aggregating_merge_tree;
DROP TABLE test.replicated_merge_tree_with_sampling;
DROP TABLE test.replicated_collapsing_merge_tree_with_sampling;
DROP TABLE test.replicated_versioned_collapsing_merge_tree_with_sampling;
DROP TABLE test.replicated_summing_merge_tree_with_sampling;
DROP TABLE test.replicated_summing_merge_tree_with_sampling_with_list_of_columns_to_sum;
DROP TABLE test.replicated_aggregating_merge_tree_with_sampling;

View File

@ -4,6 +4,10 @@
2017-10-23 1 c
*** Replicated Collapsing ***
2017-10-23 2 1
*** Replicated VersionedCollapsing ***
2017-10-23 2 1 0
2017-10-23 1 -1 1
2017-10-23 2 1 2
*** Table definition with SETTINGS ***
0
0

View File

@ -45,6 +45,24 @@ SELECT * FROM test.replicated_collapsing;
DROP TABLE test.replicated_collapsing;
SELECT '*** Replicated VersionedCollapsing ***';
DROP TABLE IF EXISTS test.replicated_versioned_collapsing;
CREATE TABLE test.replicated_versioned_collapsing(d Date, x UInt32, sign Int8, version UInt8)
ENGINE = ReplicatedVersionedCollapsingMergeTree('/clickhouse/tables/test/replicated_versioned_collapsing', 'r1', sign, version)
PARTITION BY toYYYYMM(d) ORDER BY (d, version);
INSERT INTO test.replicated_versioned_collapsing VALUES ('2017-10-23', 1, 1, 0);
INSERT INTO test.replicated_versioned_collapsing VALUES ('2017-10-23', 1, -1, 0), ('2017-10-23', 2, 1, 0);
INSERT INTO test.replicated_versioned_collapsing VALUES ('2017-10-23', 1, -1, 1), ('2017-10-23', 2, 1, 2);
OPTIMIZE TABLE test.replicated_versioned_collapsing PARTITION 201710 FINAL;
SELECT * FROM test.replicated_versioned_collapsing;
DROP TABLE test.replicated_versioned_collapsing;
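To trace the expected results for this test (a reading of the merge logic, not an authoritative spec): the sort key is (d, version), so rows sharing the same (d, version) and carrying opposite signs collapse in pairs. For version 0 the inserted rows are (x=1, sign=1), (x=1, sign=-1) and (x=2, sign=1); one +1/-1 pair collapses and (2, 1, 0) survives. Versions 1 and 2 each hold a single unmatched row, so (1, -1, 1) and (2, 1, 2) remain, which matches the three rows in the reference output above.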
SELECT '*** Table definition with SETTINGS ***';
DROP TABLE IF EXISTS test.with_settings;

View File

@ -6,3 +6,6 @@
0
0
0
0
0
0

View File

@ -20,3 +20,7 @@ fi
$CLICKHOUSE_CLIENT -q "SELECT * FROM remote('${CLICKHOUSE_HOST}', system, one);"
$CLICKHOUSE_CLIENT -q "SELECT * FROM remote('${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_TCP}', system, one);"
$CLICKHOUSE_CLIENT -q "SELECT * FROM remote(test_shard_localhost, system, one);"
$CLICKHOUSE_CLIENT -q "SELECT * FROM remote(test_shard_localhost, system, one, 'default', '');"
$CLICKHOUSE_CLIENT -q "SELECT * FROM cluster('test_shard_localhost', system, one);"

View File

@ -0,0 +1,533 @@
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_8 0 -1
2018-01-31 str_9 0 1
#########################
table with 2 blocks final
2018-01-31 str_0 0 -1
2018-01-31 str_0 0 -1
2018-01-31 str_1 0 1
2018-01-31 str_1 0 1
2018-01-31 str_2 0 -1
2018-01-31 str_2 0 -1
2018-01-31 str_3 0 1
2018-01-31 str_3 0 1
2018-01-31 str_4 0 -1
2018-01-31 str_4 0 -1
2018-01-31 str_5 0 1
2018-01-31 str_5 0 1
2018-01-31 str_6 0 -1
2018-01-31 str_6 0 -1
2018-01-31 str_7 0 1
2018-01-31 str_7 0 1
2018-01-31 str_8 0 -1
2018-01-31 str_8 0 -1
2018-01-31 str_9 0 1
2018-01-31 str_9 0 1
table with 2 blocks optimized
2018-01-31 str_0 0 -1
2018-01-31 str_0 0 -1
2018-01-31 str_1 0 1
2018-01-31 str_1 0 1
2018-01-31 str_2 0 -1
2018-01-31 str_2 0 -1
2018-01-31 str_3 0 1
2018-01-31 str_3 0 1
2018-01-31 str_4 0 -1
2018-01-31 str_4 0 -1
2018-01-31 str_5 0 1
2018-01-31 str_5 0 1
2018-01-31 str_6 0 -1
2018-01-31 str_6 0 -1
2018-01-31 str_7 0 1
2018-01-31 str_7 0 1
2018-01-31 str_8 0 -1
2018-01-31 str_8 0 -1
2018-01-31 str_9 0 1
2018-01-31 str_9 0 1
#########################
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_9 0 1
2018-01-31 str_9 0 -1
#########################
table with 2 blocks final
2018-01-31 str_0 0 -1
2018-01-31 str_0 1 1
2018-01-31 str_1 0 1
2018-01-31 str_1 1 -1
2018-01-31 str_2 0 -1
2018-01-31 str_2 1 1
2018-01-31 str_3 0 1
2018-01-31 str_3 1 -1
2018-01-31 str_4 0 -1
2018-01-31 str_4 1 1
2018-01-31 str_5 0 1
2018-01-31 str_5 1 -1
2018-01-31 str_6 0 -1
2018-01-31 str_6 1 1
2018-01-31 str_7 0 1
2018-01-31 str_7 1 -1
2018-01-31 str_8 0 -1
2018-01-31 str_8 1 1
2018-01-31 str_9 0 1
2018-01-31 str_9 1 -1
table with 2 blocks optimized
2018-01-31 str_0 0 -1
2018-01-31 str_0 1 1
2018-01-31 str_1 0 1
2018-01-31 str_1 1 -1
2018-01-31 str_2 0 -1
2018-01-31 str_2 1 1
2018-01-31 str_3 0 1
2018-01-31 str_3 1 -1
2018-01-31 str_4 0 -1
2018-01-31 str_4 1 1
2018-01-31 str_5 0 1
2018-01-31 str_5 1 -1
2018-01-31 str_6 0 -1
2018-01-31 str_6 1 1
2018-01-31 str_7 0 1
2018-01-31 str_7 1 -1
2018-01-31 str_8 0 -1
2018-01-31 str_8 1 1
2018-01-31 str_9 0 1
2018-01-31 str_9 1 -1
#########################
table with 4 blocks final
table with 4 blocks optimized
2018-01-31 str_9 0 1
2018-01-31 str_9 0 -1
#########################
table with 5 blocks final
2018-01-31 str_0 1 -1
2018-01-31 str_1 1 -1
2018-01-31 str_2 1 -1
2018-01-31 str_3 1 -1
2018-01-31 str_4 1 -1
2018-01-31 str_5 1 -1
2018-01-31 str_6 1 -1
2018-01-31 str_7 1 -1
2018-01-31 str_8 1 -1
2018-01-31 str_9 1 -1
table with 5 blocks optimized
2018-01-31 str_0 1 -1
2018-01-31 str_1 1 -1
2018-01-31 str_2 1 -1
2018-01-31 str_3 1 -1
2018-01-31 str_4 1 -1
2018-01-31 str_5 1 -1
2018-01-31 str_6 1 -1
2018-01-31 str_7 1 -1
2018-01-31 str_8 1 -1
2018-01-31 str_9 1 -1
#########################
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_999999 0 1
2018-01-31 str_999999 0 -1
#########################
table with 2 blocks final
2018-01-31 0 0 1
2018-01-31 1 0 1
2018-01-31 2 0 1
2018-01-31 3 0 1
2018-01-31 4 0 1
2018-01-31 5 0 1
2018-01-31 6 0 1
2018-01-31 7 0 1
2018-01-31 8 0 1
2018-01-31 9 0 1
2018-01-31 10 0 1
2018-01-31 11 0 1
2018-01-31 12 0 1
2018-01-31 13 0 1
2018-01-31 14 0 1
2018-01-31 15 0 1
2018-01-31 16 0 1
2018-01-31 17 0 1
2018-01-31 18 0 1
2018-01-31 19 0 1
2018-01-31 20 0 1
2018-01-31 21 0 1
2018-01-31 22 0 1
2018-01-31 23 0 1
2018-01-31 24 0 1
2018-01-31 25 0 1
2018-01-31 26 0 1
2018-01-31 27 0 1
2018-01-31 28 0 1
2018-01-31 29 0 1
2018-01-31 30 0 1
2018-01-31 31 0 1
2018-01-31 32 0 1
2018-01-31 95 0 -1
2018-01-31 96 0 -1
2018-01-31 97 0 -1
2018-01-31 98 0 -1
2018-01-31 99 0 -1
2018-01-31 100 0 -1
2018-01-31 101 0 -1
2018-01-31 102 0 -1
2018-01-31 103 0 -1
2018-01-31 104 0 -1
2018-01-31 105 0 -1
2018-01-31 106 0 -1
2018-01-31 107 0 -1
2018-01-31 108 0 -1
2018-01-31 109 0 -1
2018-01-31 110 0 -1
2018-01-31 111 0 -1
2018-01-31 112 0 -1
2018-01-31 113 0 -1
2018-01-31 114 0 -1
2018-01-31 115 0 -1
2018-01-31 116 0 -1
2018-01-31 117 0 -1
2018-01-31 118 0 -1
2018-01-31 119 0 -1
2018-01-31 120 0 -1
2018-01-31 121 0 -1
2018-01-31 122 0 -1
2018-01-31 123 0 -1
2018-01-31 124 0 -1
2018-01-31 125 0 -1
2018-01-31 126 0 -1
2018-01-31 127 0 -1
2018-01-31 0 0 -1
2018-01-31 1 0 -1
2018-01-31 2 0 -1
2018-01-31 3 0 -1
2018-01-31 4 0 -1
2018-01-31 5 0 -1
2018-01-31 6 0 -1
2018-01-31 7 0 -1
2018-01-31 8 0 -1
2018-01-31 9 0 -1
2018-01-31 10 0 -1
2018-01-31 11 0 -1
2018-01-31 12 0 -1
2018-01-31 13 0 -1
2018-01-31 14 0 -1
2018-01-31 15 0 -1
2018-01-31 16 0 -1
2018-01-31 17 0 -1
2018-01-31 18 0 -1
2018-01-31 19 0 -1
2018-01-31 20 0 -1
2018-01-31 21 0 -1
2018-01-31 22 0 -1
2018-01-31 23 0 -1
2018-01-31 24 0 -1
2018-01-31 25 0 -1
2018-01-31 26 0 -1
2018-01-31 27 0 -1
2018-01-31 28 0 -1
2018-01-31 29 0 -1
2018-01-31 30 0 -1
2018-01-31 31 0 -1
2018-01-31 32 0 -1
2018-01-31 95 0 1
2018-01-31 96 0 1
2018-01-31 97 0 1
2018-01-31 98 0 1
2018-01-31 99 0 1
2018-01-31 100 0 1
2018-01-31 101 0 1
2018-01-31 102 0 1
2018-01-31 103 0 1
2018-01-31 104 0 1
2018-01-31 105 0 1
2018-01-31 106 0 1
2018-01-31 107 0 1
2018-01-31 108 0 1
2018-01-31 109 0 1
2018-01-31 110 0 1
2018-01-31 111 0 1
2018-01-31 112 0 1
2018-01-31 113 0 1
2018-01-31 114 0 1
2018-01-31 115 0 1
2018-01-31 116 0 1
2018-01-31 117 0 1
2018-01-31 118 0 1
2018-01-31 119 0 1
2018-01-31 120 0 1
2018-01-31 121 0 1
2018-01-31 122 0 1
2018-01-31 123 0 1
2018-01-31 124 0 1
2018-01-31 125 0 1
2018-01-31 126 0 1
2018-01-31 127 0 1
table with 2 blocks optimized
2018-01-31 0 0 -1
2018-01-31 127 0 1
#########################
Vertical merge
#########################
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_8 0 -1
2018-01-31 str_9 0 1
#########################
table with 2 blocks final
2018-01-31 str_0 0 -1
2018-01-31 str_0 0 -1
2018-01-31 str_1 0 1
2018-01-31 str_1 0 1
2018-01-31 str_2 0 -1
2018-01-31 str_2 0 -1
2018-01-31 str_3 0 1
2018-01-31 str_3 0 1
2018-01-31 str_4 0 -1
2018-01-31 str_4 0 -1
2018-01-31 str_5 0 1
2018-01-31 str_5 0 1
2018-01-31 str_6 0 -1
2018-01-31 str_6 0 -1
2018-01-31 str_7 0 1
2018-01-31 str_7 0 1
2018-01-31 str_8 0 -1
2018-01-31 str_8 0 -1
2018-01-31 str_9 0 1
2018-01-31 str_9 0 1
table with 2 blocks optimized
2018-01-31 str_0 0 -1
2018-01-31 str_0 0 -1
2018-01-31 str_1 0 1
2018-01-31 str_1 0 1
2018-01-31 str_2 0 -1
2018-01-31 str_2 0 -1
2018-01-31 str_3 0 1
2018-01-31 str_3 0 1
2018-01-31 str_4 0 -1
2018-01-31 str_4 0 -1
2018-01-31 str_5 0 1
2018-01-31 str_5 0 1
2018-01-31 str_6 0 -1
2018-01-31 str_6 0 -1
2018-01-31 str_7 0 1
2018-01-31 str_7 0 1
2018-01-31 str_8 0 -1
2018-01-31 str_8 0 -1
2018-01-31 str_9 0 1
2018-01-31 str_9 0 1
#########################
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_9 0 1
2018-01-31 str_9 0 -1
#########################
table with 2 blocks final
2018-01-31 str_0 0 -1
2018-01-31 str_0 1 1
2018-01-31 str_1 0 1
2018-01-31 str_1 1 -1
2018-01-31 str_2 0 -1
2018-01-31 str_2 1 1
2018-01-31 str_3 0 1
2018-01-31 str_3 1 -1
2018-01-31 str_4 0 -1
2018-01-31 str_4 1 1
2018-01-31 str_5 0 1
2018-01-31 str_5 1 -1
2018-01-31 str_6 0 -1
2018-01-31 str_6 1 1
2018-01-31 str_7 0 1
2018-01-31 str_7 1 -1
2018-01-31 str_8 0 -1
2018-01-31 str_8 1 1
2018-01-31 str_9 0 1
2018-01-31 str_9 1 -1
table with 2 blocks optimized
2018-01-31 str_0 0 -1
2018-01-31 str_0 1 1
2018-01-31 str_1 0 1
2018-01-31 str_1 1 -1
2018-01-31 str_2 0 -1
2018-01-31 str_2 1 1
2018-01-31 str_3 0 1
2018-01-31 str_3 1 -1
2018-01-31 str_4 0 -1
2018-01-31 str_4 1 1
2018-01-31 str_5 0 1
2018-01-31 str_5 1 -1
2018-01-31 str_6 0 -1
2018-01-31 str_6 1 1
2018-01-31 str_7 0 1
2018-01-31 str_7 1 -1
2018-01-31 str_8 0 -1
2018-01-31 str_8 1 1
2018-01-31 str_9 0 1
2018-01-31 str_9 1 -1
#########################
table with 4 blocks final
table with 4 blocks optimized
2018-01-31 str_9 0 1
2018-01-31 str_9 0 -1
#########################
table with 5 blocks final
2018-01-31 str_0 1 -1
2018-01-31 str_1 1 -1
2018-01-31 str_2 1 -1
2018-01-31 str_3 1 -1
2018-01-31 str_4 1 -1
2018-01-31 str_5 1 -1
2018-01-31 str_6 1 -1
2018-01-31 str_7 1 -1
2018-01-31 str_8 1 -1
2018-01-31 str_9 1 -1
table with 5 blocks optimized
2018-01-31 str_0 1 -1
2018-01-31 str_1 1 -1
2018-01-31 str_2 1 -1
2018-01-31 str_3 1 -1
2018-01-31 str_4 1 -1
2018-01-31 str_5 1 -1
2018-01-31 str_6 1 -1
2018-01-31 str_7 1 -1
2018-01-31 str_8 1 -1
2018-01-31 str_9 1 -1
#########################
table with 2 blocks final
table with 2 blocks optimized
2018-01-31 str_999999 0 1
2018-01-31 str_999999 0 -1
#########################
table with 2 blocks final
2018-01-31 0 0 1
2018-01-31 1 0 1
2018-01-31 2 0 1
2018-01-31 3 0 1
2018-01-31 4 0 1
2018-01-31 5 0 1
2018-01-31 6 0 1
2018-01-31 7 0 1
2018-01-31 8 0 1
2018-01-31 9 0 1
2018-01-31 10 0 1
2018-01-31 11 0 1
2018-01-31 12 0 1
2018-01-31 13 0 1
2018-01-31 14 0 1
2018-01-31 15 0 1
2018-01-31 16 0 1
2018-01-31 17 0 1
2018-01-31 18 0 1
2018-01-31 19 0 1
2018-01-31 20 0 1
2018-01-31 21 0 1
2018-01-31 22 0 1
2018-01-31 23 0 1
2018-01-31 24 0 1
2018-01-31 25 0 1
2018-01-31 26 0 1
2018-01-31 27 0 1
2018-01-31 28 0 1
2018-01-31 29 0 1
2018-01-31 30 0 1
2018-01-31 31 0 1
2018-01-31 32 0 1
2018-01-31 95 0 -1
2018-01-31 96 0 -1
2018-01-31 97 0 -1
2018-01-31 98 0 -1
2018-01-31 99 0 -1
2018-01-31 100 0 -1
2018-01-31 101 0 -1
2018-01-31 102 0 -1
2018-01-31 103 0 -1
2018-01-31 104 0 -1
2018-01-31 105 0 -1
2018-01-31 106 0 -1
2018-01-31 107 0 -1
2018-01-31 108 0 -1
2018-01-31 109 0 -1
2018-01-31 110 0 -1
2018-01-31 111 0 -1
2018-01-31 112 0 -1
2018-01-31 113 0 -1
2018-01-31 114 0 -1
2018-01-31 115 0 -1
2018-01-31 116 0 -1
2018-01-31 117 0 -1
2018-01-31 118 0 -1
2018-01-31 119 0 -1
2018-01-31 120 0 -1
2018-01-31 121 0 -1
2018-01-31 122 0 -1
2018-01-31 123 0 -1
2018-01-31 124 0 -1
2018-01-31 125 0 -1
2018-01-31 126 0 -1
2018-01-31 127 0 -1
2018-01-31 0 0 -1
2018-01-31 1 0 -1
2018-01-31 2 0 -1
2018-01-31 3 0 -1
2018-01-31 4 0 -1
2018-01-31 5 0 -1
2018-01-31 6 0 -1
2018-01-31 7 0 -1
2018-01-31 8 0 -1
2018-01-31 9 0 -1
2018-01-31 10 0 -1
2018-01-31 11 0 -1
2018-01-31 12 0 -1
2018-01-31 13 0 -1
2018-01-31 14 0 -1
2018-01-31 15 0 -1
2018-01-31 16 0 -1
2018-01-31 17 0 -1
2018-01-31 18 0 -1
2018-01-31 19 0 -1
2018-01-31 20 0 -1
2018-01-31 21 0 -1
2018-01-31 22 0 -1
2018-01-31 23 0 -1
2018-01-31 24 0 -1
2018-01-31 25 0 -1
2018-01-31 26 0 -1
2018-01-31 27 0 -1
2018-01-31 28 0 -1
2018-01-31 29 0 -1
2018-01-31 30 0 -1
2018-01-31 31 0 -1
2018-01-31 32 0 -1
2018-01-31 95 0 1
2018-01-31 96 0 1
2018-01-31 97 0 1
2018-01-31 98 0 1
2018-01-31 99 0 1
2018-01-31 100 0 1
2018-01-31 101 0 1
2018-01-31 102 0 1
2018-01-31 103 0 1
2018-01-31 104 0 1
2018-01-31 105 0 1
2018-01-31 106 0 1
2018-01-31 107 0 1
2018-01-31 108 0 1
2018-01-31 109 0 1
2018-01-31 110 0 1
2018-01-31 111 0 1
2018-01-31 112 0 1
2018-01-31 113 0 1
2018-01-31 114 0 1
2018-01-31 115 0 1
2018-01-31 116 0 1
2018-01-31 117 0 1
2018-01-31 118 0 1
2018-01-31 119 0 1
2018-01-31 120 0 1
2018-01-31 121 0 1
2018-01-31 122 0 1
2018-01-31 123 0 1
2018-01-31 124 0 1
2018-01-31 125 0 1
2018-01-31 126 0 1
2018-01-31 127 0 1
table with 2 blocks optimized
2018-01-31 0 0 -1
2018-01-31 127 0 1

View File

@ -0,0 +1,203 @@
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version);
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date, value), 8192, sign, version);
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date, value), 8192, sign, version);
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date, value), 8192, sign, version);
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date, value), 8192, sign, version);
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 4 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 4 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date, value), 8192, sign, version);
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 0, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10;
select 'table with 5 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 5 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date, value), 8192, sign, version);
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version);
insert into test.mult_tab select '2018-01-31', number, 0, if(number < 64, 1, -1) from system.numbers limit 128;
insert into test.mult_tab select '2018-01-31', number, 0, if(number < 64, -1, 1) from system.numbers limit 128;
select 'table with 2 blocks final';
select * from test.mult_tab final settings max_block_size=33;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
select 'Vertical merge';
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date, value) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date, value) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date, value) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date, value) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 4 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 4 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date, value) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 0, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10;
select 'table with 5 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 5 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date, value) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000;
insert into test.mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000;
select 'table with 2 blocks final';
select * from test.mult_tab final;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;
select '#########################';
drop table if exists test.mult_tab;
create table test.mult_tab (date Date, value UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into test.mult_tab select '2018-01-31', number, 0, if(number < 64, 1, -1) from system.numbers limit 128;
insert into test.mult_tab select '2018-01-31', number, 0, if(number < 64, -1, 1) from system.numbers limit 128;
select 'table with 2 blocks final';
select * from test.mult_tab final settings max_block_size=33;
optimize table test.mult_tab;
select 'table with 2 blocks optimized';
select * from test.mult_tab;

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
if $CLICKHOUSE_CLIENT -q "select length(groupArray(number)) FROM (SELECT * FROM system.numbers LIMIT 1000000)" --compile=1 --min_count_to_compile=0 --max_threads=1 --max_memory_usage=8000000 2>/dev/null ; then
echo 'There is no expected exception "Memory limit (for query) exceeded: would use..."'
fi
$CLICKHOUSE_CLIENT -q "SELECT 1"

View File

@ -0,0 +1,3 @@
Array(Nothing), Const(size = 1, Array(size = 1, UInt64(size = 1), Nothing(size = 0)))
Array(Array(Array(Nothing))), Const(size = 1, Array(size = 1, UInt64(size = 1), Array(size = 1, UInt64(size = 1), Array(size = 1, UInt64(size = 1), Nothing(size = 0)))))
Array(Array(UInt8)), Const(size = 1, Array(size = 1, UInt64(size = 1), Array(size = 2, UInt64(size = 2), UInt8(size = 1))))

View File

@ -0,0 +1,3 @@
SELECT dumpColumnStructure([]);
SELECT dumpColumnStructure([[[]]]);
SELECT DISTINCT dumpColumnStructure([[], [1]]) FROM numbers(2);

View File

@ -8,6 +8,9 @@
<size>10M</size>
<count>1</count>
</logger>
<listen_host>0.0.0.0</listen_host>
<listen_host>::</listen_host>
<listen_try>1</listen_try>
<http_port>58123</http_port>
<tcp_port>59000</tcp_port>
<interserver_http_port>59009</interserver_http_port>

View File

@ -8,4 +8,4 @@ image:
docker build -t yandex/clickhouse-builder .
image_push:
docker push yandex/clickhouse-builder
docker push yandex/clickhouse-builder

View File

@ -1,4 +1,6 @@
<yandex>
<!-- Listen on wildcard addresses to allow accepting connections from other containers and the host network. -->
<listen_host>0.0.0.0</listen_host>
<listen_host>::</listen_host>
<listen_try>1</listen_try>
</yandex>