diff --git a/contrib/libunwind b/contrib/libunwind
index 51b84d9b6d2..8fe25d7dc70 160000
--- a/contrib/libunwind
+++ b/contrib/libunwind
@@ -1 +1 @@
-Subproject commit 51b84d9b6d2548f1cbdcafe622d5a753853b6149
+Subproject commit 8fe25d7dc70f2a4ea38c3e5a33fa9d4199b67a5a
diff --git a/docker/images.json b/docker/images.json
index f5b10a14313..e0859d3e17c 100644
--- a/docker/images.json
+++ b/docker/images.json
@@ -58,8 +58,7 @@
"docker/test/stateless": {
"name": "yandex/clickhouse-stateless-test",
"dependent": [
- "docker/test/stateful",
- "docker/test/stateful_with_coverage"
+ "docker/test/stateful"
]
},
"docker/test/stateless_pytest": {
@@ -68,7 +67,9 @@
},
"docker/test/stateless_with_coverage": {
"name": "yandex/clickhouse-stateless-test-with-coverage",
- "dependent": []
+ "dependent": [
+ "docker/test/stateful_with_coverage"
+ ]
},
"docker/test/unit": {
"name": "yandex/clickhouse-unit-test",
diff --git a/docker/test/base/Dockerfile b/docker/test/base/Dockerfile
index 61a40673a96..e8653c2122e 100644
--- a/docker/test/base/Dockerfile
+++ b/docker/test/base/Dockerfile
@@ -1,5 +1,5 @@
# docker build -t yandex/clickhouse-test-base .
-FROM ubuntu:19.10
+FROM ubuntu:20.04
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=11
diff --git a/docker/test/stateful_with_coverage/Dockerfile b/docker/test/stateful_with_coverage/Dockerfile
index ac6645b9463..07acf4ed4e6 100644
--- a/docker/test/stateful_with_coverage/Dockerfile
+++ b/docker/test/stateful_with_coverage/Dockerfile
@@ -1,8 +1,6 @@
# docker build -t yandex/clickhouse-stateful-test-with-coverage .
FROM yandex/clickhouse-stateless-test-with-coverage
-RUN echo "deb [trusted=yes] http://apt.llvm.org/bionic/ llvm-toolchain-bionic-9 main" >> /etc/apt/sources.list
-
RUN apt-get update -y \
&& env DEBIAN_FRONTEND=noninteractive \
apt-get install --yes --no-install-recommends \
diff --git a/docs/en/introduction/performance.md b/docs/en/introduction/performance.md
index d2780aedccb..6e5710347a1 100644
--- a/docs/en/introduction/performance.md
+++ b/docs/en/introduction/performance.md
@@ -25,6 +25,6 @@ Under the same conditions, ClickHouse can handle several hundred queries per sec
## Performance When Inserting Data {#performance-when-inserting-data}
-We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second. When inserting to a MergeTree table from a tab-separated dump, the insertion speed can be from 50 to 200 MB/s. If the inserted rows are around 1 Kb in size, the speed will be from 50,000 to 200,000 rows per second. If the rows are small, the performance can be higher in rows per second (on Banner System data -`>` 500,000 rows per second; on Graphite data -`>` 1,000,000 rows per second). To improve performance, you can make multiple INSERT queries in parallel, which scales linearly.
+We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second. When inserting to a MergeTree table from a tab-separated dump, the insertion speed can be from 50 to 200 MB/s. If the inserted rows are around 1 KB in size, the speed will be from 50,000 to 200,000 rows per second. If the rows are small, the performance can be higher in rows per second (on Banner System data -`>` 500,000 rows per second; on Graphite data -`>` 1,000,000 rows per second). To improve performance, you can make multiple INSERT queries in parallel, which scales linearly.
{## [Original article](https://clickhouse.tech/docs/en/introduction/performance/) ##}
diff --git a/docs/en/operations/tips.md b/docs/en/operations/tips.md
index 024eba1c899..e62dea0b04e 100644
--- a/docs/en/operations/tips.md
+++ b/docs/en/operations/tips.md
@@ -91,6 +91,23 @@ The Linux kernel prior to 3.2 had a multitude of problems with IPv6 implementati
Use at least a 10 GB network, if possible. 1 Gb will also work, but it will be much worse for patching replicas with tens of terabytes of data, or for processing distributed queries with a large amount of intermediate data.
+## Hypervisor configuration
+
+If you are using OpenStack, set
+```
+cpu_mode=host-passthrough
+```
+in nova.conf.
+
+If you are using libvirt, set
+```
+
+```
+in XML configuration.
+
+This is important for ClickHouse to be able to get correct information with `cpuid` instruction.
+Otherwise you may get `Illegal instruction` crashes when hypervisor is run on old CPU models.
+
## ZooKeeper {#zookeeper}
You are probably already using ZooKeeper for other purposes. You can use the same installation of ZooKeeper, if it isn’t already overloaded.
diff --git a/docs/ru/sql-reference/functions/date-time-functions.md b/docs/ru/sql-reference/functions/date-time-functions.md
index b7a077b3bd6..31482cde77f 100644
--- a/docs/ru/sql-reference/functions/date-time-functions.md
+++ b/docs/ru/sql-reference/functions/date-time-functions.md
@@ -593,6 +593,18 @@ SELECT dateDiff('hour', toDateTime('2018-01-01 22:00:00'), toDateTime('2018-01-0
Например, `timeSlots(toDateTime('2012-01-01 12:20:00'), toUInt32(600)) = [toDateTime('2012-01-01 12:00:00'), toDateTime('2012-01-01 12:30:00')]`.
Это нужно для поиска хитов, входящих в соответствующий визит.
+## toYYYYMM
+
+Переводит дату или дату со временем в число типа UInt32, содержащее номер года и месяца (YYYY * 100 + MM).
+
+## toYYYYMMDD
+
+Переводит дату или дату со временем в число типа UInt32, содержащее номер года, месяца и дня (YYYY * 10000 + MM * 100 + DD).
+
+## toYYYYMMDDhhmmss
+
+Переводит дату или дату со временем в число типа UInt64 содержащее номер года, месяца, дня и время (YYYY * 10000000000 + MM * 100000000 + DD * 1000000 + hh * 10000 + mm * 100 + ss).
+
## formatDateTime {#formatdatetime}
Функция преобразует дату-и-время в строку по заданному шаблону. Важно: шаблон — константное выражение, поэтому использовать разные шаблоны в одной колонке не получится.
diff --git a/docs/zh/sql-reference/statements/select/limit.md b/docs/zh/sql-reference/statements/select/limit.md
index b079248deca..d6d827552b0 100644
--- a/docs/zh/sql-reference/statements/select/limit.md
+++ b/docs/zh/sql-reference/statements/select/limit.md
@@ -14,7 +14,7 @@ toc_title: LIMIT
## LIMIT … WITH TIES 修饰符 {#limit-with-ties}
-如果为 `LIMIT n[,m]` 设置了 `WITH TIES` ,并且声明了 `ORDER BY expr_list`, you will get in result first `n` or `n,m` rows and all rows with same `ORDER BY` fields values equal to row at position `n` for `LIMIT n` and `m` for `LIMIT n,m`.
+如果为 `LIMIT n[,m]` 设置了 `WITH TIES` ,并且声明了 `ORDER BY expr_list`, 除了得到无修饰符的结果(正常情况下的 `limit n`, 前n行数据), 还会返回与第`n`行具有相同排序字段的行(即如果第n+1行的字段与第n行 拥有相同的排序字段,同样返回该结果.
此修饰符可以与: [ORDER BY … WITH FILL modifier](../../../sql-reference/statements/select/order-by.md#orderby-with-fill) 组合使用.
@@ -38,7 +38,7 @@ SELECT * FROM (
└───┘
```
-单子执行了 `WITH TIES` 修饰符后
+添加 `WITH TIES` 修饰符后
``` sql
SELECT * FROM (
@@ -59,4 +59,8 @@ SELECT * FROM (
└───┘
```
-cause row number 6 have same value “2” for field `n` as row number 5
+虽然指定了`LIMIT 5`, 但第6行的`n`字段值为2,与第5行相同,因此也作为满足条件的记录返回。
+简而言之,该修饰符可理解为是否增加“并列行”的数据。
+
+``` sql,
+``` sql
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 84d1c912b96..bcc45f8c5b4 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -402,6 +402,7 @@ class IColumn;
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \
M(Bool, aggregate_functions_null_for_empty, false, "Rewrite all aggregate functions in a query, adding -OrNull suffix to them", 0) \
M(Bool, optimize_skip_merged_partitions, false, "Skip partitions with one part with level > 0 in optimize final", 0) \
+ M(Bool, optimize_on_insert, true, "Do the same transformation for inserted block of data as if merge was done on this block.", 0) \
\
M(Bool, use_antlr_parser, false, "Parse incoming queries using ANTLR-generated parser", 0) \
\
diff --git a/src/Core/SortCursor.h b/src/Core/SortCursor.h
index 7a222f70199..f383c3ded8e 100644
--- a/src/Core/SortCursor.h
+++ b/src/Core/SortCursor.h
@@ -30,7 +30,6 @@ struct SortCursorImpl
ColumnRawPtrs all_columns;
SortDescription desc;
size_t sort_columns_size = 0;
- size_t pos = 0;
size_t rows = 0;
/** Determines order if comparing columns are equal.
@@ -49,15 +48,20 @@ struct SortCursorImpl
/** Is there at least one column with Collator. */
bool has_collation = false;
+ /** We could use SortCursorImpl in case when columns aren't sorted
+ * but we have their sorted permutation
+ */
+ IColumn::Permutation * permutation = nullptr;
+
SortCursorImpl() {}
- SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0)
+ SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
{
- reset(block);
+ reset(block, perm);
}
- SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0)
+ SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
{
for (auto & column_desc : desc)
@@ -66,19 +70,19 @@ struct SortCursorImpl
throw Exception("SortDescription should contain column position if SortCursor was used without header.",
ErrorCodes::LOGICAL_ERROR);
}
- reset(columns, {});
+ reset(columns, {}, perm);
}
bool empty() const { return rows == 0; }
/// Set the cursor to the beginning of the new block.
- void reset(const Block & block)
+ void reset(const Block & block, IColumn::Permutation * perm = nullptr)
{
- reset(block.getColumns(), block);
+ reset(block.getColumns(), block, perm);
}
/// Set the cursor to the beginning of the new block.
- void reset(const Columns & columns, const Block & block)
+ void reset(const Columns & columns, const Block & block, IColumn::Permutation * perm = nullptr)
{
all_columns.clear();
sort_columns.clear();
@@ -96,18 +100,33 @@ struct SortCursorImpl
: column_desc.column_number;
sort_columns.push_back(columns[column_number].get());
- need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported(); /// TODO Nullable(String)
+ need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported();
has_collation |= need_collation[j];
}
pos = 0;
rows = all_columns[0]->size();
+ permutation = perm;
}
+ size_t getRow() const
+ {
+ if (permutation)
+ return (*permutation)[pos];
+ return pos;
+ }
+
+ /// We need a possibility to change pos (see MergeJoin).
+ size_t & getPosRef() { return pos; }
+
bool isFirst() const { return pos == 0; }
bool isLast() const { return pos + 1 >= rows; }
bool isValid() const { return pos < rows; }
void next() { ++pos; }
+
+/// Prevent using pos instead of getRow()
+private:
+ size_t pos;
};
using SortCursorImpls = std::vector;
@@ -127,7 +146,7 @@ struct SortCursorHelper
bool ALWAYS_INLINE greater(const SortCursorHelper & rhs) const
{
- return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos);
+ return derived().greaterAt(rhs.derived(), impl->getRow(), rhs.impl->getRow());
}
/// Inverted so that the priority queue elements are removed in ascending order.
diff --git a/src/DataStreams/MergingSortedBlockInputStream.cpp b/src/DataStreams/MergingSortedBlockInputStream.cpp
index ee6b93dc8e4..b7396a23d6a 100644
--- a/src/DataStreams/MergingSortedBlockInputStream.cpp
+++ b/src/DataStreams/MergingSortedBlockInputStream.cpp
@@ -222,7 +222,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
// std::cerr << "Inserting row\n";
for (size_t i = 0; i < num_columns; ++i)
- merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
+ merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
if (out_row_sources_buf)
{
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 81b16ac3d67..84d7a72016f 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1557,7 +1557,12 @@ void InterpreterSelectQuery::executeFetchColumns(
throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
/// Specify the number of threads only if it wasn't specified in storage.
- if (!query_plan.getMaxThreads())
+ ///
+ /// But in case of remote query and prefer_localhost_replica=1 (default)
+ /// The inner local query (that is done in the same process, without
+ /// network interaction), it will setMaxThreads earlier and distributed
+ /// query will not update it.
+ if (!query_plan.getMaxThreads() || is_remote)
query_plan.setMaxThreads(max_threads_execute_query);
/// Aliases in table declaration.
diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp
index 60357187270..4839980eac7 100644
--- a/src/Interpreters/MergeJoin.cpp
+++ b/src/Interpreters/MergeJoin.cpp
@@ -180,12 +180,16 @@ class MergeJoinCursor
public:
MergeJoinCursor(const Block & block, const SortDescription & desc_)
: impl(SortCursorImpl(block, desc_))
- {}
+ {
+ /// SortCursorImpl can work with permutation, but MergeJoinCursor can't.
+ if (impl.permutation)
+ throw Exception("Logical error: MergeJoinCursor doesn't support permutation", ErrorCodes::LOGICAL_ERROR);
+ }
- size_t position() const { return impl.pos; }
+ size_t position() const { return impl.getRow(); }
size_t end() const { return impl.rows; }
- bool atEnd() const { return impl.pos >= impl.rows; }
- void nextN(size_t num) { impl.pos += num; }
+ bool atEnd() const { return impl.getRow() >= impl.rows; }
+ void nextN(size_t num) { impl.getPosRef() += num; }
void setCompareNullability(const MergeJoinCursor & rhs)
{
@@ -254,10 +258,10 @@ private:
else if (cmp > 0)
rhs.impl.next();
else if (!cmp)
- return Range{impl.pos, rhs.impl.pos, getEqualLength(), rhs.getEqualLength()};
+ return Range{impl.getRow(), rhs.impl.getRow(), getEqualLength(), rhs.getEqualLength()};
}
- return Range{impl.pos, rhs.impl.pos, 0, 0};
+ return Range{impl.getRow(), rhs.impl.getRow(), 0, 0};
}
template
@@ -268,7 +272,7 @@ private:
const auto * left_column = impl.sort_columns[i];
const auto * right_column = rhs.impl.sort_columns[i];
- int res = nullableCompareAt(*left_column, *right_column, impl.pos, rhs.impl.pos);
+ int res = nullableCompareAt(*left_column, *right_column, impl.getRow(), rhs.impl.getRow());
if (res)
return res;
}
@@ -278,11 +282,11 @@ private:
/// Expects !atEnd()
size_t getEqualLength()
{
- size_t pos = impl.pos + 1;
+ size_t pos = impl.getRow() + 1;
for (; pos < impl.rows; ++pos)
if (!samePrev(pos))
break;
- return pos - impl.pos;
+ return pos - impl.getRow();
}
/// Expects lhs_pos > 0
diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp
index 11d68372b10..6b2ee1c8039 100644
--- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp
@@ -257,12 +257,12 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::addRow(SortCursor & curs
throw Exception("Can't add a row to the group because it was not started.", ErrorCodes::LOGICAL_ERROR);
for (auto & desc : def.columns_to_aggregate)
- desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->pos);
+ desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->getRow());
for (auto & desc : def.columns_to_simple_aggregate)
{
auto & col = cursor->all_columns[desc.column_number];
- desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, arena.get());
+ desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->getRow(), arena.get());
}
}
@@ -352,7 +352,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
return Status(merged_data.pull());
}
- merged_data.startGroup(current->all_columns, current->pos);
+ merged_data.startGroup(current->all_columns, current->getRow());
}
merged_data.addRow(current);
diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp
index 8f6888b9a7a..ccb66259e2e 100644
--- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp
@@ -26,9 +26,9 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
const String & sign_column,
bool only_positive_sign_,
size_t max_block_size,
+ Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_,
- bool use_average_block_sizes,
- Poco::Logger * log_)
+ bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, sign_column_number(header.getPositionByName(sign_column))
@@ -123,7 +123,7 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
return Status(current.impl->order);
}
- Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->pos];
+ Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->getRow()];
RowRef current_row;
setRowRef(current_row, current);
diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h
index d95fac2f02b..028715f715b 100644
--- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h
+++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h
@@ -33,9 +33,9 @@ public:
const String & sign_column,
bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0.
size_t max_block_size,
- WriteBuffer * out_row_sources_buf_,
- bool use_average_block_sizes,
- Poco::Logger * log_);
+ Poco::Logger * log_,
+ WriteBuffer * out_row_sources_buf_ = nullptr,
+ bool use_average_block_sizes = false);
Status merge() override;
diff --git a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp
index 0e704e5a05b..df10fb26d40 100644
--- a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp
@@ -164,12 +164,12 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
return Status(current.impl->order);
}
- StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->pos);
+ StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->getRow());
bool new_path = is_first || next_path != current_group_path;
is_first = false;
- time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->pos);
+ time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->getRow());
/// Is new key before rounding.
bool is_new_key = new_path || next_row_time != current_time;
@@ -227,7 +227,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
/// and for rows with same maximum version - only last row.
if (is_new_key
|| current->all_columns[columns_definition.version_column_num]->compareAt(
- current->pos, current_subgroup_newest_row.row_num,
+ current->getRow(), current_subgroup_newest_row.row_num,
*(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num],
/* nan_direction_hint = */ 1) >= 0)
{
@@ -263,7 +263,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
void GraphiteRollupSortedAlgorithm::startNextGroup(SortCursor & cursor, Graphite::RollupRule next_rule)
{
- merged_data.startNextGroup(cursor->all_columns, cursor->pos, next_rule, columns_definition);
+ merged_data.startNextGroup(cursor->all_columns, cursor->getRow(), next_rule, columns_definition);
}
void GraphiteRollupSortedAlgorithm::finishCurrentGroup()
diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h
index f86be2a7d1b..5c8d18875e7 100644
--- a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h
+++ b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h
@@ -29,6 +29,8 @@ public:
/// between different algorithm objects in parallel FINAL.
bool skip_last_row = false;
+ IColumn::Permutation * permutation = nullptr;
+
void swap(Input & other)
{
chunk.swap(other.chunk);
diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp
index 0b13d689636..e4c60d7609c 100644
--- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp
+++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp
@@ -22,7 +22,7 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
if (!current_inputs[source_num].chunk)
continue;
- cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num);
+ cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
}
queue = SortingHeap(cursors);
@@ -37,7 +37,7 @@ void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t sourc
last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);
current_input.swap(input);
- cursors[source_num].reset(current_input.chunk.getColumns(), {});
+ cursors[source_num].reset(current_input.chunk.getColumns(), {}, current_input.permutation);
queue.push(cursors[source_num]);
}
diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp
index 39abe5c0ec7..97abffdc167 100644
--- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp
+++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp
@@ -39,7 +39,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
source.skip_last_row = inputs[source_num].skip_last_row;
source.chunk = chunk_allocator.alloc(inputs[source_num].chunk);
- cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num);
+ cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num, inputs[source_num].permutation);
source.chunk->all_columns = cursors[source_num].all_columns;
source.chunk->sort_columns = cursors[source_num].sort_columns;
@@ -55,7 +55,7 @@ void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num
auto & source = sources[source_num];
source.skip_last_row = input.skip_last_row;
source.chunk = chunk_allocator.alloc(input.chunk);
- cursors[source_num].reset(source.chunk->getColumns(), {});
+ cursors[source_num].reset(source.chunk->getColumns(), {}, input.permutation);
source.chunk->all_columns = cursors[source_num].all_columns;
source.chunk->sort_columns = cursors[source_num].sort_columns;
diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp
index ee13ef70203..511bd9dd74f 100644
--- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp
@@ -139,7 +139,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
//std::cerr << "Inserting row\n";
- merged_data.insertRow(current->all_columns, current->pos, current->rows);
+ merged_data.insertRow(current->all_columns, current->getRow(), current->rows);
if (out_row_sources_buf)
{
diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h
index 531b2636747..63dced26dd4 100644
--- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h
+++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h
@@ -18,9 +18,9 @@ public:
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
- UInt64 limit_,
- WriteBuffer * out_row_sources_buf_,
- bool use_average_block_sizes);
+ UInt64 limit_ = 0,
+ WriteBuffer * out_row_sources_buf_ = nullptr,
+ bool use_average_block_sizes = false);
void addInput();
diff --git a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp
index 3ee0df0efd8..132241844d7 100644
--- a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp
@@ -73,7 +73,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
if (version_column_number == -1
|| selected_row.empty()
|| current->all_columns[version_column_number]->compareAt(
- current->pos, selected_row.row_num,
+ current->getRow(), selected_row.row_num,
*(*selected_row.all_columns)[version_column_number],
/* nan_direction_hint = */ 1) >= 0)
{
diff --git a/src/Processors/Merges/Algorithms/RowRef.h b/src/Processors/Merges/Algorithms/RowRef.h
index 1b4da9781f8..e4610c88581 100644
--- a/src/Processors/Merges/Algorithms/RowRef.h
+++ b/src/Processors/Merges/Algorithms/RowRef.h
@@ -136,7 +136,7 @@ struct RowRef
{
sort_columns = cursor.impl->sort_columns.data();
num_columns = cursor.impl->sort_columns.size();
- row_num = cursor.impl->pos;
+ row_num = cursor.impl->getRow();
}
static bool checkEquals(size_t size, const IColumn ** lhs, size_t lhs_row, const IColumn ** rhs, size_t rhs_row)
@@ -192,7 +192,7 @@ struct RowRefWithOwnedChunk
void set(SortCursor & cursor, SharedChunkPtr chunk)
{
owned_chunk = std::move(chunk);
- row_num = cursor.impl->pos;
+ row_num = cursor.impl->getRow();
all_columns = &owned_chunk->all_columns;
sort_columns = &owned_chunk->sort_columns;
}
diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp
index 17e5e4364ff..f558a6f0677 100644
--- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp
@@ -688,10 +688,10 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
return Status(merged_data.pull());
}
- merged_data.startGroup(current->all_columns, current->pos);
+ merged_data.startGroup(current->all_columns, current->getRow());
}
else
- merged_data.addRow(current->all_columns, current->pos);
+ merged_data.addRow(current->all_columns, current->getRow());
if (!current->isLast())
{
diff --git a/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp b/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp
index 6273bd28371..672242b253b 100644
--- a/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp
@@ -73,7 +73,7 @@ IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge()
RowRef current_row;
- Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->pos];
+ Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->getRow()];
setRowRef(current_row, current);
diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h
index 4e65504a101..9e6bd306eee 100644
--- a/src/Processors/Merges/CollapsingSortedTransform.h
+++ b/src/Processors/Merges/CollapsingSortedTransform.h
@@ -27,9 +27,9 @@ public:
sign_column,
only_positive_sign,
max_block_size,
+ &Poco::Logger::get("CollapsingSortedTransform"),
out_row_sources_buf_,
- use_average_block_sizes,
- &Poco::Logger::get("CollapsingSortedTransform"))
+ use_average_block_sizes)
{
}
diff --git a/src/Processors/Transforms/SortingTransform.cpp b/src/Processors/Transforms/SortingTransform.cpp
index 03c8e87ce7a..11f23530c9e 100644
--- a/src/Processors/Transforms/SortingTransform.cpp
+++ b/src/Processors/Transforms/SortingTransform.cpp
@@ -100,7 +100,7 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
/// Append a row from queue.
for (size_t i = 0; i < num_columns; ++i)
- merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
+ merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
++total_merged_rows;
++merged_rows;
diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
index 40714e5af31..1b5d45f0daf 100644
--- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
+++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
@@ -22,7 +22,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
{
Stopwatch watch;
- MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
+ MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, optimize_on_insert);
storage.renameTempPartAndAdd(part, &storage.increment);
PartLog::addNewPart(storage.global_context, part, watch.elapsed());
diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h
index 8aae7f3e625..5853d80e3c6 100644
--- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h
+++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h
@@ -14,10 +14,11 @@ class StorageMergeTree;
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
- MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_)
+ MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, bool optimize_on_insert_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, max_parts_per_block(max_parts_per_block_)
+ , optimize_on_insert(optimize_on_insert_)
{
}
@@ -28,6 +29,7 @@ private:
StorageMergeTree & storage;
StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block;
+ bool optimize_on_insert;
};
}
diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
index e65a8e22f32..c93d4bceba0 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
@@ -16,6 +16,14 @@
#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
namespace ProfileEvents
{
extern const Event MergeTreeDataWriterBlocks;
@@ -194,7 +202,74 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
return result;
}
-MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
+Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation)
+{
+ size_t block_size = block.rows();
+
+ auto get_merging_algorithm = [&]() -> std::shared_ptr
+ {
+ switch (data.merging_params.mode)
+ {
+ /// There is nothing to merge in single block in ordinary MergeTree
+ case MergeTreeData::MergingParams::Ordinary:
+ return nullptr;
+ case MergeTreeData::MergingParams::Replacing:
+ return std::make_shared(
+ block, 1, sort_description, data.merging_params.version_column, block_size + 1);
+ case MergeTreeData::MergingParams::Collapsing:
+ return std::make_shared(
+ block, 1, sort_description, data.merging_params.sign_column,
+ false, block_size + 1, &Poco::Logger::get("MergeTreeBlockOutputStream"));
+ case MergeTreeData::MergingParams::Summing:
+ return std::make_shared(
+ block, 1, sort_description, data.merging_params.columns_to_sum,
+ partition_key_columns, block_size + 1);
+ case MergeTreeData::MergingParams::Aggregating:
+ return std::make_shared(block, 1, sort_description, block_size + 1);
+ case MergeTreeData::MergingParams::VersionedCollapsing:
+ return std::make_shared(
+ block, 1, sort_description, data.merging_params.sign_column, block_size + 1);
+ case MergeTreeData::MergingParams::Graphite:
+ return std::make_shared(
+ block, 1, sort_description, block_size + 1, data.merging_params.graphite_params, time(nullptr));
+ }
+
+ __builtin_unreachable();
+ };
+
+ auto merging_algorithm = get_merging_algorithm();
+ if (!merging_algorithm)
+ return block;
+
+ Chunk chunk(block.getColumns(), block_size);
+
+ IMergingAlgorithm::Input input;
+ input.set(std::move(chunk));
+ input.permutation = permutation;
+
+ IMergingAlgorithm::Inputs inputs;
+ inputs.push_back(std::move(input));
+ merging_algorithm->initialize(std::move(inputs));
+
+ IMergingAlgorithm::Status status = merging_algorithm->merge();
+
+ /// Check that after first merge merging_algorithm is waiting for data from input 0.
+ if (status.required_source != 0)
+ throw Exception("Logical error: required source after the first merge is not 0.", ErrorCodes::LOGICAL_ERROR);
+
+ status = merging_algorithm->merge();
+
+ /// Check that merge is finished.
+ if (!status.is_finished)
+ throw Exception("Logical error: merge is not finished after the second merge.", ErrorCodes::LOGICAL_ERROR);
+
+ /// Merged Block is sorted and we don't need to use permutation anymore
+ permutation = nullptr;
+
+ return block.cloneWithColumns(status.chunk.getColumns());
+}
+
+MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert)
{
Block & block = block_with_partition.block;
@@ -228,6 +303,38 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
else
part_name = new_part_info.getPartName();
+ /// If we need to calculate some columns to sort.
+ if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
+ data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
+
+ Names sort_columns = metadata_snapshot->getSortingKeyColumns();
+ SortDescription sort_description;
+ size_t sort_columns_size = sort_columns.size();
+ sort_description.reserve(sort_columns_size);
+
+ for (size_t i = 0; i < sort_columns_size; ++i)
+ sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
+
+ ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
+
+ /// Sort
+ IColumn::Permutation * perm_ptr = nullptr;
+ IColumn::Permutation perm;
+ if (!sort_description.empty())
+ {
+ if (!isAlreadySorted(block, sort_description))
+ {
+ stableGetPermutation(block, sort_description, perm);
+ perm_ptr = &perm;
+ }
+ else
+ ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
+ }
+
+ Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
+ if (optimize_on_insert)
+ block = mergeBlock(block, sort_description, partition_key_columns, perm_ptr);
+
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
@@ -274,34 +381,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
sync_guard.emplace(disk, full_path);
}
- /// If we need to calculate some columns to sort.
- if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
- data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
-
- Names sort_columns = metadata_snapshot->getSortingKeyColumns();
- SortDescription sort_description;
- size_t sort_columns_size = sort_columns.size();
- sort_description.reserve(sort_columns_size);
-
- for (size_t i = 0; i < sort_columns_size; ++i)
- sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
-
- ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
-
- /// Sort
- IColumn::Permutation * perm_ptr = nullptr;
- IColumn::Permutation perm;
- if (!sort_description.empty())
- {
- if (!isAlreadySorted(block, sort_description))
- {
- stableGetPermutation(block, sort_description, perm);
- perm_ptr = &perm;
- }
- else
- ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
- }
-
if (metadata_snapshot->hasRowsTTL())
updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h
index b4ad936672c..685d1adf947 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.h
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.h
@@ -45,7 +45,9 @@ public:
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
*/
- MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot);
+ MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert);
+
+ Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation);
private:
MergeTreeData & data;
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
index 9bf1de5d03c..5c2c96a57d9 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
@@ -40,7 +40,8 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
bool quorum_parallel_,
- bool deduplicate_)
+ bool deduplicate_,
+ bool optimize_on_insert_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, quorum(quorum_)
@@ -49,6 +50,7 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
, quorum_parallel(quorum_parallel_)
, deduplicate(deduplicate_)
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
+ , optimize_on_insert(optimize_on_insert_)
{
/// The quorum value `1` has the same meaning as if it is disabled.
if (quorum == 1)
@@ -142,7 +144,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
/// Write part to the filesystem under temporary name. Calculate a checksum.
- MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
+ MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, optimize_on_insert);
String block_id;
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
index 97c094c1128..3ac2c4bcfcb 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
@@ -29,7 +29,8 @@ public:
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
bool quorum_parallel_,
- bool deduplicate_);
+ bool deduplicate_,
+ bool optimize_on_insert);
Block getHeader() const override;
void writePrefix() override;
@@ -71,6 +72,8 @@ private:
using Logger = Poco::Logger;
Poco::Logger * log;
+
+ bool optimize_on_insert;
};
}
diff --git a/src/Storages/MergeTree/SimpleMergeSelector.h b/src/Storages/MergeTree/SimpleMergeSelector.h
index fe57c40320a..4f277ad74cd 100644
--- a/src/Storages/MergeTree/SimpleMergeSelector.h
+++ b/src/Storages/MergeTree/SimpleMergeSelector.h
@@ -3,6 +3,83 @@
#include
+/**
+We have a set of data parts that is dynamically changing - new data parts are added and there is background merging process.
+Background merging process periodically selects continuous range of data parts to merge.
+
+It tries to optimize the following metrics:
+1. Write amplification: total amount of data written on disk (source data + merges) relative to the amount of source data.
+It can be also considered as the total amount of work for merges.
+2. The number of data parts in the set at the random moment of time (average + quantiles).
+
+Also taking the following considerations:
+1. Older data parts should be merged less frequently than newer data parts.
+2. Larger data parts should be merged less frequently than smaller data parts.
+3. If no new parts arrive, we should continue to merge existing data parts to eventually optimize the table.
+4. Never allow too many parts, because it will slow down SELECT queries significantly.
+5. Multiple background merges can run concurrently but not too many.
+
+It is not possible to optimize both metrics, because they contradict to each other.
+To lower the number of parts we can merge eagerly but write amplification will increase.
+Then we need some balance between optimization of these two metrics.
+
+But some optimizations may improve both metrics.
+
+For example, we can look at the "merge tree" - the tree of data parts that were merged.
+If the tree is perfectly balanced then its depth is proportonal to the log(data size),
+the total amount of work is proportional to data_size * log(data_size)
+and the write amplification is proportional to log(data_size).
+If it's not balanced (e.g. every new data part is always merged with existing data parts),
+its depth is proportional to the data size, total amount of work is proportional to data_size^2.
+
+We can also control the "base of the logarithm" - you can look it as the number of data parts
+that are merged at once (the tree "arity"). But as the data parts are of different size, we should generalize it:
+calculate the ratio between total size of merged parts to the size of the largest part participated in merge.
+For example, if we merge 4 parts of size 5, 3, 1, 1 - then "base" will be 2 (10 / 5).
+
+Base of the logarithm (simply called `base` in `SimpleMergeSelector`) is the main knob to control the write amplification.
+The more it is, the less is write amplification but we will have more data parts on average.
+
+To fit all the considerations, we also adjust `base` depending on total parts count,
+parts size and parts age, with linear interpolation (then `base` is not a constant but a function of multiple variables,
+looking like a section of hyperplane).
+
+
+Then we apply the algorithm to select the optimal range of data parts to merge.
+There is a proof that this algorithm is optimal if we look in the future only by single step.
+
+The best range of data parts is selected.
+
+We also apply some tunes:
+- there is a fixed const of merging small parts (that is added to the size of data part before all estimations);
+- there are some heuristics to "stick" ranges to large data parts.
+
+It's still unclear if this algorithm is good or optimal at all. It's unclear if this algorithm is using the optimal coefficients.
+
+To test and optimize SimpleMergeSelector, we apply the following methods:
+- insert/merge simulator: a model simulating parts insertion and merging;
+merge selecting algorithm is applied and the relevant metrics are calculated to allow to tune the algorithm;
+- insert/merge simulator on real `system.part_log` from production - it gives realistic information about inserted data parts:
+their sizes, at what time intervals they are inserted;
+
+There is a research thesis dedicated to optimization of merge algorithm:
+https://presentations.clickhouse.tech/hse_2019/merge_algorithm.pptx
+
+This work made attempt to variate the coefficients in SimpleMergeSelector and to solve the optimization task:
+maybe some change in coefficients will give a clear win on all metrics. Surprisingly enough, it has found
+that our selection of coefficients is near optimal. It has found slightly more optimal coefficients,
+but I decided not to use them, because the representativeness of the test data is in question.
+
+This work did not make any attempt to propose any other algorithm.
+This work did not make any attempt to analyze the task with analytical methods.
+That's why I still believe that there are many opportunities to optimize the merge selection algorithm.
+
+Please do not mix the task with a similar task in other LSM-based systems (like RocksDB).
+Their problem statement is subtly different. Our set of data parts is consisted of data parts
+that are completely independent in stored data. Ranges of primary keys in data parts can intersect.
+When doing SELECT we read from all data parts. INSERTed data parts comes with unknown size...
+*/
+
namespace DB
{
diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp
index 156b62e64a2..e2ebd4ed424 100644
--- a/src/Storages/StorageMerge.cpp
+++ b/src/Storages/StorageMerge.cpp
@@ -84,22 +84,6 @@ StorageMerge::StorageMerge(
setInMemoryMetadata(storage_metadata);
}
-StorageMerge::StorageMerge(
- const StorageID & table_id_,
- const ColumnsDescription & columns_,
- const String & source_database_,
- const Tables & tables_,
- const Context & context_)
- : IStorage(table_id_)
- , source_database(source_database_)
- , tables(tables_)
- , global_context(context_.getGlobalContext())
-{
- StorageInMemoryMetadata storage_metadata;
- storage_metadata.setColumns(columns_);
- setInMemoryMetadata(storage_metadata);
-}
-
template
StoragePtr StorageMerge::getFirstTable(F && predicate) const
{
@@ -455,12 +439,8 @@ DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & cont
e.addMessage("while getting table iterator of Merge table. Maybe caused by two Merge tables that will endlessly try to read each other's data");
throw;
}
-
- if (tables)
- return std::make_unique(*tables, source_database);
-
auto database = DatabaseCatalog::instance().getDatabase(source_database);
- auto table_name_match = [this](const String & table_name_) { return table_name_regexp->match(table_name_); };
+ auto table_name_match = [this](const String & table_name_) { return table_name_regexp.match(table_name_); };
return database->getTablesIterator(context, table_name_match);
}
diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h
index 3c322c09b36..681ea7015e7 100644
--- a/src/Storages/StorageMerge.h
+++ b/src/Storages/StorageMerge.h
@@ -48,8 +48,7 @@ public:
private:
String source_database;
- std::optional table_name_regexp;
- std::optional tables;
+ OptimizedRegularExpression table_name_regexp;
const Context & global_context;
using StorageWithLockAndName = std::tuple;
@@ -76,13 +75,6 @@ protected:
const String & table_name_regexp_,
const Context & context_);
- StorageMerge(
- const StorageID & table_id_,
- const ColumnsDescription & columns_,
- const String & source_database_,
- const Tables & source_tables_,
- const Context & context_);
-
Pipe createSources(
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info,
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index eb28ccfa3d5..591f9840896 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -233,7 +233,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Sto
const auto & settings = context.getSettingsRef();
return std::make_shared(
- *this, metadata_snapshot, settings.max_partitions_per_insert_block);
+ *this, metadata_snapshot, settings.max_partitions_per_insert_block, context.getSettingsRef().optimize_on_insert);
}
void StorageMergeTree::checkTableCanBeDropped() const
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 31b04664b17..76513dc4d78 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -3861,7 +3861,8 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block,
query_settings.insert_quorum_parallel,
- deduplicate);
+ deduplicate,
+ context.getSettingsRef().optimize_on_insert);
}
@@ -4444,7 +4445,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
- ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false); /// TODO Allow to use quorum here.
+ ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, false); /// TODO Allow to use quorum here.
for (size_t i = 0; i < loaded_parts.size(); ++i)
{
String old_name = loaded_parts[i]->name;
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index f1094793944..d396f32dcca 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -39,7 +39,8 @@ namespace DB
* - the structure of the table (/metadata, /columns)
* - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...);
* - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host);
- * - select the leader replica (/leader_election) - this is the replica that assigns the merge;
+ * - select the leader replica (/leader_election) - these are the replicas that assigning merges, mutations and partition manipulations
+ * (after ClickHouse version 20.5 we allow multiple leaders to act concurrently);
* - a set of parts of data on each replica (/replicas/replica_name/parts);
* - list of the last N blocks of data with checksum, for deduplication (/blocks);
* - the list of incremental block numbers (/block_numbers) that we are about to insert,
diff --git a/src/TableFunctions/TableFunctionMerge.cpp b/src/TableFunctions/TableFunctionMerge.cpp
index 5d1601c25f1..a878909e29d 100644
--- a/src/TableFunctions/TableFunctionMerge.cpp
+++ b/src/TableFunctions/TableFunctionMerge.cpp
@@ -6,7 +6,6 @@
#include
#include
#include
-#include
#include
#include
#include
@@ -23,6 +22,29 @@ namespace ErrorCodes
}
+static NamesAndTypesList chooseColumns(const String & source_database, const String & table_name_regexp_, const Context & context)
+{
+ OptimizedRegularExpression table_name_regexp(table_name_regexp_);
+ auto table_name_match = [&](const String & table_name) { return table_name_regexp.match(table_name); };
+
+ StoragePtr any_table;
+
+ {
+ auto database = DatabaseCatalog::instance().getDatabase(source_database);
+ auto iterator = database->getTablesIterator(context, table_name_match);
+
+ if (iterator->isValid())
+ if (const auto & table = iterator->table())
+ any_table = table;
+ }
+
+ if (!any_table)
+ throw Exception("Error while executing table function merge. In database " + source_database + " no one matches regular expression: "
+ + table_name_regexp_, ErrorCodes::UNKNOWN_TABLE);
+
+ return any_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical();
+}
+
void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Context & context)
{
ASTs & args_func = ast_function->children;
@@ -46,46 +68,9 @@ void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Conte
table_name_regexp = args[1]->as().value.safeGet();
}
-
-const Tables & TableFunctionMerge::getMatchingTables(const Context & context) const
-{
- if (tables)
- return *tables;
-
- auto database = DatabaseCatalog::instance().getDatabase(source_database);
-
- OptimizedRegularExpression re(table_name_regexp);
- auto table_name_match = [&](const String & table_name_) { return re.match(table_name_); };
-
- auto access = context.getAccess();
- bool granted_show_on_all_tables = access->isGranted(AccessType::SHOW_TABLES, source_database);
- bool granted_select_on_all_tables = access->isGranted(AccessType::SELECT, source_database);
-
- tables.emplace();
- for (auto it = database->getTablesIterator(context, table_name_match); it->isValid(); it->next())
- {
- if (!it->table())
- continue;
- bool granted_show = granted_show_on_all_tables || access->isGranted(AccessType::SHOW_TABLES, source_database, it->name());
- if (!granted_show)
- continue;
- if (!granted_select_on_all_tables)
- access->checkAccess(AccessType::SELECT, source_database, it->name());
- tables->emplace(it->name(), it->table());
- }
-
- if (tables->empty())
- throw Exception("Error while executing table function merge. In database " + source_database + " no one matches regular expression: "
- + table_name_regexp, ErrorCodes::UNKNOWN_TABLE);
-
- return *tables;
-}
-
-
ColumnsDescription TableFunctionMerge::getActualTableStructure(const Context & context) const
{
- auto first_table = getMatchingTables(context).begin()->second;
- return ColumnsDescription{first_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical()};
+ return ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)};
}
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
@@ -94,7 +79,7 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, cons
StorageID(getDatabaseName(), table_name),
getActualTableStructure(context),
source_database,
- getMatchingTables(context),
+ table_name_regexp,
context);
res->startup();
diff --git a/src/TableFunctions/TableFunctionMerge.h b/src/TableFunctions/TableFunctionMerge.h
index 38ea2c22995..a9c4dd6b038 100644
--- a/src/TableFunctions/TableFunctionMerge.h
+++ b/src/TableFunctions/TableFunctionMerge.h
@@ -19,13 +19,11 @@ private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Merge"; }
- const Tables & getMatchingTables(const Context & context) const;
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String source_database;
String table_name_regexp;
- mutable std::optional tables;
};
diff --git a/tests/clickhouse-test b/tests/clickhouse-test
index 3702268819b..4eff8351e66 100755
--- a/tests/clickhouse-test
+++ b/tests/clickhouse-test
@@ -15,6 +15,7 @@ from subprocess import check_call
from subprocess import Popen
from subprocess import PIPE
from subprocess import CalledProcessError
+from subprocess import TimeoutExpired
from datetime import datetime
from time import time, sleep
from errno import ESRCH
@@ -114,6 +115,7 @@ def get_db_engine(args):
def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file):
# print(client_options)
+ start_time = datetime.now()
if args.database:
database = args.database
os.environ.setdefault("CLICKHOUSE_DATABASE", database)
@@ -129,7 +131,11 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
database = 'test_{suffix}'.format(suffix=random_str())
clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
- clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(args)))
+ try:
+ clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(args)), timeout=args.timeout)
+ except TimeoutExpired:
+ total_time = (datetime.now() - start_time).total_seconds()
+ return clickhouse_proc_create, "", "Timeout creating database {} before test".format(database), total_time
os.environ["CLICKHOUSE_DATABASE"] = database
@@ -152,14 +158,24 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
# print(command)
proc = Popen(command, shell=True, env=os.environ)
- start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
sleep(0.01)
if not args.database:
clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
- clickhouse_proc_create.communicate(("DROP DATABASE " + database))
+ seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 10)
+ try:
+ clickhouse_proc_create.communicate(("DROP DATABASE " + database), timeout=seconds_left)
+ except TimeoutExpired:
+ # kill test process because it can also hung
+ if proc.returncode is None:
+ try:
+ proc.kill()
+ except OSError as e:
+ if e.errno != ESRCH:
+ raise
+ return clickhouse_proc_create, "", "Timeout dropping database {} after test".format(database), total_time
total_time = (datetime.now() - start_time).total_seconds()
@@ -305,7 +321,7 @@ def run_tests_array(all_tests_with_params):
if args.testname:
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
- clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)))
+ clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)), timeout=10)
if clickhouse_proc.returncode != 0:
failures += 1
@@ -330,6 +346,8 @@ def run_tests_array(all_tests_with_params):
print(MSG_FAIL, end='')
print_test_time(total_time)
print(" - Timeout!")
+ if stderr:
+ print(stderr)
else:
counter = 1
while proc.returncode != 0 and need_retry(stderr):
diff --git a/tests/integration/README.md b/tests/integration/README.md
index 0886dc2cfac..cea1bd6f893 100644
--- a/tests/integration/README.md
+++ b/tests/integration/README.md
@@ -135,3 +135,13 @@ named `test.py` containing tests in it. All functions with names starting with `
To assert that two TSV files must be equal, wrap them in the `TSV` class and use the regular `assert`
statement. Example: `assert TSV(result) == TSV(reference)`. In case the assertion fails, `pytest`
will automagically detect the types of variables and only the small diff of two files is printed.
+
+### Troubleshooting
+
+If tests failing for misterious reasons, this may help:
+
+```
+sudo service docker stop
+sudo bash -c 'rm -rf /var/lib/docker/*'
+sudo service docker start
+```
diff --git a/tests/integration/test_graphite_merge_tree/configs/users.xml b/tests/integration/test_graphite_merge_tree/configs/users.xml
new file mode 100644
index 00000000000..cdd437797ce
--- /dev/null
+++ b/tests/integration/test_graphite_merge_tree/configs/users.xml
@@ -0,0 +1,8 @@
+
+
+
+
+ 0
+
+
+
diff --git a/tests/integration/test_graphite_merge_tree/test.py b/tests/integration/test_graphite_merge_tree/test.py
index 502004d2dfe..ee8a18f1ca5 100644
--- a/tests/integration/test_graphite_merge_tree/test.py
+++ b/tests/integration/test_graphite_merge_tree/test.py
@@ -8,7 +8,8 @@ from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
- main_configs=['configs/graphite_rollup.xml'])
+ main_configs=['configs/graphite_rollup.xml'],
+ user_configs=["configs/users.xml"])
q = instance.query
diff --git a/tests/integration/test_materialize_mysql_database/configs/users.xml b/tests/integration/test_materialize_mysql_database/configs/users.xml
index 4c167c06d63..196ea5cfb99 100644
--- a/tests/integration/test_materialize_mysql_database/configs/users.xml
+++ b/tests/integration/test_materialize_mysql_database/configs/users.xml
@@ -3,6 +3,8 @@
1
+ 1
+ 0
Ordinary
diff --git a/tests/integration/test_row_policy/configs/users.d/another_user.xml b/tests/integration/test_row_policy/configs/users.d/another_user.xml
index fb9608e5313..89e16e94c83 100644
--- a/tests/integration/test_row_policy/configs/users.d/another_user.xml
+++ b/tests/integration/test_row_policy/configs/users.d/another_user.xml
@@ -1,5 +1,10 @@
+
+
+ 0
+
+
@@ -10,4 +15,4 @@
default
-
\ No newline at end of file
+
diff --git a/tests/integration/test_table_functions_access_rights/test.py b/tests/integration/test_table_functions_access_rights/test.py
deleted file mode 100644
index b986981dc22..00000000000
--- a/tests/integration/test_table_functions_access_rights/test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-import pytest
-from helpers.cluster import ClickHouseCluster
-from helpers.test_tools import TSV
-
-cluster = ClickHouseCluster(__file__)
-instance = cluster.add_instance('instance')
-
-
-@pytest.fixture(scope="module", autouse=True)
-def started_cluster():
- try:
- cluster.start()
-
- instance.query("CREATE TABLE table1(x UInt32) ENGINE = MergeTree ORDER BY tuple()")
- instance.query("CREATE TABLE table2(x UInt32) ENGINE = MergeTree ORDER BY tuple()")
- instance.query("INSERT INTO table1 VALUES (1)")
- instance.query("INSERT INTO table2 VALUES (2)")
-
- yield cluster
-
- finally:
- cluster.shutdown()
-
-
-@pytest.fixture(autouse=True)
-def cleanup_after_test():
- try:
- yield
- finally:
- instance.query("DROP USER IF EXISTS A")
-
-
-def test_merge():
- select_query = "SELECT * FROM merge('default', 'table[0-9]+') ORDER BY x"
- assert instance.query(select_query) == "1\n2\n"
-
- instance.query("CREATE USER A")
- assert "it's necessary to have the grant CREATE TEMPORARY TABLE ON *.*" in instance.query_and_get_error(select_query, user = 'A')
-
- instance.query("GRANT CREATE TEMPORARY TABLE ON *.* TO A")
- assert "no one matches regular expression" in instance.query_and_get_error(select_query, user = 'A')
-
- instance.query("GRANT SELECT ON default.table1 TO A")
- assert instance.query(select_query, user = 'A') == "1\n"
-
- instance.query("GRANT SELECT ON default.* TO A")
- assert instance.query(select_query, user = 'A') == "1\n2\n"
-
- instance.query("REVOKE SELECT ON default.table1 FROM A")
- assert instance.query(select_query, user = 'A') == "2\n"
-
- instance.query("REVOKE ALL ON default.* FROM A")
- instance.query("GRANT SELECT ON default.table1 TO A")
- instance.query("GRANT INSERT ON default.table2 TO A")
- assert "it's necessary to have the grant SELECT ON default.table2" in instance.query_and_get_error(select_query, user = 'A')
diff --git a/tests/queries/0_stateless/00083_create_merge_tree_zookeeper.sql b/tests/queries/0_stateless/00083_create_merge_tree_zookeeper.sql
index 120d599bd35..998a4517163 100644
--- a/tests/queries/0_stateless/00083_create_merge_tree_zookeeper.sql
+++ b/tests/queries/0_stateless/00083_create_merge_tree_zookeeper.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
DROP TABLE IF EXISTS merge_tree;
DROP TABLE IF EXISTS collapsing_merge_tree;
DROP TABLE IF EXISTS versioned_collapsing_merge_tree;
diff --git a/tests/queries/0_stateless/00327_summing_composite_nested.sql b/tests/queries/0_stateless/00327_summing_composite_nested.sql
index b61bc71b892..9be21e87abf 100644
--- a/tests/queries/0_stateless/00327_summing_composite_nested.sql
+++ b/tests/queries/0_stateless/00327_summing_composite_nested.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
DROP TABLE IF EXISTS summing_composite_key;
CREATE TABLE summing_composite_key (d Date, k UInt64, FirstMap Nested(k1 UInt32, k2ID Int8, s Float64), SecondMap Nested(k1ID UInt64, k2Key String, k3Type Int32, s Int64)) ENGINE = SummingMergeTree(d, k, 1);
diff --git a/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh b/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh
index 3cea3a65cdf..cc19a608c20 100755
--- a/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh
+++ b/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh
@@ -42,13 +42,13 @@ $CLICKHOUSE_CLIENT -q "INSERT INTO $name (date, Sign, ki) SELECT
toDate(0) AS date,
toInt8(1) AS Sign,
toUInt64(0) AS ki
-FROM system.numbers LIMIT 9000"
+FROM system.numbers LIMIT 9000" --server_logs_file=/dev/null
$CLICKHOUSE_CLIENT -q "INSERT INTO $name (date, Sign, ki) SELECT
toDate(0) AS date,
toInt8(1) AS Sign,
number AS ki
-FROM system.numbers LIMIT 9000, 9000"
+FROM system.numbers LIMIT 9000, 9000" --server_logs_file=/dev/null
$CLICKHOUSE_CLIENT -q "INSERT INTO $name SELECT
toDate(0) AS date,
@@ -67,7 +67,7 @@ number AS di09,
number AS di10,
[number, number+1] AS \`n.i\`,
[hex(number), hex(number+1)] AS \`n.s\`
-FROM system.numbers LIMIT $res_rows"
+FROM system.numbers LIMIT $res_rows" --server_logs_file=/dev/null
while [[ $(get_num_parts) -ne 1 ]] ; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE $name PARTITION 197001" --server_logs_file=/dev/null; done
diff --git a/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql b/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql
index 4ed053f5953..ef8655a1861 100644
--- a/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql
+++ b/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
SELECT '*** Replicated with sampling ***';
DROP TABLE IF EXISTS replicated_with_sampling;
diff --git a/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql b/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql
index b7824e7efdc..634b9781c7a 100644
--- a/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql
+++ b/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql
@@ -1,3 +1,5 @@
+set optimize_on_insert = 0;
+
drop table if exists mult_tab;
create table mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version);
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
diff --git a/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql b/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql
index 1bfbdaf75c5..8c51a6f34da 100644
--- a/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql
+++ b/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql
@@ -1,3 +1,5 @@
+set optimize_on_insert = 0;
+
drop table if exists tab_00577;
create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into tab_00577 values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);
diff --git a/tests/queries/0_stateless/00616_final_single_part.sql b/tests/queries/0_stateless/00616_final_single_part.sql
index df65123e29b..6618d0b1252 100644
--- a/tests/queries/0_stateless/00616_final_single_part.sql
+++ b/tests/queries/0_stateless/00616_final_single_part.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
DROP TABLE IF EXISTS test_00616;
DROP TABLE IF EXISTS replacing_00616;
diff --git a/tests/queries/0_stateless/00660_optimize_final_without_partition.sql b/tests/queries/0_stateless/00660_optimize_final_without_partition.sql
index 8c1f2ebd361..6545ad6e85b 100644
--- a/tests/queries/0_stateless/00660_optimize_final_without_partition.sql
+++ b/tests/queries/0_stateless/00660_optimize_final_without_partition.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
DROP TABLE IF EXISTS partitioned_by_tuple;
CREATE TABLE partitioned_by_tuple (d Date, x UInt8, w String, y UInt8) ENGINE SummingMergeTree (y) PARTITION BY (d, x) ORDER BY (d, x, w);
diff --git a/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql b/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql
index 77747bc0383..033202e04aa 100644
--- a/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql
+++ b/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
DROP TABLE IF EXISTS partitioned_by_tuple_replica1_00661;
DROP TABLE IF EXISTS partitioned_by_tuple_replica2_00661;
CREATE TABLE partitioned_by_tuple_replica1_00661(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/partitioned_by_tuple_00661', '1') PARTITION BY (d, x) ORDER BY (d, x, w);
diff --git a/tests/queries/0_stateless/00754_alter_modify_order_by.sql b/tests/queries/0_stateless/00754_alter_modify_order_by.sql
index f8c584ed052..a09d824c928 100644
--- a/tests/queries/0_stateless/00754_alter_modify_order_by.sql
+++ b/tests/queries/0_stateless/00754_alter_modify_order_by.sql
@@ -1,4 +1,5 @@
SET send_logs_level = 'fatal';
+SET optimize_on_insert = 0;
DROP TABLE IF EXISTS old_style;
CREATE TABLE old_style(d Date, x UInt32) ENGINE MergeTree(d, x, 8192);
diff --git a/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql b/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql
index 4b150fb3826..809adfaa498 100644
--- a/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql
+++ b/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
SET send_logs_level = 'fatal';
DROP TABLE IF EXISTS old_style;
diff --git a/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql b/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql
index 0b5845d3b04..90b1660e546 100644
--- a/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql
+++ b/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
select '-- SummingMergeTree with Nullable column without duplicates.';
drop table if exists tst;
diff --git a/tests/queries/0_stateless/01285_data_skip_index_over_aggregation.sql b/tests/queries/0_stateless/01285_data_skip_index_over_aggregation.sql
index 575b4dfa4a5..dba734454c0 100644
--- a/tests/queries/0_stateless/01285_data_skip_index_over_aggregation.sql
+++ b/tests/queries/0_stateless/01285_data_skip_index_over_aggregation.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
DROP TABLE IF EXISTS data_01285;
SET max_threads=1;
diff --git a/tests/queries/0_stateless/01323_add_scalars_in_time.sql b/tests/queries/0_stateless/01323_add_scalars_in_time.sql
index 2d7cf270017..2ee5603f760 100644
--- a/tests/queries/0_stateless/01323_add_scalars_in_time.sql
+++ b/tests/queries/0_stateless/01323_add_scalars_in_time.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
DROP TABLE IF EXISTS tags;
CREATE TABLE tags (
diff --git a/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql b/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql
index 790fbca6b73..c5a874efe09 100644
--- a/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql
+++ b/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
DROP TABLE IF EXISTS tt_01373;
CREATE TABLE tt_01373
diff --git a/tests/queries/0_stateless/01560_optimize_on_insert.reference b/tests/queries/0_stateless/01560_optimize_on_insert.reference
new file mode 100644
index 00000000000..7ace2043be0
--- /dev/null
+++ b/tests/queries/0_stateless/01560_optimize_on_insert.reference
@@ -0,0 +1,13 @@
+Replacing Merge Tree
+1 2020-01-01 00:00:00
+2 2020-01-02 00:00:00
+Collapsing Merge Tree
+1 1 2020-01-01 00:00:00
+Versioned Collapsing Merge Tree
+1 1 2 2020-01-01 00:00:00
+Summing Merge Tree
+1 6 2020-01-01 00:00:00
+2 6 2020-01-02 00:00:00
+Aggregating Merge Tree
+1 5 2020-01-01 00:00:00
+2 5 2020-01-02 00:00:00
diff --git a/tests/queries/0_stateless/01560_optimize_on_insert.sql b/tests/queries/0_stateless/01560_optimize_on_insert.sql
new file mode 100644
index 00000000000..9f6dac686bb
--- /dev/null
+++ b/tests/queries/0_stateless/01560_optimize_on_insert.sql
@@ -0,0 +1,35 @@
+SELECT 'Replacing Merge Tree';
+DROP TABLE IF EXISTS replacing_merge_tree;
+CREATE TABLE replacing_merge_tree (key UInt32, date Datetime) ENGINE=ReplacingMergeTree() PARTITION BY date ORDER BY key;
+INSERT INTO replacing_merge_tree VALUES (1, '2020-01-01'), (2, '2020-01-02'), (1, '2020-01-01'), (2, '2020-01-02');
+SELECT * FROM replacing_merge_tree ORDER BY key;
+DROP TABLE replacing_merge_tree;
+
+SELECT 'Collapsing Merge Tree';
+DROP TABLE IF EXISTS collapsing_merge_tree;
+CREATE TABLE collapsing_merge_tree (key UInt32, sign Int8, date Datetime) ENGINE=CollapsingMergeTree(sign) PARTITION BY date ORDER BY key;
+INSERT INTO collapsing_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, -1, '2020-01-01'), (2, -1, '2020-01-02'), (1, 1, '2020-01-01');
+SELECT * FROM collapsing_merge_tree ORDER BY key;
+DROP TABLE collapsing_merge_tree;
+
+SELECT 'Versioned Collapsing Merge Tree';
+DROP TABLE IF EXISTS versioned_collapsing_merge_tree;
+CREATE TABLE versioned_collapsing_merge_tree (key UInt32, sign Int8, version Int32, date Datetime) ENGINE=VersionedCollapsingMergeTree(sign, version) PARTITION BY date ORDER BY (key, version);
+INSERT INTO versioned_collapsing_merge_tree VALUES (1, 1, 1, '2020-01-01'), (1, -1, 1, '2020-01-01'), (1, 1, 2, '2020-01-01');
+SELECT * FROM versioned_collapsing_merge_tree ORDER BY key;
+DROP TABLE versioned_collapsing_merge_tree;
+
+SELECT 'Summing Merge Tree';
+DROP TABLE IF EXISTS summing_merge_tree;
+CREATE TABLE summing_merge_tree (key UInt32, val UInt32, date Datetime) ENGINE=SummingMergeTree(val) PARTITION BY date ORDER BY key;
+INSERT INTO summing_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, 5, '2020-01-01'), (2, 5, '2020-01-02');
+SELECT * FROM summing_merge_tree ORDER BY key;
+DROP TABLE summing_merge_tree;
+
+SELECT 'Aggregating Merge Tree';
+DROP TABLE IF EXISTS aggregating_merge_tree;
+CREATE TABLE aggregating_merge_tree (key UInt32, val SimpleAggregateFunction(max, UInt32), date Datetime) ENGINE=AggregatingMergeTree() PARTITION BY date ORDER BY key;
+INSERT INTO aggregating_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, 5, '2020-01-01'), (2, 5, '2020-01-02');
+SELECT * FROM aggregating_merge_tree ORDER BY key;
+DROP TABLE aggregating_merge_tree;
+
diff --git a/tests/integration/test_table_functions_access_rights/__init__.py b/tests/queries/0_stateless/01602_max_distributed_connections.reference
similarity index 100%
rename from tests/integration/test_table_functions_access_rights/__init__.py
rename to tests/queries/0_stateless/01602_max_distributed_connections.reference
diff --git a/tests/queries/0_stateless/01602_max_distributed_connections.sh b/tests/queries/0_stateless/01602_max_distributed_connections.sh
new file mode 100755
index 00000000000..8c19b6f5bb7
--- /dev/null
+++ b/tests/queries/0_stateless/01602_max_distributed_connections.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. "$CURDIR"/../shell_config.sh
+
+common_opts=(
+ "--format=Null"
+
+ "--max_threads=1"
+ "--max_distributed_connections=3"
+)
+
+# NOTE: the test use higher timeout to avoid flakiness.
+timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=0
+timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=1