Merge branch 'master' into alias_column_partition_prune

sundy-li 2020-12-16 18:44:00 +08:00
commit 5c9f84b2f4
68 changed files with 501 additions and 237 deletions

contrib/libunwind vendored

@ -1 +1 @@
Subproject commit 51b84d9b6d2548f1cbdcafe622d5a753853b6149
Subproject commit 8fe25d7dc70f2a4ea38c3e5a33fa9d4199b67a5a

View File

@ -58,8 +58,7 @@
"docker/test/stateless": {
"name": "yandex/clickhouse-stateless-test",
"dependent": [
"docker/test/stateful",
"docker/test/stateful_with_coverage"
"docker/test/stateful"
]
},
"docker/test/stateless_pytest": {
@ -68,7 +67,9 @@
},
"docker/test/stateless_with_coverage": {
"name": "yandex/clickhouse-stateless-test-with-coverage",
"dependent": []
"dependent": [
"docker/test/stateful_with_coverage"
]
},
"docker/test/unit": {
"name": "yandex/clickhouse-unit-test",

View File

@ -1,5 +1,5 @@
# docker build -t yandex/clickhouse-test-base .
FROM ubuntu:19.10
FROM ubuntu:20.04
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=11

View File

@ -1,8 +1,6 @@
# docker build -t yandex/clickhouse-stateful-test-with-coverage .
FROM yandex/clickhouse-stateless-test-with-coverage
RUN echo "deb [trusted=yes] http://apt.llvm.org/bionic/ llvm-toolchain-bionic-9 main" >> /etc/apt/sources.list
RUN apt-get update -y \
&& env DEBIAN_FRONTEND=noninteractive \
apt-get install --yes --no-install-recommends \

View File

@ -25,6 +25,6 @@ Under the same conditions, ClickHouse can handle several hundred queries per sec
## Performance When Inserting Data {#performance-when-inserting-data}
We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second. When inserting to a MergeTree table from a tab-separated dump, the insertion speed can be from 50 to 200 MB/s. If the inserted rows are around 1 Kb in size, the speed will be from 50,000 to 200,000 rows per second. If the rows are small, the performance can be higher in rows per second (on Banner System data -`>` 500,000 rows per second; on Graphite data -`>` 1,000,000 rows per second). To improve performance, you can make multiple INSERT queries in parallel, which scales linearly.
We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second. When inserting to a MergeTree table from a tab-separated dump, the insertion speed can be from 50 to 200 MB/s. If the inserted rows are around 1 KB in size, the speed will be from 50,000 to 200,000 rows per second. If the rows are small, the performance can be higher in rows per second (on Banner System data -`>` 500,000 rows per second; on Graphite data -`>` 1,000,000 rows per second). To improve performance, you can make multiple INSERT queries in parallel, which scales linearly.
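A minimal sketch of the batching advice above (the table name and schema are illustrative, not part of the original text): one INSERT carrying a whole packet of rows instead of many single-row statements.
``` sql
-- Hypothetical table; any MergeTree table behaves the same way for this purpose.
CREATE TABLE hits_batch (EventDate Date, UserID UInt64)
ENGINE = MergeTree ORDER BY (EventDate, UserID);

-- One packet of 100000 rows in a single INSERT.
INSERT INTO hits_batch SELECT toDate('2020-12-16'), number FROM system.numbers LIMIT 100000;
```
Several such INSERTs can then be issued from parallel connections, which is the linearly scaling case mentioned above.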
{## [Original article](https://clickhouse.tech/docs/en/introduction/performance/) ##}

View File

@ -91,6 +91,23 @@ The Linux kernel prior to 3.2 had a multitude of problems with IPv6 implementati
Use at least a 10 GB network, if possible. 1 Gb will also work, but it will be much worse for patching replicas with tens of terabytes of data, or for processing distributed queries with a large amount of intermediate data.
## Hypervisor configuration
If you are using OpenStack, set
```
cpu_mode=host-passthrough
```
in nova.conf.
If you are using libvirt, set
```
<cpu mode='host-passthrough'/>
```
in XML configuration.
This is important for ClickHouse to be able to get correct information with the `cpuid` instruction.
Otherwise you may get `Illegal instruction` crashes when the hypervisor is run on old CPU models.
## ZooKeeper {#zookeeper}
You are probably already using ZooKeeper for other purposes. You can use the same installation of ZooKeeper, if it isn't already overloaded.

View File

@ -593,6 +593,18 @@ SELECT dateDiff('hour', toDateTime('2018-01-01 22:00:00'), toDateTime('2018-01-0
For example, `timeSlots(toDateTime('2012-01-01 12:20:00'), toUInt32(600)) = [toDateTime('2012-01-01 12:00:00'), toDateTime('2012-01-01 12:30:00')]`.
This is necessary for searching for hits that are included in the corresponding visit.
## toYYYYMM
Converts a date or date with time to a UInt32 number containing the year and month number (YYYY * 100 + MM).
## toYYYYMMDD
Converts a date or date with time to a UInt32 number containing the year, month and day number (YYYY * 10000 + MM * 100 + DD).
## toYYYYMMDDhhmmss
Converts a date or date with time to a UInt64 number containing the year, month, day number and the time (YYYY * 10000000000 + MM * 100000000 + DD * 1000000 + hh * 10000 + mm * 100 + ss).
## formatDateTime {#formatdatetime}
Formats a date and time into a string according to the given pattern. Important: the pattern is a constant expression, so you cannot use different patterns for a single result column.
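A brief illustration of these conversions; the numeric results follow directly from the formulas above, and the `%Y`, `%m`, `%d` placeholders for `formatDateTime` are assumed from its standard pattern table (not shown in this excerpt).
``` sql
SELECT
    toYYYYMM(toDate('2012-01-01')),                                -- 201201
    toYYYYMMDD(toDate('2012-01-01')),                              -- 20120101
    toYYYYMMDDhhmmss(toDateTime('2012-01-01 12:20:00')),           -- 20120101122000
    formatDateTime(toDateTime('2012-01-01 12:20:00'), '%Y-%m-%d'); -- 2012-01-01
```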

View File

@ -14,7 +14,7 @@ toc_title: LIMIT
## LIMIT … WITH TIES Modifier {#limit-with-ties}
When `WITH TIES` is set for `LIMIT n[,m]` and `ORDER BY expr_list` is declared, you will get in the result the first `n` or `n,m` rows and all rows whose `ORDER BY` field values equal those of the row at position `n` for `LIMIT n` and `m` for `LIMIT n,m`.
When `WITH TIES` is set for `LIMIT n[,m]` and `ORDER BY expr_list` is declared, then in addition to the unmodified result (the first `n` rows, as with a plain `LIMIT n`), the query also returns the rows that share the same sorting fields with row `n` (that is, if row `n+1` has the same sorting fields as row `n`, it is returned as well).
This modifier can be combined with the [ORDER BY … WITH FILL modifier](../../../sql-reference/statements/select/order-by.md#orderby-with-fill).
@ -38,7 +38,7 @@ SELECT * FROM (
└───┘
```
After executing the `WITH TIES` modifier:
After adding the `WITH TIES` modifier:
``` sql
SELECT * FROM (
@ -59,4 +59,8 @@ SELECT * FROM (
└───┘
```
because row number 6 has the same value "2" for field `n` as row number 5
Although `LIMIT 5` is specified, row 6 has the value 2 for field `n`, the same as row 5, so it is also returned as a matching record.
In short, this modifier determines whether rows that are "tied" with the last row are also returned.
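A minimal self-contained illustration of this behavior (the table and values are hypothetical):
``` sql
CREATE TABLE t (n UInt8) ENGINE = Memory;
INSERT INTO t VALUES (0), (1), (1), (2), (2), (2);

-- LIMIT 5 alone returns 5 rows; WITH TIES also returns the 6th row,
-- because its value of n (2) ties with the 5th row under ORDER BY n.
SELECT * FROM t ORDER BY n LIMIT 5 WITH TIES;
```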

View File

@ -402,6 +402,7 @@ class IColumn;
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \
M(Bool, aggregate_functions_null_for_empty, false, "Rewrite all aggregate functions in a query, adding -OrNull suffix to them", 0) \
M(Bool, optimize_skip_merged_partitions, false, "Skip partitions with one part with level > 0 in optimize final", 0) \
M(Bool, optimize_on_insert, true, "Do the same transformation for inserted block of data as if merge was done on this block.", 0) \
\
M(Bool, use_antlr_parser, false, "Parse incoming queries using ANTLR-generated parser", 0) \
\
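To illustrate the new `optimize_on_insert` setting declared above, here is a sketch of its user-visible effect, mirroring the ReplacingMergeTree case from the test added later in this commit (the table name is illustrative):
``` sql
CREATE TABLE replacing (key UInt32, date Datetime)
ENGINE = ReplacingMergeTree() PARTITION BY date ORDER BY key;

SET optimize_on_insert = 1;  -- the default introduced by this commit
INSERT INTO replacing VALUES (1, '2020-01-01'), (1, '2020-01-01'), (2, '2020-01-02');

-- The duplicate of key 1 is already replaced inside the freshly written part,
-- so two rows come back even before any background merge or OPTIMIZE runs.
SELECT * FROM replacing ORDER BY key;
```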

View File

@ -30,7 +30,6 @@ struct SortCursorImpl
ColumnRawPtrs all_columns;
SortDescription desc;
size_t sort_columns_size = 0;
size_t pos = 0;
size_t rows = 0;
/** Determines order if comparing columns are equal.
@ -49,15 +48,20 @@ struct SortCursorImpl
/** Is there at least one column with Collator. */
bool has_collation = false;
/** We can use SortCursorImpl even when the columns aren't sorted,
* as long as we have their sorted permutation.
*/
IColumn::Permutation * permutation = nullptr;
SortCursorImpl() {}
SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0)
SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
{
reset(block);
reset(block, perm);
}
SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0)
SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
{
for (auto & column_desc : desc)
@ -66,19 +70,19 @@ struct SortCursorImpl
throw Exception("SortDescription should contain column position if SortCursor was used without header.",
ErrorCodes::LOGICAL_ERROR);
}
reset(columns, {});
reset(columns, {}, perm);
}
bool empty() const { return rows == 0; }
/// Set the cursor to the beginning of the new block.
void reset(const Block & block)
void reset(const Block & block, IColumn::Permutation * perm = nullptr)
{
reset(block.getColumns(), block);
reset(block.getColumns(), block, perm);
}
/// Set the cursor to the beginning of the new block.
void reset(const Columns & columns, const Block & block)
void reset(const Columns & columns, const Block & block, IColumn::Permutation * perm = nullptr)
{
all_columns.clear();
sort_columns.clear();
@ -96,18 +100,33 @@ struct SortCursorImpl
: column_desc.column_number;
sort_columns.push_back(columns[column_number].get());
need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported(); /// TODO Nullable(String)
need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported();
has_collation |= need_collation[j];
}
pos = 0;
rows = all_columns[0]->size();
permutation = perm;
}
size_t getRow() const
{
if (permutation)
return (*permutation)[pos];
return pos;
}
/// We need a possibility to change pos (see MergeJoin).
size_t & getPosRef() { return pos; }
bool isFirst() const { return pos == 0; }
bool isLast() const { return pos + 1 >= rows; }
bool isValid() const { return pos < rows; }
void next() { ++pos; }
/// Prevent using pos instead of getRow()
private:
size_t pos;
};
using SortCursorImpls = std::vector<SortCursorImpl>;
@ -127,7 +146,7 @@ struct SortCursorHelper
bool ALWAYS_INLINE greater(const SortCursorHelper & rhs) const
{
return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos);
return derived().greaterAt(rhs.derived(), impl->getRow(), rhs.impl->getRow());
}
/// Inverted so that the priority queue elements are removed in ascending order.

View File

@ -222,7 +222,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
// std::cerr << "Inserting row\n";
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
if (out_row_sources_buf)
{

View File

@ -1557,7 +1557,12 @@ void InterpreterSelectQuery::executeFetchColumns(
throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
/// Specify the number of threads only if it wasn't specified in storage.
if (!query_plan.getMaxThreads())
///
/// But in the case of a remote query and prefer_localhost_replica=1 (the default),
/// the inner local query (which runs in the same process, without network
/// interaction) will call setMaxThreads earlier, and the distributed
/// query will not update it.
if (!query_plan.getMaxThreads() || is_remote)
query_plan.setMaxThreads(max_threads_execute_query);
/// Aliases in table declaration.

View File

@ -180,12 +180,16 @@ class MergeJoinCursor
public:
MergeJoinCursor(const Block & block, const SortDescription & desc_)
: impl(SortCursorImpl(block, desc_))
{}
{
/// SortCursorImpl can work with permutation, but MergeJoinCursor can't.
if (impl.permutation)
throw Exception("Logical error: MergeJoinCursor doesn't support permutation", ErrorCodes::LOGICAL_ERROR);
}
size_t position() const { return impl.pos; }
size_t position() const { return impl.getRow(); }
size_t end() const { return impl.rows; }
bool atEnd() const { return impl.pos >= impl.rows; }
void nextN(size_t num) { impl.pos += num; }
bool atEnd() const { return impl.getRow() >= impl.rows; }
void nextN(size_t num) { impl.getPosRef() += num; }
void setCompareNullability(const MergeJoinCursor & rhs)
{
@ -254,10 +258,10 @@ private:
else if (cmp > 0)
rhs.impl.next();
else if (!cmp)
return Range{impl.pos, rhs.impl.pos, getEqualLength(), rhs.getEqualLength()};
return Range{impl.getRow(), rhs.impl.getRow(), getEqualLength(), rhs.getEqualLength()};
}
return Range{impl.pos, rhs.impl.pos, 0, 0};
return Range{impl.getRow(), rhs.impl.getRow(), 0, 0};
}
template <bool left_nulls, bool right_nulls>
@ -268,7 +272,7 @@ private:
const auto * left_column = impl.sort_columns[i];
const auto * right_column = rhs.impl.sort_columns[i];
int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.pos, rhs.impl.pos);
int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.getRow(), rhs.impl.getRow());
if (res)
return res;
}
@ -278,11 +282,11 @@ private:
/// Expects !atEnd()
size_t getEqualLength()
{
size_t pos = impl.pos + 1;
size_t pos = impl.getRow() + 1;
for (; pos < impl.rows; ++pos)
if (!samePrev(pos))
break;
return pos - impl.pos;
return pos - impl.getRow();
}
/// Expects lhs_pos > 0

View File

@ -257,12 +257,12 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::addRow(SortCursor & curs
throw Exception("Can't add a row to the group because it was not started.", ErrorCodes::LOGICAL_ERROR);
for (auto & desc : def.columns_to_aggregate)
desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->pos);
desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->getRow());
for (auto & desc : def.columns_to_simple_aggregate)
{
auto & col = cursor->all_columns[desc.column_number];
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, arena.get());
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->getRow(), arena.get());
}
}
@ -352,7 +352,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
return Status(merged_data.pull());
}
merged_data.startGroup(current->all_columns, current->pos);
merged_data.startGroup(current->all_columns, current->getRow());
}
merged_data.addRow(current);

View File

@ -26,9 +26,9 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
const String & sign_column,
bool only_positive_sign_,
size_t max_block_size,
Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
Poco::Logger * log_)
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, sign_column_number(header.getPositionByName(sign_column))
@ -123,7 +123,7 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
return Status(current.impl->order);
}
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getRow()];
RowRef current_row;
setRowRef(current_row, current);

View File

@ -33,9 +33,9 @@ public:
const String & sign_column,
bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0.
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
Poco::Logger * log_);
Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
Status merge() override;

View File

@ -164,12 +164,12 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
return Status(current.impl->order);
}
StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->pos);
StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->getRow());
bool new_path = is_first || next_path != current_group_path;
is_first = false;
time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->pos);
time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->getRow());
/// Is new key before rounding.
bool is_new_key = new_path || next_row_time != current_time;
@ -227,7 +227,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
/// and for rows with same maximum version - only last row.
if (is_new_key
|| current->all_columns[columns_definition.version_column_num]->compareAt(
current->pos, current_subgroup_newest_row.row_num,
current->getRow(), current_subgroup_newest_row.row_num,
*(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num],
/* nan_direction_hint = */ 1) >= 0)
{
@ -263,7 +263,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
void GraphiteRollupSortedAlgorithm::startNextGroup(SortCursor & cursor, Graphite::RollupRule next_rule)
{
merged_data.startNextGroup(cursor->all_columns, cursor->pos, next_rule, columns_definition);
merged_data.startNextGroup(cursor->all_columns, cursor->getRow(), next_rule, columns_definition);
}
void GraphiteRollupSortedAlgorithm::finishCurrentGroup()

View File

@ -29,6 +29,8 @@ public:
/// between different algorithm objects in parallel FINAL.
bool skip_last_row = false;
IColumn::Permutation * permutation = nullptr;
void swap(Input & other)
{
chunk.swap(other.chunk);

View File

@ -22,7 +22,7 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
if (!current_inputs[source_num].chunk)
continue;
cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num);
cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
}
queue = SortingHeap<SortCursor>(cursors);
@ -37,7 +37,7 @@ void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t sourc
last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);
current_input.swap(input);
cursors[source_num].reset(current_input.chunk.getColumns(), {});
cursors[source_num].reset(current_input.chunk.getColumns(), {}, current_input.permutation);
queue.push(cursors[source_num]);
}

View File

@ -39,7 +39,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
source.skip_last_row = inputs[source_num].skip_last_row;
source.chunk = chunk_allocator.alloc(inputs[source_num].chunk);
cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num);
cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num, inputs[source_num].permutation);
source.chunk->all_columns = cursors[source_num].all_columns;
source.chunk->sort_columns = cursors[source_num].sort_columns;
@ -55,7 +55,7 @@ void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num
auto & source = sources[source_num];
source.skip_last_row = input.skip_last_row;
source.chunk = chunk_allocator.alloc(input.chunk);
cursors[source_num].reset(source.chunk->getColumns(), {});
cursors[source_num].reset(source.chunk->getColumns(), {}, input.permutation);
source.chunk->all_columns = cursors[source_num].all_columns;
source.chunk->sort_columns = cursors[source_num].sort_columns;

View File

@ -139,7 +139,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
//std::cerr << "Inserting row\n";
merged_data.insertRow(current->all_columns, current->pos, current->rows);
merged_data.insertRow(current->all_columns, current->getRow(), current->rows);
if (out_row_sources_buf)
{

View File

@ -18,9 +18,9 @@ public:
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes);
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
void addInput();

View File

@ -73,7 +73,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
if (version_column_number == -1
|| selected_row.empty()
|| current->all_columns[version_column_number]->compareAt(
current->pos, selected_row.row_num,
current->getRow(), selected_row.row_num,
*(*selected_row.all_columns)[version_column_number],
/* nan_direction_hint = */ 1) >= 0)
{

View File

@ -136,7 +136,7 @@ struct RowRef
{
sort_columns = cursor.impl->sort_columns.data();
num_columns = cursor.impl->sort_columns.size();
row_num = cursor.impl->pos;
row_num = cursor.impl->getRow();
}
static bool checkEquals(size_t size, const IColumn ** lhs, size_t lhs_row, const IColumn ** rhs, size_t rhs_row)
@ -192,7 +192,7 @@ struct RowRefWithOwnedChunk
void set(SortCursor & cursor, SharedChunkPtr chunk)
{
owned_chunk = std::move(chunk);
row_num = cursor.impl->pos;
row_num = cursor.impl->getRow();
all_columns = &owned_chunk->all_columns;
sort_columns = &owned_chunk->sort_columns;
}

View File

@ -688,10 +688,10 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
return Status(merged_data.pull());
}
merged_data.startGroup(current->all_columns, current->pos);
merged_data.startGroup(current->all_columns, current->getRow());
}
else
merged_data.addRow(current->all_columns, current->pos);
merged_data.addRow(current->all_columns, current->getRow());
if (!current->isLast())
{

View File

@ -73,7 +73,7 @@ IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge()
RowRef current_row;
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getRow()];
setRowRef(current_row, current);

View File

@ -27,9 +27,9 @@ public:
sign_column,
only_positive_sign,
max_block_size,
&Poco::Logger::get("CollapsingSortedTransform"),
out_row_sources_buf_,
use_average_block_sizes,
&Poco::Logger::get("CollapsingSortedTransform"))
use_average_block_sizes)
{
}

View File

@ -100,7 +100,7 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
/// Append a row from queue.
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
++total_merged_rows;
++merged_rows;

View File

@ -22,7 +22,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
{
Stopwatch watch;
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, optimize_on_insert);
storage.renameTempPartAndAdd(part, &storage.increment);
PartLog::addNewPart(storage.global_context, part, watch.elapsed());

View File

@ -14,10 +14,11 @@ class StorageMergeTree;
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_)
MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, bool optimize_on_insert_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, max_parts_per_block(max_parts_per_block_)
, optimize_on_insert(optimize_on_insert_)
{
}
@ -28,6 +29,7 @@ private:
StorageMergeTree & storage;
StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block;
bool optimize_on_insert;
};
}

View File

@ -16,6 +16,14 @@
#include <Parsers/queryToString.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event MergeTreeDataWriterBlocks;
@ -194,7 +202,74 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
return result;
}
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation)
{
size_t block_size = block.rows();
auto get_merging_algorithm = [&]() -> std::shared_ptr<IMergingAlgorithm>
{
switch (data.merging_params.mode)
{
/// There is nothing to merge in a single block in an ordinary MergeTree
case MergeTreeData::MergingParams::Ordinary:
return nullptr;
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedAlgorithm>(
block, 1, sort_description, data.merging_params.version_column, block_size + 1);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedAlgorithm>(
block, 1, sort_description, data.merging_params.sign_column,
false, block_size + 1, &Poco::Logger::get("MergeTreeBlockOutputStream"));
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedAlgorithm>(
block, 1, sort_description, data.merging_params.columns_to_sum,
partition_key_columns, block_size + 1);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size + 1);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingAlgorithm>(
block, 1, sort_description, data.merging_params.sign_column, block_size + 1);
case MergeTreeData::MergingParams::Graphite:
return std::make_shared<GraphiteRollupSortedAlgorithm>(
block, 1, sort_description, block_size + 1, data.merging_params.graphite_params, time(nullptr));
}
__builtin_unreachable();
};
auto merging_algorithm = get_merging_algorithm();
if (!merging_algorithm)
return block;
Chunk chunk(block.getColumns(), block_size);
IMergingAlgorithm::Input input;
input.set(std::move(chunk));
input.permutation = permutation;
IMergingAlgorithm::Inputs inputs;
inputs.push_back(std::move(input));
merging_algorithm->initialize(std::move(inputs));
IMergingAlgorithm::Status status = merging_algorithm->merge();
/// Check that after the first merge the merging_algorithm is waiting for data from input 0.
if (status.required_source != 0)
throw Exception("Logical error: required source after the first merge is not 0.", ErrorCodes::LOGICAL_ERROR);
status = merging_algorithm->merge();
/// Check that merge is finished.
if (!status.is_finished)
throw Exception("Logical error: merge is not finished after the second merge.", ErrorCodes::LOGICAL_ERROR);
/// Merged Block is sorted and we don't need to use permutation anymore
permutation = nullptr;
return block.cloneWithColumns(status.chunk.getColumns());
}
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert)
{
Block & block = block_with_partition.block;
@ -228,6 +303,38 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
else
part_name = new_part_info.getPartName();
/// If we need to calculate some columns to sort.
if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
/// Sort
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (!sort_description.empty())
{
if (!isAlreadySorted(block, sort_description))
{
stableGetPermutation(block, sort_description, perm);
perm_ptr = &perm;
}
else
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
}
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
if (optimize_on_insert)
block = mergeBlock(block, sort_description, partition_key_columns, perm_ptr);
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
@ -274,34 +381,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
sync_guard.emplace(disk, full_path);
}
/// If we need to calculate some columns to sort.
if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
/// Sort
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (!sort_description.empty())
{
if (!isAlreadySorted(block, sort_description))
{
stableGetPermutation(block, sort_description, perm);
perm_ptr = &perm;
}
else
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
}
if (metadata_snapshot->hasRowsTTL())
updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);

View File

@ -45,7 +45,9 @@ public:
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
*/
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot);
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert);
Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation);
private:
MergeTreeData & data;

View File

@ -40,7 +40,8 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
bool quorum_parallel_,
bool deduplicate_)
bool deduplicate_,
bool optimize_on_insert_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, quorum(quorum_)
@ -49,6 +50,7 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
, quorum_parallel(quorum_parallel_)
, deduplicate(deduplicate_)
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
, optimize_on_insert(optimize_on_insert_)
{
/// The quorum value `1` has the same meaning as if it is disabled.
if (quorum == 1)
@ -142,7 +144,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
/// Write part to the filesystem under temporary name. Calculate a checksum.
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, optimize_on_insert);
String block_id;

View File

@ -29,7 +29,8 @@ public:
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
bool quorum_parallel_,
bool deduplicate_);
bool deduplicate_,
bool optimize_on_insert);
Block getHeader() const override;
void writePrefix() override;
@ -71,6 +72,8 @@ private:
using Logger = Poco::Logger;
Poco::Logger * log;
bool optimize_on_insert;
};
}

View File

@ -3,6 +3,83 @@
#include <Storages/MergeTree/MergeSelector.h>
/**
We have a set of data parts that is dynamically changing - new data parts are added and there is background merging process.
Background merging process periodically selects continuous range of data parts to merge.
It tries to optimize the following metrics:
1. Write amplification: total amount of data written on disk (source data + merges) relative to the amount of source data.
It can also be considered as the total amount of work for merges.
2. The number of data parts in the set at a random moment of time (average + quantiles).
Also taking the following considerations:
1. Older data parts should be merged less frequently than newer data parts.
2. Larger data parts should be merged less frequently than smaller data parts.
3. If no new parts arrive, we should continue to merge existing data parts to eventually optimize the table.
4. Never allow too many parts, because it will slow down SELECT queries significantly.
5. Multiple background merges can run concurrently but not too many.
It is not possible to optimize both metrics, because they contradict each other.
To lower the number of parts we can merge eagerly but write amplification will increase.
Then we need some balance between optimization of these two metrics.
But some optimizations may improve both metrics.
For example, we can look at the "merge tree" - the tree of data parts that were merged.
If the tree is perfectly balanced then its depth is proportional to the log(data size),
the total amount of work is proportional to data_size * log(data_size)
and the write amplification is proportional to log(data_size).
If it's not balanced (e.g. every new data part is always merged with existing data parts),
its depth is proportional to the data size, and the total amount of work is proportional to data_size^2.
We can also control the "base of the logarithm" - you can think of it as the number of data parts
that are merged at once (the tree "arity"). But as the data parts are of different sizes, we should generalize it:
calculate the ratio of the total size of merged parts to the size of the largest part participating in the merge.
For example, if we merge 4 parts of size 5, 3, 1, 1 - then "base" will be 2 (10 / 5).
Base of the logarithm (simply called `base` in `SimpleMergeSelector`) is the main knob to control the write amplification.
The larger it is, the lower the write amplification, but we will have more data parts on average.
To fit all the considerations, we also adjust `base` depending on total parts count,
parts size and parts age, with linear interpolation (then `base` is not a constant but a function of multiple variables,
looking like a section of hyperplane).
Then we apply the algorithm to select the optimal range of data parts to merge.
There is a proof that this algorithm is optimal if we look only a single step into the future.
The best range of data parts is selected.
We also apply some tunes:
- there is a fixed cost of merging small parts (that is added to the size of a data part before all estimations);
- there are some heuristics to "stick" ranges to large data parts.
It's still unclear whether this algorithm is good or optimal at all, or whether it is using the optimal coefficients.
To test and optimize SimpleMergeSelector, we apply the following methods:
- insert/merge simulator: a model simulating parts insertion and merging;
the merge selecting algorithm is applied and the relevant metrics are calculated to allow tuning the algorithm;
- insert/merge simulator on real `system.part_log` from production - it gives realistic information about inserted data parts:
their sizes, at what time intervals they are inserted;
There is a research thesis dedicated to optimization of merge algorithm:
https://presentations.clickhouse.tech/hse_2019/merge_algorithm.pptx
This work attempted to vary the coefficients in SimpleMergeSelector and to solve the optimization task:
maybe some change in coefficients will give a clear win on all metrics. Surprisingly enough, it has found
that our selection of coefficients is near optimal. It has found slightly more optimal coefficients,
but I decided not to use them, because the representativeness of the test data is in question.
This work did not make any attempt to propose any other algorithm.
This work did not make any attempt to analyze the task with analytical methods.
That's why I still believe that there are many opportunities to optimize the merge selection algorithm.
Please do not confuse this task with a similar task in other LSM-based systems (like RocksDB).
Their problem statement is subtly different. Our set consists of data parts
that are completely independent in the data they store. Ranges of primary keys in data parts can intersect.
When doing SELECT we read from all data parts. INSERTed data parts come with unknown sizes...
*/
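Restating the balanced-tree argument above in symbols (a sketch; $S$ denotes the total data size, $s_0$ the typical size of a newly inserted part, and $b$ the "base"; these symbols are introduced here for illustration):

$$
b \;=\; \frac{\sum_i \mathrm{size}_i}{\max_i \mathrm{size}_i}
\qquad \text{e.g. } \frac{5+3+1+1}{5} = 2
$$

$$
\mathrm{depth} \approx \log_b \frac{S}{s_0}, \qquad
\mathrm{total\ work} \approx S \cdot \log_b \frac{S}{s_0}, \qquad
\mathrm{write\ amplification} \approx \log_b \frac{S}{s_0}
$$

Raising $b$ lowers the write amplification but leaves more data parts present at any given moment, which is exactly the trade-off described above.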
namespace DB
{

View File

@ -84,22 +84,6 @@ StorageMerge::StorageMerge(
setInMemoryMetadata(storage_metadata);
}
StorageMerge::StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & source_database_,
const Tables & tables_,
const Context & context_)
: IStorage(table_id_)
, source_database(source_database_)
, tables(tables_)
, global_context(context_.getGlobalContext())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
}
template <typename F>
StoragePtr StorageMerge::getFirstTable(F && predicate) const
{
@ -455,12 +439,8 @@ DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & cont
e.addMessage("while getting table iterator of Merge table. Maybe caused by two Merge tables that will endlessly try to read each other's data");
throw;
}
if (tables)
return std::make_unique<DatabaseTablesSnapshotIterator>(*tables, source_database);
auto database = DatabaseCatalog::instance().getDatabase(source_database);
auto table_name_match = [this](const String & table_name_) { return table_name_regexp->match(table_name_); };
auto table_name_match = [this](const String & table_name_) { return table_name_regexp.match(table_name_); };
return database->getTablesIterator(context, table_name_match);
}

View File

@ -48,8 +48,7 @@ public:
private:
String source_database;
std::optional<OptimizedRegularExpression> table_name_regexp;
std::optional<Tables> tables;
OptimizedRegularExpression table_name_regexp;
const Context & global_context;
using StorageWithLockAndName = std::tuple<StoragePtr, TableLockHolder, String>;
@ -76,13 +75,6 @@ protected:
const String & table_name_regexp_,
const Context & context_);
StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & source_database_,
const Tables & source_tables_,
const Context & context_);
Pipe createSources(
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info,

View File

@ -233,7 +233,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Sto
const auto & settings = context.getSettingsRef();
return std::make_shared<MergeTreeBlockOutputStream>(
*this, metadata_snapshot, settings.max_partitions_per_insert_block);
*this, metadata_snapshot, settings.max_partitions_per_insert_block, context.getSettingsRef().optimize_on_insert);
}
void StorageMergeTree::checkTableCanBeDropped() const

View File

@ -3861,7 +3861,8 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block,
query_settings.insert_quorum_parallel,
deduplicate);
deduplicate,
context.getSettingsRef().optimize_on_insert);
}
@ -4444,7 +4445,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false); /// TODO Allow to use quorum here.
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, false); /// TODO Allow to use quorum here.
for (size_t i = 0; i < loaded_parts.size(); ++i)
{
String old_name = loaded_parts[i]->name;

View File

@ -39,7 +39,8 @@ namespace DB
* - the structure of the table (/metadata, /columns)
* - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...);
* - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host);
* - select the leader replica (/leader_election) - this is the replica that assigns the merge;
* - select the leader replica (/leader_election) - these are the replicas that assign merges, mutations and partition manipulations
* (after ClickHouse version 20.5 we allow multiple leaders to act concurrently);
* - a set of parts of data on each replica (/replicas/replica_name/parts);
* - list of the last N blocks of data with checksum, for deduplication (/blocks);
* - the list of incremental block numbers (/block_numbers) that we are about to insert,

View File

@ -6,7 +6,6 @@
#include <TableFunctions/ITableFunction.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Access/ContextAccess.h>
#include <TableFunctions/TableFunctionMerge.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/registerTableFunctions.h>
@ -23,6 +22,29 @@ namespace ErrorCodes
}
static NamesAndTypesList chooseColumns(const String & source_database, const String & table_name_regexp_, const Context & context)
{
OptimizedRegularExpression table_name_regexp(table_name_regexp_);
auto table_name_match = [&](const String & table_name) { return table_name_regexp.match(table_name); };
StoragePtr any_table;
{
auto database = DatabaseCatalog::instance().getDatabase(source_database);
auto iterator = database->getTablesIterator(context, table_name_match);
if (iterator->isValid())
if (const auto & table = iterator->table())
any_table = table;
}
if (!any_table)
throw Exception("Error while executing table function merge. In database " + source_database + " no one matches regular expression: "
+ table_name_regexp_, ErrorCodes::UNKNOWN_TABLE);
return any_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical();
}
void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Context & context)
{
ASTs & args_func = ast_function->children;
@ -46,46 +68,9 @@ void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Conte
table_name_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
const Tables & TableFunctionMerge::getMatchingTables(const Context & context) const
{
if (tables)
return *tables;
auto database = DatabaseCatalog::instance().getDatabase(source_database);
OptimizedRegularExpression re(table_name_regexp);
auto table_name_match = [&](const String & table_name_) { return re.match(table_name_); };
auto access = context.getAccess();
bool granted_show_on_all_tables = access->isGranted(AccessType::SHOW_TABLES, source_database);
bool granted_select_on_all_tables = access->isGranted(AccessType::SELECT, source_database);
tables.emplace();
for (auto it = database->getTablesIterator(context, table_name_match); it->isValid(); it->next())
{
if (!it->table())
continue;
bool granted_show = granted_show_on_all_tables || access->isGranted(AccessType::SHOW_TABLES, source_database, it->name());
if (!granted_show)
continue;
if (!granted_select_on_all_tables)
access->checkAccess(AccessType::SELECT, source_database, it->name());
tables->emplace(it->name(), it->table());
}
if (tables->empty())
throw Exception("Error while executing table function merge. In database " + source_database + " no one matches regular expression: "
+ table_name_regexp, ErrorCodes::UNKNOWN_TABLE);
return *tables;
}
ColumnsDescription TableFunctionMerge::getActualTableStructure(const Context & context) const
{
auto first_table = getMatchingTables(context).begin()->second;
return ColumnsDescription{first_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical()};
return ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)};
}
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
@ -94,7 +79,7 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, cons
StorageID(getDatabaseName(), table_name),
getActualTableStructure(context),
source_database,
getMatchingTables(context),
table_name_regexp,
context);
res->startup();

View File

@ -19,13 +19,11 @@ private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Merge"; }
const Tables & getMatchingTables(const Context & context) const;
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String source_database;
String table_name_regexp;
mutable std::optional<Tables> tables;
};

View File

@ -15,6 +15,7 @@ from subprocess import check_call
from subprocess import Popen
from subprocess import PIPE
from subprocess import CalledProcessError
from subprocess import TimeoutExpired
from datetime import datetime
from time import time, sleep
from errno import ESRCH
@ -114,6 +115,7 @@ def get_db_engine(args):
def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file):
# print(client_options)
start_time = datetime.now()
if args.database:
database = args.database
os.environ.setdefault("CLICKHOUSE_DATABASE", database)
@ -129,7 +131,11 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
database = 'test_{suffix}'.format(suffix=random_str())
clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(args)))
try:
clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(args)), timeout=args.timeout)
except TimeoutExpired:
total_time = (datetime.now() - start_time).total_seconds()
return clickhouse_proc_create, "", "Timeout creating database {} before test".format(database), total_time
os.environ["CLICKHOUSE_DATABASE"] = database
@ -152,14 +158,24 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
# print(command)
proc = Popen(command, shell=True, env=os.environ)
start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
sleep(0.01)
if not args.database:
clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
clickhouse_proc_create.communicate(("DROP DATABASE " + database))
seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 10)
try:
clickhouse_proc_create.communicate(("DROP DATABASE " + database), timeout=seconds_left)
except TimeoutExpired:
# kill the test process because it can also hang
if proc.returncode is None:
try:
proc.kill()
except OSError as e:
if e.errno != ESRCH:
raise
return clickhouse_proc_create, "", "Timeout dropping database {} after test".format(database), total_time
total_time = (datetime.now() - start_time).total_seconds()
@ -305,7 +321,7 @@ def run_tests_array(all_tests_with_params):
if args.testname:
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)))
clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)), timeout=10)
if clickhouse_proc.returncode != 0:
failures += 1
@ -330,6 +346,8 @@ def run_tests_array(all_tests_with_params):
print(MSG_FAIL, end='')
print_test_time(total_time)
print(" - Timeout!")
if stderr:
print(stderr)
else:
counter = 1
while proc.returncode != 0 and need_retry(stderr):

View File

@ -135,3 +135,13 @@ named `test.py` containing tests in it. All functions with names starting with `
To assert that two TSV files must be equal, wrap them in the `TSV` class and use the regular `assert`
statement. Example: `assert TSV(result) == TSV(reference)`. In case the assertion fails, `pytest`
will automagically detect the types of variables and only the small diff of two files is printed.
### Troubleshooting
If tests are failing for mysterious reasons, this may help:
```
sudo service docker stop
sudo bash -c 'rm -rf /var/lib/docker/*'
sudo service docker start
```

View File

@ -0,0 +1,8 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<optimize_on_insert>0</optimize_on_insert>
</default>
</profiles>
</yandex>

View File

@ -8,7 +8,8 @@ from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/graphite_rollup.xml'])
main_configs=['configs/graphite_rollup.xml'],
user_configs=["configs/users.xml"])
q = instance.query

View File

@ -3,6 +3,8 @@
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<allow_introspection_functions>1</allow_introspection_functions>
<optimize_on_insert>0</optimize_on_insert>
<default_database_engine>Ordinary</default_database_engine>
</default>
</profiles>

View File

@ -1,5 +1,10 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<optimize_on_insert>0</optimize_on_insert>
</default>
</profiles>
<users>
<another>
<password/>

View File

@ -1,55 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance')
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
instance.query("CREATE TABLE table1(x UInt32) ENGINE = MergeTree ORDER BY tuple()")
instance.query("CREATE TABLE table2(x UInt32) ENGINE = MergeTree ORDER BY tuple()")
instance.query("INSERT INTO table1 VALUES (1)")
instance.query("INSERT INTO table2 VALUES (2)")
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def cleanup_after_test():
try:
yield
finally:
instance.query("DROP USER IF EXISTS A")
def test_merge():
select_query = "SELECT * FROM merge('default', 'table[0-9]+') ORDER BY x"
assert instance.query(select_query) == "1\n2\n"
instance.query("CREATE USER A")
assert "it's necessary to have the grant CREATE TEMPORARY TABLE ON *.*" in instance.query_and_get_error(select_query, user = 'A')
instance.query("GRANT CREATE TEMPORARY TABLE ON *.* TO A")
assert "no one matches regular expression" in instance.query_and_get_error(select_query, user = 'A')
instance.query("GRANT SELECT ON default.table1 TO A")
assert instance.query(select_query, user = 'A') == "1\n"
instance.query("GRANT SELECT ON default.* TO A")
assert instance.query(select_query, user = 'A') == "1\n2\n"
instance.query("REVOKE SELECT ON default.table1 FROM A")
assert instance.query(select_query, user = 'A') == "2\n"
instance.query("REVOKE ALL ON default.* FROM A")
instance.query("GRANT SELECT ON default.table1 TO A")
instance.query("GRANT INSERT ON default.table2 TO A")
assert "it's necessary to have the grant SELECT ON default.table2" in instance.query_and_get_error(select_query, user = 'A')

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS merge_tree;
DROP TABLE IF EXISTS collapsing_merge_tree;
DROP TABLE IF EXISTS versioned_collapsing_merge_tree;

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS summing_composite_key;
CREATE TABLE summing_composite_key (d Date, k UInt64, FirstMap Nested(k1 UInt32, k2ID Int8, s Float64), SecondMap Nested(k1ID UInt64, k2Key String, k3Type Int32, s Int64)) ENGINE = SummingMergeTree(d, k, 1);

View File

@ -42,13 +42,13 @@ $CLICKHOUSE_CLIENT -q "INSERT INTO $name (date, Sign, ki) SELECT
toDate(0) AS date,
toInt8(1) AS Sign,
toUInt64(0) AS ki
FROM system.numbers LIMIT 9000"
FROM system.numbers LIMIT 9000" --server_logs_file=/dev/null
$CLICKHOUSE_CLIENT -q "INSERT INTO $name (date, Sign, ki) SELECT
toDate(0) AS date,
toInt8(1) AS Sign,
number AS ki
FROM system.numbers LIMIT 9000, 9000"
FROM system.numbers LIMIT 9000, 9000" --server_logs_file=/dev/null
$CLICKHOUSE_CLIENT -q "INSERT INTO $name SELECT
toDate(0) AS date,
@ -67,7 +67,7 @@ number AS di09,
number AS di10,
[number, number+1] AS \`n.i\`,
[hex(number), hex(number+1)] AS \`n.s\`
FROM system.numbers LIMIT $res_rows"
FROM system.numbers LIMIT $res_rows" --server_logs_file=/dev/null
while [[ $(get_num_parts) -ne 1 ]] ; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE $name PARTITION 197001" --server_logs_file=/dev/null; done

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
SELECT '*** Replicated with sampling ***';
DROP TABLE IF EXISTS replicated_with_sampling;

View File

@ -1,3 +1,5 @@
set optimize_on_insert = 0;
drop table if exists mult_tab;
create table mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version);
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;

View File

@ -1,3 +1,5 @@
set optimize_on_insert = 0;
drop table if exists tab_00577;
create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into tab_00577 values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS test_00616;
DROP TABLE IF EXISTS replacing_00616;

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS partitioned_by_tuple;
CREATE TABLE partitioned_by_tuple (d Date, x UInt8, w String, y UInt8) ENGINE SummingMergeTree (y) PARTITION BY (d, x) ORDER BY (d, x, w);

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS partitioned_by_tuple_replica1_00661;
DROP TABLE IF EXISTS partitioned_by_tuple_replica2_00661;
CREATE TABLE partitioned_by_tuple_replica1_00661(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/partitioned_by_tuple_00661', '1') PARTITION BY (d, x) ORDER BY (d, x, w);

View File

@ -1,4 +1,5 @@
SET send_logs_level = 'fatal';
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS old_style;
CREATE TABLE old_style(d Date, x UInt32) ENGINE MergeTree(d, x, 8192);

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
SET send_logs_level = 'fatal';
DROP TABLE IF EXISTS old_style;

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
select '-- SummingMergeTree with Nullable column without duplicates.';
drop table if exists tst;

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS data_01285;
SET max_threads=1;

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS tags;
CREATE TABLE tags (

View File

@ -1,3 +1,5 @@
SET optimize_on_insert = 0;
DROP TABLE IF EXISTS tt_01373;
CREATE TABLE tt_01373

View File

@ -0,0 +1,13 @@
Replacing Merge Tree
1 2020-01-01 00:00:00
2 2020-01-02 00:00:00
Collapsing Merge Tree
1 1 2020-01-01 00:00:00
Versioned Collapsing Merge Tree
1 1 2 2020-01-01 00:00:00
Summing Merge Tree
1 6 2020-01-01 00:00:00
2 6 2020-01-02 00:00:00
Aggregating Merge Tree
1 5 2020-01-01 00:00:00
2 5 2020-01-02 00:00:00

View File

@ -0,0 +1,35 @@
SELECT 'Replacing Merge Tree';
DROP TABLE IF EXISTS replacing_merge_tree;
CREATE TABLE replacing_merge_tree (key UInt32, date Datetime) ENGINE=ReplacingMergeTree() PARTITION BY date ORDER BY key;
INSERT INTO replacing_merge_tree VALUES (1, '2020-01-01'), (2, '2020-01-02'), (1, '2020-01-01'), (2, '2020-01-02');
SELECT * FROM replacing_merge_tree ORDER BY key;
DROP TABLE replacing_merge_tree;
SELECT 'Collapsing Merge Tree';
DROP TABLE IF EXISTS collapsing_merge_tree;
CREATE TABLE collapsing_merge_tree (key UInt32, sign Int8, date Datetime) ENGINE=CollapsingMergeTree(sign) PARTITION BY date ORDER BY key;
INSERT INTO collapsing_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, -1, '2020-01-01'), (2, -1, '2020-01-02'), (1, 1, '2020-01-01');
SELECT * FROM collapsing_merge_tree ORDER BY key;
DROP TABLE collapsing_merge_tree;
SELECT 'Versioned Collapsing Merge Tree';
DROP TABLE IF EXISTS versioned_collapsing_merge_tree;
CREATE TABLE versioned_collapsing_merge_tree (key UInt32, sign Int8, version Int32, date Datetime) ENGINE=VersionedCollapsingMergeTree(sign, version) PARTITION BY date ORDER BY (key, version);
INSERT INTO versioned_collapsing_merge_tree VALUES (1, 1, 1, '2020-01-01'), (1, -1, 1, '2020-01-01'), (1, 1, 2, '2020-01-01');
SELECT * FROM versioned_collapsing_merge_tree ORDER BY key;
DROP TABLE versioned_collapsing_merge_tree;
SELECT 'Summing Merge Tree';
DROP TABLE IF EXISTS summing_merge_tree;
CREATE TABLE summing_merge_tree (key UInt32, val UInt32, date Datetime) ENGINE=SummingMergeTree(val) PARTITION BY date ORDER BY key;
INSERT INTO summing_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, 5, '2020-01-01'), (2, 5, '2020-01-02');
SELECT * FROM summing_merge_tree ORDER BY key;
DROP TABLE summing_merge_tree;
SELECT 'Aggregating Merge Tree';
DROP TABLE IF EXISTS aggregating_merge_tree;
CREATE TABLE aggregating_merge_tree (key UInt32, val SimpleAggregateFunction(max, UInt32), date Datetime) ENGINE=AggregatingMergeTree() PARTITION BY date ORDER BY key;
INSERT INTO aggregating_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, 5, '2020-01-01'), (2, 5, '2020-01-02');
SELECT * FROM aggregating_merge_tree ORDER BY key;
DROP TABLE aggregating_merge_tree;

View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
common_opts=(
"--format=Null"
"--max_threads=1"
"--max_distributed_connections=3"
)
# NOTE: the test uses a higher timeout to avoid flakiness.
timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=0
timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=1