Merge branch 'no_background_pool_no_more' into add_separate_pool_for_fetches

alesapin 2020-11-02 10:54:09 +03:00
commit 8b0d9fbc07
176 changed files with 1182 additions and 403 deletions

View File

@ -16,8 +16,4 @@ endif ()
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(ppc64le.*|PPC64LE.*)")
set (ARCH_PPC64LE 1)
# FIXME: move this check into tools.cmake
if (COMPILER_CLANG OR (COMPILER_GCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8))
message(FATAL_ERROR "Only gcc-8 or higher is supported for powerpc architecture")
endif ()
endif ()

View File

@ -84,3 +84,9 @@ if (LINKER_NAME)
message(STATUS "Using custom linker by name: ${LINKER_NAME}")
endif ()
if (ARCH_PPC64LE)
if (COMPILER_CLANG OR (COMPILER_GCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8))
message(FATAL_ERROR "Only gcc-8 or higher is supported for powerpc architecture")
endif ()
endif ()

View File

@ -17,7 +17,8 @@ RUN apt-get update \
sqlite3 \
curl \
tar \
krb5-user
krb5-user \
iproute2
RUN rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \

View File

@ -148,6 +148,31 @@ You can define the parameters explicitly instead of using substitutions. This mi
When working with large clusters, we recommend using substitutions because they reduce the probability of error.
You can specify default arguments for `Replicated` table engine in the server configuration file. For instance:
```xml
<default_replica_path>/clickhouse/tables/{shard}/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
```
In this case, you can omit arguments when creating tables:
``` sql
CREATE TABLE table_name (
x UInt32
) ENGINE = ReplicatedMergeTree
ORDER BY x;
```
It is equivalent to:
``` sql
CREATE TABLE table_name (
x UInt32
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/table_name', '{replica}')
ORDER BY x;
```
Run the `CREATE TABLE` query on each replica. This query creates a new replicated table, or adds a new replica to an existing one.
If you add a new replica after the table already contains some data on other replicas, the data will be copied from the other replicas to the new one after running the query. In other words, the new replica syncs itself with the others.
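A possibly useful companion (illustrative, not part of this patch): after attaching a new replica you can block until it has caught up with the others using the standard `SYSTEM SYNC REPLICA` statement:

``` sql
SYSTEM SYNC REPLICA table_name;
```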

View File

@ -2148,7 +2148,34 @@ Result:
└───────────────┘
```
[Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) <!-- hide -->
## output_format_pretty_row_numbers {#output_format_pretty_row_numbers}
Adds row numbers to output in the [Pretty](../../interfaces/formats.md#pretty) format.
Possible values:
- 0 — Output without row numbers.
- 1 — Output with row numbers.
Default value: `0`.
**Example**
Query:
```sql
SET output_format_pretty_row_numbers = 1;
SELECT TOP 3 name, value FROM system.settings;
```
Result:
```text
┌─name────────────────────┬─value───┐
1. │ min_compress_block_size │ 65536 │
2. │ max_compress_block_size │ 1048576 │
3. │ max_block_size │ 65505 │
└─────────────────────────┴─────────┘
```
## allow_experimental_bigint_types {#allow_experimental_bigint_types}
@ -2160,3 +2187,5 @@ Possible values:
- 0 — The bigint data type is disabled.
Default value: `0`.
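For illustration (the table and column names below are invented, not from this patch), enabling the setting allows declaring big integer columns such as `UInt256`:

```sql
SET allow_experimental_bigint_types = 1;

CREATE TABLE big_numbers
(
    id UInt64,
    balance UInt256
)
ENGINE = MergeTree()
ORDER BY id;
```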
[Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) <!-- hide -->

View File

@ -23,8 +23,6 @@ SELECT
└─────────────────────┴────────────┴────────────┴─────────────────────┘
```
Only time zones that differ from UTC by a whole number of hours are supported.
## toTimeZone {#totimezone}
Convert time or date and time to the specified time zone.
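A minimal illustration (query invented for this note):

```sql
SELECT
    now() AS time_utc,
    toTimeZone(now(), 'Asia/Yekaterinburg') AS time_yekat;
```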

View File

@ -13,12 +13,61 @@ Basic query format:
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
The query can specify a list of columns to insert `[(c1, c2, c3)]`. In this case, the rest of the columns are filled with:
You can specify a list of columns to insert using the `(c1, c2, c3)` or `COLUMNS(c1,c2,c3)` syntax.
Instead of listing all the required columns you can use the `(* EXCEPT(column_list))` syntax.
For example, consider the table:
``` sql
SHOW CREATE insert_select_testtable;
```
```
┌─statement────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE insert_select_testtable
(
`a` Int8,
`b` String,
`c` Int8
)
ENGINE = MergeTree()
ORDER BY a
SETTINGS index_granularity = 8192 │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
``` sql
INSERT INTO insert_select_testtable (*) VALUES (1, 'a', 1);
```
If you want to insert data into all columns except 'b', you need to pass as many values as there are columns listed in parentheses:
``` sql
INSERT INTO insert_select_testtable (* EXCEPT(b)) Values (2, 2);
```
``` sql
SELECT * FROM insert_select_testtable;
```
```
┌─a─┬─b─┬─c─┐
│ 2 │ │ 2 │
└───┴───┴───┘
┌─a─┬─b─┬─c─┐
│ 1 │ a │ 1 │
└───┴───┴───┘
```
In this example, we see that the second inserted row has its `a` and `c` columns filled with the passed values, and `b` filled with its default value.
If a list of columns doesn't include all existing columns, the rest of the columns are filled with:
- The values calculated from the `DEFAULT` expressions specified in the table definition.
- Zeros and empty strings, if `DEFAULT` expressions are not defined.
If [strict_insert_defaults=1](../../operations/settings/settings.md), columns that do not have `DEFAULT` defined must be listed in the query.
If [strict\_insert\_defaults=1](../../operations/settings/settings.md), columns that do not have `DEFAULT` defined must be listed in the query.
Data can be passed to the INSERT in any [format](../../interfaces/formats.md#formats) supported by ClickHouse. The format must be specified explicitly in the query:
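For example (illustrative, reusing the table defined above):

``` sql
INSERT INTO insert_select_testtable FORMAT Values (3, 'c', 3)
```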

View File

@ -4,13 +4,17 @@ toc_title: WITH
# WITH Clause {#with-clause}
This section provides support for Common Table Expressions ([CTE](https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL)), so the results of `WITH` clause can be used in the rest of `SELECT` query.
ClickHouse supports Common Table Expressions ([CTE](https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL)), that is, it allows using the results of a `WITH` clause in the rest of the `SELECT` query. Named subqueries can be included in the current and child query contexts in places where table objects are allowed. Recursion is prevented by hiding the CTEs of the current level from the `WITH` expression.
## Limitations {#limitations}
## Syntax {#syntax}
1. Recursive queries are not supported.
2. When a subquery is used inside the WITH clause, its result should be scalar with exactly one row.
3. Expression results are not available in subqueries.
``` sql
WITH <expression> AS <identifier>
```
or
``` sql
WITH <identifier> AS <subquery expression>
```
## Examples {#examples}
@ -22,10 +26,10 @@ SELECT *
FROM hits
WHERE
EventDate = toDate(ts_upper_bound) AND
EventTime <= ts_upper_bound
EventTime <= ts_upper_bound;
```
**Example 2:** Evicting sum(bytes) expression result from SELECT clause column list
**Example 2:** Evicting a sum(bytes) expression result from the SELECT clause column list
``` sql
WITH sum(bytes) as s
@ -34,10 +38,10 @@ SELECT
table
FROM system.parts
GROUP BY table
ORDER BY s
ORDER BY s;
```
**Example 3:** Using results of scalar subquery
**Example 3:** Using results of a scalar subquery
``` sql
/* this example would return TOP 10 of the largest tables */
@ -53,27 +57,14 @@ SELECT
FROM system.parts
GROUP BY table
ORDER BY table_disk_usage DESC
LIMIT 10
LIMIT 10;
```
**Example 4:** Re-using expression in subquery
As a workaround for the current limitation on expression usage in subqueries, you may duplicate it.
**Example 4:** Reusing expression in a subquery
``` sql
WITH ['hello'] AS hello
SELECT
hello,
*
FROM
(
WITH ['hello'] AS hello
SELECT hello
)
WITH test1 AS (SELECT i + 1, j + 1 FROM test1)
SELECT * FROM test1;
```
``` text
┌─hello─────┬─hello─────┐
│ ['hello'] │ ['hello'] │
└───────────┴───────────┘
```
[Original article](https://clickhouse.tech/docs/en/sql-reference/statements/select/with/) <!--hide-->

View File

@ -149,6 +149,31 @@ CREATE TABLE table_name
When working with large clusters, we recommend using substitutions because they reduce the probability of error.
You can specify default arguments for the replicated table engine in the server configuration file.
```xml
<default_replica_path>/clickhouse/tables/{shard}/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
```
In this case, you can omit the arguments when creating tables:
``` sql
CREATE TABLE table_name (
x UInt32
) ENGINE = ReplicatedMergeTree
ORDER BY x;
```
This is equivalent to the following query:
``` sql
CREATE TABLE table_name (
x UInt32
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/table_name', '{replica}')
ORDER BY x;
```
Run the `CREATE TABLE` query on each replica. This query creates a new replicated table, or adds a new replica to an existing one.
If you add a new replica after the table already contains some data on other replicas, the data will be copied from the other replicas to the new one after the query is run. In other words, the new replica syncs itself with the others.

View File

@ -1977,6 +1977,48 @@ SELECT range(number) FROM system.numbers LIMIT 5 FORMAT PrettyCompactNoEscapes;
└───────────────┘
```
## output_format_pretty_row_numbers {#output_format_pretty_row_numbers}
Enables the display of row numbers for queries output in the [Pretty](../../interfaces/formats.md#pretty) format.
Possible values:
- 0 — Row numbers are not output.
- 1 — Row numbers are output.
Default value: `0`.
**Example**
Query:
```sql
SET output_format_pretty_row_numbers = 1;
SELECT TOP 3 name, value FROM system.settings;
```
Result:
```text
┌─name────────────────────┬─value───┐
1. │ min_compress_block_size │ 65536 │
2. │ max_compress_block_size │ 1048576 │
3. │ max_block_size │ 65505 │
└─────────────────────────┴─────────┘
```
## allow_experimental_bigint_types {#allow_experimental_bigint_types}
Enables or disables support for integer values that exceed the maximum value allowed for the `int` type.
Possible values:
- 1 — Big integer values are supported.
- 0 — Big integer values are not supported.
Default value: `0`.
## lock_acquire_timeout {#lock_acquire_timeout}
Sets the number of seconds the server waits to acquire a table lock.
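For example (illustrative), the timeout can be lowered for the current session:

```sql
SET lock_acquire_timeout = 10;
```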

View File

@ -13,7 +13,55 @@ toc_title: INSERT INTO
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
The query can specify a list of columns to insert `[(c1, c2, c3)]`. In this case, the rest of the columns are filled with:
You can specify a list of columns to insert using the `(c1, c2, c3)` or `COLUMNS(c1,c2,c3)` syntax.
Instead of listing all the required columns, you can use the `(* EXCEPT(column_list))` syntax.
As an example, consider the table:
``` sql
SHOW CREATE insert_select_testtable
```
```
┌─statement────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE insert_select_testtable
(
`a` Int8,
`b` String,
`c` Int8
)
ENGINE = MergeTree()
ORDER BY a
SETTINGS index_granularity = 8192 │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
``` sql
INSERT INTO insert_select_testtable (*) VALUES (1, 'a', 1)
```
If you want to insert data into all columns except 'b', you need to pass as many values as there are columns listed in parentheses:
``` sql
INSERT INTO insert_select_testtable (* EXCEPT(b)) Values (2, 2)
```
``` sql
SELECT * FROM insert_select_testtable
```
```
┌─a─┬─b─┬─c─┐
│ 2 │ │ 2 │
└───┴───┴───┘
┌─a─┬─b─┬─c─┐
│ 1 │ a │ 1 │
└───┴───┴───┘
```
In this example, we see that the second inserted row has its `a` and `c` columns filled with the passed values, and `b` filled with its default value.
If the list of columns doesn't include all existing columns, the rest of the columns are filled with:
- The values calculated from the `DEFAULT` expressions specified in the table definition.
- Zeros and empty strings, if `DEFAULT` expressions are not defined.

View File

@ -2,18 +2,21 @@
toc_title: WITH
---
# WITH Clause {#sektsiia-with}
# WITH Clause {#with-clause}
This section provides support for [Common Table Expressions](https://ru.wikipedia.org/wiki/Иерархические_и_рекурсивные_запросы_в_SQL), that is, it allows using the results of expressions from the `WITH` clause in the rest of the `SELECT` query.
ClickHouse supports [Common Table Expressions](https://ru.wikipedia.org/wiki/Иерархические_и_рекурсивные_запросы_в_SQL), that is, it allows using the results of expressions from the `WITH` clause in the rest of the `SELECT` query. Named subqueries can be included in the current and child query contexts in places where table objects are allowed. Recursion is prevented by hiding the CTEs of the current level from the `WITH` expression.
## Syntax
``` sql
WITH <expression> AS <identifier>
```
or
``` sql
WITH <identifier> AS <subquery expression>
```
### Limitations
1. Recursive queries are not supported.
2. When a subquery is used as the expression, its result must contain exactly one row.
3. Expression results cannot be reused in nested subqueries.
Later on, the results of these expressions can be used in the SELECT clause.
### Examples
## Examples
**Example 1:** Using a constant expression as a “variable”
@ -23,7 +26,7 @@ SELECT *
FROM hits
WHERE
EventDate = toDate(ts_upper_bound) AND
EventTime <= ts_upper_bound
EventTime <= ts_upper_bound;
```
**Example 2:** Evicting a sum(bytes) expression result from the SELECT clause column list
@ -35,7 +38,7 @@ SELECT
table
FROM system.parts
GROUP BY table
ORDER BY s
ORDER BY s;
```
**Example 3:** Using results of a scalar subquery
@ -54,27 +57,14 @@ SELECT
FROM system.parts
GROUP BY table
ORDER BY table_disk_usage DESC
LIMIT 10
LIMIT 10;
```
**Example 4:** Reusing an expression
At the moment, reusing an expression from the WITH clause inside a subquery is only possible through duplication.
``` sql
WITH ['hello'] AS hello
SELECT
hello,
*
FROM
(
WITH ['hello'] AS hello
SELECT hello
)
WITH test1 AS (SELECT i + 1, j + 1 FROM test1)
SELECT * FROM test1;
```
``` text
┌─hello─────┬─hello─────┐
│ ['hello'] │ ['hello'] │
└───────────┴───────────┘
```
[Original article](https://clickhouse.tech/docs/en/sql-reference/statements/select/with/) <!--hide-->

View File

@ -33,6 +33,6 @@ singledispatch==3.4.0.3
six==1.15.0
soupsieve==2.0.1
termcolor==1.1.0
tornado==5.1.1
tornado==6.1
Unidecode==1.1.1
urllib3==1.25.10

View File

@ -5,7 +5,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
AccessControlManager.cpp

View File

@ -4,7 +4,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | sed 's/^\.\// /' | sort ?>

View File

@ -5,7 +5,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
AggregateFunctionAggThrow.cpp

View File

@ -4,7 +4,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F GroupBitmap | sed 's/^\.\// /' | sort ?>

View File

@ -6,7 +6,6 @@ PEERDIR(
contrib/libs/poco/NetSSL_OpenSSL
)
CFLAGS(-g0)
SRCS(
Connection.cpp

View File

@ -5,7 +5,6 @@ PEERDIR(
contrib/libs/poco/NetSSL_OpenSSL
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>

View File

@ -13,7 +13,6 @@ PEERDIR(
contrib/libs/pdqsort
)
CFLAGS(-g0)
SRCS(
Collator.cpp

View File

@ -172,7 +172,7 @@ protected:
void finalizeQueryProfiler();
void logToQueryThreadLog(QueryThreadLog & thread_log);
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database);
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr) const;

View File

@ -21,7 +21,6 @@ PEERDIR(
INCLUDE(${ARCADIA_ROOT}/clickhouse/cmake/yandex/ya.make.versions.inc)
CFLAGS(-g0)
SRCS(
ActionLock.cpp

View File

@ -20,7 +20,6 @@ PEERDIR(
INCLUDE(${ARCADIA_ROOT}/clickhouse/cmake/yandex/ya.make.versions.inc)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>

View File

@ -12,7 +12,6 @@ PEERDIR(
contrib/libs/zstd
)
CFLAGS(-g0)
SRCS(
CachedCompressedReadBuffer.cpp

View File

@ -11,7 +11,6 @@ PEERDIR(
contrib/libs/zstd
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>

View File

@ -85,6 +85,9 @@ void GTIDSets::update(const GTID & other)
ErrorCodes::LOGICAL_ERROR);
}
/// Try to shrink the Sequence interval.
GTIDSet::tryShirnk(set, i, current);
/// Sequence, extend the interval.
if (other.seq_no == current.end)
{
@ -116,6 +119,16 @@ void GTIDSets::update(const GTID & other)
sets.emplace_back(set);
}
void GTIDSet::tryShirnk(GTIDSet & set, unsigned int i, GTIDSet::Interval & current)
{
if (i != set.intervals.size() - 1)
{
auto & next = set.intervals[i + 1];
if (current.end == next.start)
set.tryMerge(i);
}
}
String GTIDSets::toString() const
{
WriteBufferFromOwnString buffer;

View File

@ -26,6 +26,8 @@ public:
std::vector<Interval> intervals;
void tryMerge(size_t i);
static void tryShirnk(GTIDSet & set, unsigned int i, Interval & current);
};
class GTIDSets

View File

@ -170,6 +170,8 @@ class IColumn;
M(Milliseconds, read_backoff_min_interval_between_events_ms, 1000, "Settings to reduce the number of threads in case of slow reads. Do not pay attention to the event, if the previous one has passed less than a certain amount of time.", 0) \
M(UInt64, read_backoff_min_events, 2, "Settings to reduce the number of threads in case of slow reads. The number of events after which the number of threads will be reduced.", 0) \
\
M(UInt64, read_backoff_min_concurrency, 1, "Settings to try keeping the minimal number of threads in case of slow reads.", 0) \
\
M(Float, memory_tracker_fault_probability, 0., "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.", 0) \
\
M(Bool, enable_http_compression, 0, "Compress the result if the client over HTTP said that it understands data compressed by gzip or deflate.", 0) \

View File

@ -260,6 +260,17 @@ int main(int argc, char ** argv)
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7",
"20662d71-9d91-11ea-bbc2-0242ac110003:9",
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7,20662d71-9d91-11ea-bbc2-0242ac110003:9"},
{"shirnk-sequence",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-3:4-5:7",
"10662d71-9d91-11ea-bbc2-0242ac110003:6",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-7"},
{"shirnk-sequence",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-3:4-5:10",
"10662d71-9d91-11ea-bbc2-0242ac110003:8",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-5:8:10"
}
};
for (auto & tc : cases)

View File

@ -7,7 +7,6 @@ PEERDIR(
contrib/restricted/boost/libs
)
CFLAGS(-g0)
SRCS(
BackgroundSchedulePool.cpp

View File

@ -6,7 +6,6 @@ PEERDIR(
contrib/restricted/boost/libs
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>

View File

@ -8,7 +8,6 @@ PEERDIR(
NO_COMPILER_WARNINGS()
CFLAGS(-g0)
SRCS(
AddingDefaultBlockOutputStream.cpp

View File

@ -7,7 +7,6 @@ PEERDIR(
NO_COMPILER_WARNINGS()
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>

View File

@ -6,7 +6,6 @@ PEERDIR(
clickhouse/src/Formats
)
CFLAGS(-g0)
SRCS(
convertMySQLDataType.cpp

View File

@ -5,7 +5,6 @@ PEERDIR(
clickhouse/src/Formats
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>

View File

@ -5,7 +5,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
DatabaseAtomic.cpp

View File

@ -4,7 +4,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | sed 's/^\.\// /' | sort ?>

View File

@ -1467,7 +1467,6 @@ void SSDComplexKeyCacheDictionary::getItemsNumberImpl(
{
assert(dict_struct.key);
assert(key_columns.size() == key_types.size());
assert(key_columns.size() == dict_struct.key->size());
dict_struct.validateKeyTypes(key_types);

View File

@ -12,7 +12,6 @@ PEERDIR(
NO_COMPILER_WARNINGS()
CFLAGS(-g0)
SRCS(
CacheDictionary.cpp

View File

@ -11,7 +11,6 @@ PEERDIR(
NO_COMPILER_WARNINGS()
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F Trie | sed 's/^\.\// /' | sort ?>

View File

@ -4,7 +4,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
DiskS3.cpp

View File

@ -5,7 +5,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
createVolume.cpp

View File

@ -4,7 +4,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F S3 | sed 's/^\.\// /' | sort ?>

View File

@ -7,7 +7,6 @@ PEERDIR(
contrib/libs/protoc
)
CFLAGS(-g0)
SRCS(
FormatFactory.cpp

View File

@ -6,7 +6,6 @@ PEERDIR(
contrib/libs/protoc
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>

View File

@ -31,6 +31,7 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
}
@ -84,6 +85,9 @@ enum class TieBreakingMode
Bankers, // use banker's rounding
};
/// For N, no more than the number of digits in the largest type.
using Scale = Int16;
/** Rounding functions for integer values.
*/
@ -416,7 +420,7 @@ private:
using Container = typename ColumnDecimal<T>::Container;
public:
static NO_INLINE void apply(const Container & in, Container & out, Int64 scale_arg)
static NO_INLINE void apply(const Container & in, Container & out, Scale scale_arg)
{
scale_arg = in.getScale() - scale_arg;
if (scale_arg > 0)
@ -458,7 +462,7 @@ class Dispatcher
FloatRoundingImpl<T, rounding_mode, scale_mode>,
IntegerRoundingImpl<T, rounding_mode, scale_mode, tie_breaking_mode>>;
static ColumnPtr apply(const ColumnVector<T> * col, Int64 scale_arg)
static ColumnPtr apply(const ColumnVector<T> * col, Scale scale_arg)
{
auto col_res = ColumnVector<T>::create();
@ -487,7 +491,7 @@ class Dispatcher
return col_res;
}
static ColumnPtr apply(const ColumnDecimal<T> * col, Int64 scale_arg)
static ColumnPtr apply(const ColumnDecimal<T> * col, Scale scale_arg)
{
const typename ColumnDecimal<T>::Container & vec_src = col->getData();
@ -501,7 +505,7 @@ class Dispatcher
}
public:
static ColumnPtr apply(const IColumn * column, Int64 scale_arg)
static ColumnPtr apply(const IColumn * column, Scale scale_arg)
{
if constexpr (IsNumber<T>)
return apply(checkAndGetColumn<ColumnVector<T>>(column), scale_arg);
@ -544,20 +548,25 @@ public:
return arguments[0];
}
static Int64 getScaleArg(ColumnsWithTypeAndName & arguments)
static Scale getScaleArg(ColumnsWithTypeAndName & arguments)
{
if (arguments.size() == 2)
{
const IColumn & scale_column = *arguments[1].column;
if (!isColumnConst(scale_column))
throw Exception("Scale argument for rounding functions must be constant.", ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Scale argument for rounding functions must be constant", ErrorCodes::ILLEGAL_COLUMN);
Field scale_field = assert_cast<const ColumnConst &>(scale_column).getField();
if (scale_field.getType() != Field::Types::UInt64
&& scale_field.getType() != Field::Types::Int64)
throw Exception("Scale argument for rounding functions must have integer type.", ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Scale argument for rounding functions must have integer type", ErrorCodes::ILLEGAL_COLUMN);
return scale_field.get<Int64>();
Int64 scale64 = scale_field.get<Int64>();
if (scale64 > std::numeric_limits<Scale>::max()
|| scale64 < std::numeric_limits<Scale>::min())
throw Exception("Scale argument for rounding function is too large", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
return scale64;
}
return 0;
}
@ -568,7 +577,7 @@ public:
ColumnPtr executeImpl(ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const ColumnWithTypeAndName & column = arguments[0];
Int64 scale_arg = getScaleArg(arguments);
Scale scale_arg = getScaleArg(arguments);
ColumnPtr res;
auto call = [&](const auto & types) -> bool

View File

@ -32,7 +32,6 @@ PEERDIR(
)
# "Arcadia" build is slightly deficient. It lacks many libraries that we need.
CFLAGS(-g0)
SRCS(
abs.cpp

View File

@ -31,7 +31,6 @@ PEERDIR(
)
# "Arcadia" build is slightly deficient. It lacks many libraries that we need.
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -i -v -P 'tests|Bitmap|sumbur|abtesting' | sed 's/^\.\// /' | sort ?>

View File

@ -480,7 +480,7 @@ void readEscapedString(String & s, ReadBuffer & buf)
}
template void readEscapedStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullSink>(NullSink & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
/** If enable_sql_style_quoting == true,
@ -562,7 +562,7 @@ void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf)
template void readQuotedStringInto<true>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readDoubleQuotedStringInto<false>(NullSink & s, ReadBuffer & buf);
template void readDoubleQuotedStringInto<false>(NullOutput & s, ReadBuffer & buf);
void readDoubleQuotedString(String & s, ReadBuffer & buf)
{
@ -742,7 +742,7 @@ void readJSONString(String & s, ReadBuffer & buf)
template void readJSONStringInto<PaddedPODArray<UInt8>, void>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template bool readJSONStringInto<PaddedPODArray<UInt8>, bool>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readJSONStringInto<NullSink>(NullSink & s, ReadBuffer & buf);
template void readJSONStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
template void readJSONStringInto<String>(String & s, ReadBuffer & buf);
@ -891,7 +891,7 @@ void skipJSONField(ReadBuffer & buf, const StringRef & name_of_field)
throw Exception("Unexpected EOF for key '" + name_of_field.toString() + "'", ErrorCodes::INCORRECT_DATA);
else if (*buf.position() == '"') /// skip double-quoted string
{
NullSink sink;
NullOutput sink;
readJSONStringInto(sink, buf);
}
else if (isNumericASCII(*buf.position()) || *buf.position() == '-' || *buf.position() == '+' || *buf.position() == '.') /// skip number
@ -955,7 +955,7 @@ void skipJSONField(ReadBuffer & buf, const StringRef & name_of_field)
// field name
if (*buf.position() == '"')
{
NullSink sink;
NullOutput sink;
readJSONStringInto(sink, buf);
}
else

View File

@ -527,7 +527,7 @@ bool tryReadJSONStringInto(Vector & s, ReadBuffer & buf)
}
/// This could be used as template parameter for functions above, if you want to just skip data.
struct NullSink
struct NullOutput
{
void append(const char *, size_t) {}
void push_back(char) {}

View File

@ -106,6 +106,7 @@ namespace detail
std::vector<Poco::Net::HTTPCookie> cookies;
HTTPHeaderEntries http_header_entries;
RemoteHostFilter remote_host_filter;
std::function<void(size_t)> next_callback;
std::istream * call(const Poco::URI uri_, Poco::Net::HTTPResponse & response)
{
@ -154,6 +155,7 @@ namespace detail
}
public:
using NextCallback = std::function<void(size_t)>;
using OutStreamCallback = std::function<void(std::ostream &)>;
explicit ReadWriteBufferFromHTTPBase(
@ -204,6 +206,8 @@ namespace detail
bool nextImpl() override
{
if (next_callback)
next_callback(count());
if (!impl->next())
return false;
internal_buffer = impl->buffer();
@ -218,6 +222,17 @@ namespace detail
return cookie.getValue();
return def;
}
/// Set a function to call on each nextImpl; useful when you need to track
/// progress.
/// NOTE: the parameter passed on each call is not incremental -- it's the total
/// byte count that has passed through the buffer
void setNextCallback(NextCallback next_callback_)
{
next_callback = next_callback_;
/// Some data may already have been read
next_callback(count());
}
};
}

View File

@ -533,6 +533,10 @@ ReturnType parseDateTimeBestEffortImpl(
}
}
/// If neither Date nor Time is parsed successfully, it should fail
if (!year && !month && !day_of_month && !has_time)
return on_error("Cannot read DateTime: neither Date nor Time was parsed successfully", ErrorCodes::CANNOT_PARSE_DATETIME);
if (!year)
year = 2000;
if (!month)

View File

@ -8,7 +8,6 @@ PEERDIR(
contrib/libs/poco/NetSSL_OpenSSL
)
CFLAGS(-g0)
SRCS(
AIOContextPool.cpp

View File

@ -7,7 +7,6 @@ PEERDIR(
contrib/libs/poco/NetSSL_OpenSSL
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>

View File

@ -19,6 +19,8 @@
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/CompressionCodecSelector.h>
#include <Storages/StorageS3Settings.h>
@ -328,6 +330,7 @@ struct ContextShared
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
ProcessList process_list; /// Executing queries at the moment.
MergeList merge_list; /// The list of currently executing merges (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
@ -501,6 +504,8 @@ ProcessList & Context::getProcessList() { return shared->process_list; }
const ProcessList & Context::getProcessList() const { return shared->process_list; }
MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; }
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }
void Context::enableNamedSessions()

View File

@ -64,6 +64,7 @@ class ExternalModelsLoader;
class InterserverIOHandler;
class BackgroundSchedulePool;
class MergeList;
class ReplicatedFetchList;
class Cluster;
class Compiler;
class MarkCache;
@ -478,6 +479,9 @@ public:
MergeList & getMergeList();
const MergeList & getMergeList() const;
ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;
/// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
/// If no ZooKeeper configured, throws an exception.
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;

View File

@ -250,7 +250,7 @@ BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query)
{
if (query.no_delay)
{
for (const auto table_uuid : tables_to_wait)
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
}
throw;
@ -258,7 +258,7 @@ BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query)
if (query.no_delay)
{
for (const auto table_uuid : tables_to_wait)
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
}
return res;

View File

@ -38,6 +38,7 @@ Block QueryThreadLogElement::createBlock()
{std::make_shared<DataTypeString>(), "thread_name"},
{std::make_shared<DataTypeUInt64>(), "thread_id"},
{std::make_shared<DataTypeUInt64>(), "master_thread_id"},
{std::make_shared<DataTypeString>(), "current_database"},
{std::make_shared<DataTypeString>(), "query"},
{std::make_shared<DataTypeUInt8>(), "is_initial_query"},
@ -91,6 +92,7 @@ void QueryThreadLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(thread_id);
columns[i++]->insert(master_thread_id);
columns[i++]->insertData(current_database.data(), current_database.size());
columns[i++]->insertData(query.data(), query.size());
QueryLogElement::appendClientInfo(client_info, columns, i);

View File

@ -39,7 +39,9 @@ struct QueryThreadLogElement
UInt64 thread_id{};
UInt64 master_thread_id{};
String current_database;
String query;
ClientInfo client_info;
std::shared_ptr<ProfileEvents::Counters> profile_counters;

View File

@ -243,7 +243,7 @@ void ThreadStatus::finalizePerformanceCounters()
const auto & settings = query_context->getSettingsRef();
if (settings.log_queries && settings.log_query_threads)
if (auto thread_log = global_context->getQueryThreadLog())
logToQueryThreadLog(*thread_log);
logToQueryThreadLog(*thread_log, query_context->getCurrentDatabase());
}
}
catch (...)
@ -322,7 +322,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
#endif
}
void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database)
{
QueryThreadLogElement elem;
@ -350,6 +350,7 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
elem.thread_name = getThreadName();
elem.thread_id = thread_id;
elem.current_database = current_database;
if (thread_group)
{
{

View File

@ -14,7 +14,6 @@ PEERDIR(
NO_COMPILER_WARNINGS()
CFLAGS(-g0)
SRCS(
ActionLocksManager.cpp

View File

@ -13,7 +13,6 @@ PEERDIR(
NO_COMPILER_WARNINGS()
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F JIT | sed 's/^\.\// /' | sort ?>

View File

@ -5,7 +5,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
ASTAlterQuery.cpp

View File

@ -4,7 +4,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>

View File

@ -1,4 +1,5 @@
#include "AvroRowInputFormat.h"
#include "DataTypes/DataTypeLowCardinality.h"
#if USE_AVRO
#include <numeric>
@ -174,7 +175,8 @@ static std::string nodeName(avro::NodePtr node)
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type)
{
WhichDataType target(target_type);
const WhichDataType target = removeLowCardinality(target_type);
switch (root_node->type())
{
case avro::AVRO_STRING: [[fallthrough]];
@ -384,7 +386,8 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
}
throw Exception(
"Type " + target_type->getName() + " is not compatible with Avro " + avro::toString(root_node->type()) + ":\n" + nodeToJson(root_node),
"Type " + target_type->getName() + " is not compatible with Avro " + avro::toString(root_node->type()) + ":\n"
+ nodeToJson(root_node),
ErrorCodes::ILLEGAL_COLUMN);
}

View File

@ -130,7 +130,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
throw Exception("Unknown field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
/// If the key is not found, skip the value.
NullSink sink;
NullOutput sink;
readEscapedStringInto(sink, in);
}
else

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
static void skipTSVRow(ReadBuffer & in, const size_t num_columns)
{
NullSink null_sink;
NullOutput null_sink;
for (size_t i = 0; i < num_columns; ++i)
{
@ -196,7 +196,7 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
}
else
{
NullSink null_sink;
NullOutput null_sink;
readEscapedStringInto(null_sink, in);
}
@ -353,7 +353,7 @@ void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, I
}
else
{
NullSink null_sink;
NullOutput null_sink;
readEscapedStringInto(null_sink, in);
}
}

View File

@ -7,7 +7,6 @@ PEERDIR(
contrib/libs/protobuf
)
CFLAGS(-g0)
SRCS(
Chunk.cpp

View File

@ -6,7 +6,6 @@ PEERDIR(
contrib/libs/protobuf
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -P 'Arrow|Avro|ORC|Parquet|CapnProto' | sed 's/^\.\// /' | sort ?>

View File

@ -6,7 +6,6 @@ PEERDIR(
contrib/libs/poco/Util
)
CFLAGS(-g0)
SRCS(
HTTPHandler.cpp

View File

@ -5,7 +5,6 @@ PEERDIR(
contrib/libs/poco/Util
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | sed 's/^\.\// /' | sort ?>

View File

@ -0,0 +1,91 @@
#pragma once
#include <Common/CurrentMetrics.h>
#include <memory>
#include <list>
#include <mutex>
#include <atomic>
namespace DB
{
/// Common code for background process lists, like system.merges and system.replicated_fetches
/// Look at examples in MergeList and ReplicatedFetchList
template <typename ListElement, typename Info>
class BackgroundProcessList;
template <typename ListElement, typename Info>
class BackgroundProcessListEntry
{
BackgroundProcessList<ListElement, Info> & list;
using container_t = std::list<ListElement>;
typename container_t::iterator it;
CurrentMetrics::Increment metric_increment;
public:
BackgroundProcessListEntry(const BackgroundProcessListEntry &) = delete;
BackgroundProcessListEntry & operator=(const BackgroundProcessListEntry &) = delete;
BackgroundProcessListEntry(BackgroundProcessList<ListElement, Info> & list_, const typename container_t::iterator it_, const CurrentMetrics::Metric & metric)
: list(list_), it{it_}, metric_increment{metric}
{
list.onEntryCreate(*this);
}
~BackgroundProcessListEntry()
{
std::lock_guard lock{list.mutex};
list.onEntryDestroy(*this);
list.entries.erase(it);
}
ListElement * operator->() { return &*it; }
const ListElement * operator->() const { return &*it; }
};
template <typename ListElement, typename Info>
class BackgroundProcessList
{
protected:
friend class BackgroundProcessListEntry<ListElement, Info>;
using container_t = std::list<ListElement>;
using info_container_t = std::list<Info>;
mutable std::mutex mutex;
container_t entries;
CurrentMetrics::Metric metric;
BackgroundProcessList(const CurrentMetrics::Metric & metric_)
: metric(metric_)
{}
public:
using Entry = BackgroundProcessListEntry<ListElement, Info>;
using EntryPtr = std::unique_ptr<Entry>;
template <typename... Args>
EntryPtr insert(Args &&... args)
{
std::lock_guard lock{mutex};
auto entry = std::make_unique<Entry>(*this, entries.emplace(entries.end(), std::forward<Args>(args)...), metric);
return entry;
}
info_container_t get() const
{
std::lock_guard lock{mutex};
info_container_t res;
for (const auto & list_element : entries)
res.emplace_back(list_element.getInfo());
return res;
}
virtual void onEntryCreate(const Entry & /* entry */) {}
virtual void onEntryDestroy(const Entry & /* entry */) {}
virtual inline ~BackgroundProcessList() {}
};
}
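As context (illustrative query, not part of this patch): such lists back the corresponding system tables, so running merges can be inspected with:

```sql
SELECT database, table, elapsed, progress
FROM system.merges;
```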

View File

@ -12,12 +12,12 @@
#include <Poco/File.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequest.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
namespace CurrentMetrics
{
extern const Metric ReplicatedSend;
extern const Metric ReplicatedFetch;
}
namespace DB
@ -52,6 +52,30 @@ std::string getEndpointId(const std::string & node_id)
return "DataPartsExchange:" + node_id;
}
/// Simple functor for tracking fetch progress in system.replicated_fetches table.
struct ReplicatedFetchReadCallback
{
ReplicatedFetchList::Entry & replicated_fetch_entry;
explicit ReplicatedFetchReadCallback(ReplicatedFetchList::Entry & replicated_fetch_entry_)
: replicated_fetch_entry(replicated_fetch_entry_)
{}
void operator() (size_t bytes_count)
{
replicated_fetch_entry->bytes_read_compressed.store(bytes_count, std::memory_order_relaxed);
/// This is possible when we fetch a part from a very old ClickHouse version
/// which doesn't send the total size.
if (replicated_fetch_entry->total_size_bytes_compressed != 0)
{
replicated_fetch_entry->progress.store(
static_cast<double>(bytes_count) / replicated_fetch_entry->total_size_bytes_compressed,
std::memory_order_relaxed);
}
}
};
}
std::string Service::getId(const std::string & node_id) const
@ -228,7 +252,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
/// Validation of the input that may come from malicious replica.
MergeTreePartInfo::fromPartName(part_name, data.format_version);
auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
const auto data_settings = data.getSettings();
Poco::URI uri;
@ -294,6 +318,15 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
readStringBinary(part_type, in);
auto storage_id = data.getStorageID();
String new_part_path = part_type == "InMemory" ? "memory" : data.getFullPathOnDisk(reservation->getDisk()) + part_name + "/";
auto entry = data.global_context.getReplicatedFetchList().insert(
storage_id.getDatabaseName(), storage_id.getTableName(),
part_info.partition_id, part_name, new_part_path,
replica_path, uri, to_detached, sum_files_size);
in.setNextCallback(ReplicatedFetchReadCallback(*entry));
return part_type == "InMemory" ? downloadPartToMemory(part_name, metadata_snapshot, std::move(reservation), in)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, std::move(reservation), in);
}
@ -352,8 +385,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
if (disk->exists(part_download_path))
throw Exception("Directory " + fullPath(disk, part_download_path) + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
disk->createDirectories(part_download_path);
std::optional<FileSyncGuard> sync_guard;

View File

@ -1,21 +1,20 @@
#pragma once
#include <Core/Names.h>
#include <Core/Field.h>
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <list>
#include <mutex>
#include <atomic>
/** Maintains a list of currently running merges.
* For implementation of system.merges table.
*/
namespace CurrentMetrics
{
extern const Metric Merge;
@ -102,68 +101,37 @@ struct MergeListElement : boost::noncopyable
~MergeListElement();
};
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;
class MergeList;
class MergeListEntry
/** Maintains a list of currently running merges.
* For implementation of system.merges table.
*/
class MergeList final : public BackgroundProcessList<MergeListElement, MergeInfo>
{
MergeList & list;
using container_t = std::list<MergeListElement>;
container_t::iterator it;
CurrentMetrics::Increment num_merges {CurrentMetrics::Merge};
public:
MergeListEntry(const MergeListEntry &) = delete;
MergeListEntry & operator=(const MergeListEntry &) = delete;
MergeListEntry(MergeList & list_, const container_t::iterator it_) : list(list_), it{it_} {}
~MergeListEntry();
MergeListElement * operator->() { return &*it; }
const MergeListElement * operator->() const { return &*it; }
};
class MergeList
{
friend class MergeListEntry;
using container_t = std::list<MergeListElement>;
using info_container_t = std::list<MergeInfo>;
mutable std::mutex mutex;
container_t merges;
private:
using Parent = BackgroundProcessList<MergeListElement, MergeInfo>;
std::atomic<size_t> merges_with_ttl_counter = 0;
public:
using Entry = MergeListEntry;
using EntryPtr = std::unique_ptr<Entry>;
MergeList()
: Parent(CurrentMetrics::Merge)
{}
template <typename... Args>
EntryPtr insert(Args &&... args)
void onEntryCreate(const Parent::Entry & entry) override
{
std::lock_guard lock{mutex};
auto entry = std::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
if (isTTLMergeType((*entry)->merge_type))
if (isTTLMergeType(entry->merge_type))
++merges_with_ttl_counter;
return entry;
}
info_container_t get() const
void onEntryDestroy(const Parent::Entry & entry) override
{
std::lock_guard lock{mutex};
info_container_t res;
for (const auto & merge_element : merges)
res.emplace_back(merge_element.getInfo());
return res;
if (isTTLMergeType(entry->merge_type))
--merges_with_ttl_counter;
}
void cancelPartMutations(const String & partition_id, Int64 mutation_version)
{
std::lock_guard lock{mutex};
for (auto & merge_element : merges)
for (auto & merge_element : entries)
{
if ((partition_id.empty() || merge_element.partition_id == partition_id)
&& merge_element.source_data_version < mutation_version
@ -178,16 +146,4 @@ public:
}
};
inline MergeListEntry::~MergeListEntry()
{
std::lock_guard lock{list.mutex};
if (isTTLMergeType(it->merge_type))
--list.merges_with_ttl_counter;
list.merges.erase(it);
}
}

View File

@ -35,7 +35,6 @@
namespace DB
{
class MergeListEntry;
class AlterCommands;
class MergeTreePartsMover;
class MutationCommands;

View File

@ -13,7 +13,6 @@
namespace DB
{
class MergeListEntry;
class MergeProgressCallback;
/// Auxiliary struct holding metainformation for the future merged or mutated part.

View File

@ -898,7 +898,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
num_streams,
sum_marks,
min_marks_for_concurrent_read,
parts,
std::move(parts),
data,
metadata_snapshot,
query_info.prewhere_info,

View File

@ -21,7 +21,7 @@ MergeTreeReadPool::MergeTreeReadPool(
const size_t threads_,
const size_t sum_marks_,
const size_t min_marks_for_concurrent_read_,
RangesInDataParts parts_,
RangesInDataParts && parts_,
const MergeTreeData & data_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -38,11 +38,11 @@ MergeTreeReadPool::MergeTreeReadPool(
, do_not_steal_tasks{do_not_steal_tasks_}
, predict_block_size_bytes{preferred_block_size_bytes_ > 0}
, prewhere_info{prewhere_info_}
, parts_ranges{parts_}
, parts_ranges{std::move(parts_)}
{
/// parts don't contain duplicate MergeTreeDataParts.
const auto per_part_sum_marks = fillPerPartInfo(parts_, check_columns_);
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_, min_marks_for_concurrent_read_);
const auto per_part_sum_marks = fillPerPartInfo(parts_ranges, check_columns_);
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read_);
}
@ -62,7 +62,24 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
return nullptr;
/// Steal task if nothing to do and it's not prohibited
const auto thread_idx = tasks_remaining_for_this_thread ? thread : *std::begin(remaining_thread_tasks);
auto thread_idx = thread;
if (!tasks_remaining_for_this_thread)
{
auto it = remaining_thread_tasks.lower_bound(backoff_state.current_threads);
// Grab all the tasks of a thread that was killed by backoff
if (it != remaining_thread_tasks.end())
{
threads_tasks[thread] = std::move(threads_tasks[*it]);
remaining_thread_tasks.erase(it);
}
else // Try to steal tasks from the next thread
{
it = remaining_thread_tasks.upper_bound(thread);
if (it == remaining_thread_tasks.end())
it = remaining_thread_tasks.begin();
thread_idx = *it;
}
}
auto & thread_tasks = threads_tasks[thread_idx];
auto & thread_task = thread_tasks.parts_and_ranges.back();
@ -163,7 +180,7 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf
std::lock_guard lock(mutex);
if (backoff_state.current_threads <= 1)
if (backoff_state.current_threads <= backoff_settings.min_concurrency)
return;
size_t throughput = info.bytes_read * 1000000000 / info.nanoseconds;
@ -194,14 +211,14 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf
std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
RangesInDataParts & parts, const bool check_columns)
const RangesInDataParts & parts, const bool check_columns)
{
std::vector<size_t> per_part_sum_marks;
Block sample_block = metadata_snapshot->getSampleBlock();
for (const auto i : ext::range(0, parts.size()))
{
auto & part = parts[i];
const auto & part = parts[i];
/// Read marks for every data part.
size_t sum_marks = 0;
@ -238,21 +255,53 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
void MergeTreeReadPool::fillPerThreadInfo(
const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
RangesInDataParts & parts, const size_t min_marks_for_concurrent_read)
const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read)
{
threads_tasks.resize(threads);
if (parts.empty())
return;
struct PartInfo
{
RangesInDataPart part;
size_t sum_marks;
size_t part_idx;
};
using PartsInfo = std::vector<PartInfo>;
std::queue<PartsInfo> parts_queue;
{
/// Group parts by disk name.
/// We try to minimize the number of threads concurrently reading from the same disk.
/// This improves performance for JBOD architectures.
std::map<String, std::vector<PartInfo>> parts_per_disk;
for (size_t i = 0; i < parts.size(); ++i)
{
PartInfo part_info{parts[i], per_part_sum_marks[i], i};
if (parts[i].data_part->isStoredOnDisk())
parts_per_disk[parts[i].data_part->volume->getDisk()->getName()].push_back(std::move(part_info));
else
parts_per_disk[""].push_back(std::move(part_info));
}
for (auto & info : parts_per_disk)
parts_queue.push(std::move(info.second));
}
const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
for (size_t i = 0; i < threads && !parts.empty(); ++i)
for (size_t i = 0; i < threads && !parts_queue.empty(); ++i)
{
auto need_marks = min_marks_per_thread;
while (need_marks > 0 && !parts.empty())
while (need_marks > 0 && !parts_queue.empty())
{
const auto part_idx = parts.size() - 1;
RangesInDataPart & part = parts.back();
size_t & marks_in_part = per_part_sum_marks.back();
auto & current_parts = parts_queue.front();
RangesInDataPart & part = current_parts.back().part;
size_t & marks_in_part = current_parts.back().sum_marks;
const auto part_idx = current_parts.back().part_idx;
/// Do not get too few rows from part.
if (marks_in_part >= min_marks_for_concurrent_read &&
@ -274,8 +323,9 @@ void MergeTreeReadPool::fillPerThreadInfo(
marks_in_ranges = marks_in_part;
need_marks -= marks_in_part;
parts.pop_back();
per_part_sum_marks.pop_back();
current_parts.pop_back();
if (current_parts.empty())
parts_queue.pop();
}
else
{
@ -304,6 +354,17 @@ void MergeTreeReadPool::fillPerThreadInfo(
if (marks_in_ranges != 0)
remaining_thread_tasks.insert(i);
}
/// Before processing the next thread, change disk if possible.
/// Different threads will likely start reading from different disks,
/// which may improve read parallelism for JBOD.
/// It also may be helpful in case we have backoff threads.
/// Backoff threads will likely reduce the load on different disks, not the same one.
if (parts_queue.size() > 1)
{
parts_queue.push(std::move(parts_queue.front()));
parts_queue.pop();
}
}
}

View File

@ -36,13 +36,16 @@ public:
size_t min_interval_between_events_ms = 1000;
/// Number of events to do backoff - to lower number of threads in pool.
size_t min_events = 2;
/// Try keeping the minimal number of threads in pool.
size_t min_concurrency = 1;
/// The constants above are just an example.
BackoffSettings(const Settings & settings)
: min_read_latency_ms(settings.read_backoff_min_latency_ms.totalMilliseconds()),
max_throughput(settings.read_backoff_max_throughput),
min_interval_between_events_ms(settings.read_backoff_min_interval_between_events_ms.totalMilliseconds()),
min_events(settings.read_backoff_min_events)
min_events(settings.read_backoff_min_events),
min_concurrency(settings.read_backoff_min_concurrency)
{
}
@ -68,7 +71,7 @@ private:
public:
MergeTreeReadPool(
const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_,
RangesInDataParts parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_,
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_, const Names & column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
const bool do_not_steal_tasks_ = false);
@ -88,11 +91,11 @@ public:
private:
std::vector<size_t> fillPerPartInfo(
RangesInDataParts & parts, const bool check_columns);
const RangesInDataParts & parts, const bool check_columns);
void fillPerThreadInfo(
const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
const MergeTreeData & data;
StorageMetadataPtr metadata_snapshot;

View File

@ -0,0 +1,52 @@
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Common/CurrentMetrics.h>
#include <common/getThreadId.h>
namespace DB
{
ReplicatedFetchListElement::ReplicatedFetchListElement(
const std::string & database_, const std::string & table_,
const std::string & partition_id_, const std::string & result_part_name_,
const std::string & result_part_path_, const std::string & source_replica_path_,
const Poco::URI & uri_, UInt8 to_detached_, UInt64 total_size_bytes_compressed_)
: database(database_)
, table(table_)
, partition_id(partition_id_)
, result_part_name(result_part_name_)
, result_part_path(result_part_path_)
, source_replica_path(source_replica_path_)
, source_replica_hostname(uri_.getHost())
, source_replica_port(uri_.getPort())
, interserver_scheme(uri_.getScheme())
, uri(uri_.toString())
, to_detached(to_detached_)
, total_size_bytes_compressed(total_size_bytes_compressed_)
, thread_id(getThreadId())
{
}
ReplicatedFetchInfo ReplicatedFetchListElement::getInfo() const
{
ReplicatedFetchInfo res;
res.database = database;
res.table = table;
res.partition_id = partition_id;
res.result_part_name = result_part_name;
res.result_part_path = result_part_path;
res.source_replica_path = source_replica_path;
res.source_replica_hostname = source_replica_hostname;
res.source_replica_port = source_replica_port;
res.interserver_scheme = interserver_scheme;
res.uri = uri;
res.to_detached = to_detached;
res.elapsed = watch.elapsedSeconds();
res.progress = progress.load(std::memory_order_relaxed);
res.bytes_read_compressed = bytes_read_compressed.load(std::memory_order_relaxed);
res.total_size_bytes_compressed = total_size_bytes_compressed;
res.thread_id = thread_id;
return res;
}
}

View File

@ -0,0 +1,96 @@
#pragma once
#include <Common/CurrentMetrics.h>
#include <boost/noncopyable.hpp>
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
#include <Poco/URI.h>
namespace CurrentMetrics
{
extern const Metric ReplicatedFetch;
}
namespace DB
{
struct ReplicatedFetchInfo
{
std::string database;
std::string table;
std::string partition_id;
std::string result_part_name;
std::string result_part_path;
std::string source_replica_path;
std::string source_replica_hostname;
UInt16 source_replica_port;
std::string interserver_scheme;
std::string uri;
UInt8 to_detached;
Float64 elapsed;
Float64 progress;
UInt64 total_size_bytes_compressed;
UInt64 bytes_read_compressed;
UInt64 thread_id;
};
struct ReplicatedFetchListElement : private boost::noncopyable
{
const std::string database;
const std::string table;
const std::string partition_id;
const std::string result_part_name;
const std::string result_part_path;
const std::string source_replica_path;
const std::string source_replica_hostname;
const UInt16 source_replica_port;
const std::string interserver_scheme;
const std::string uri;
const UInt8 to_detached;
Stopwatch watch;
std::atomic<Float64> progress{};
/// How many bytes already read
std::atomic<UInt64> bytes_read_compressed{};
/// Total bytes to read
/// NOTE: can be zero if we are fetching data from an old server.
/// In this case progress is not tracked.
const UInt64 total_size_bytes_compressed{};
const UInt64 thread_id;
ReplicatedFetchListElement(
const std::string & database_, const std::string & table_,
const std::string & partition_id_, const std::string & result_part_name_,
const std::string & result_part_path_, const std::string & source_replica_path_,
const Poco::URI & uri, UInt8 to_detached_, UInt64 total_size_bytes_compressed_);
ReplicatedFetchInfo getInfo() const;
};
using ReplicatedFetchListEntry = BackgroundProcessListEntry<ReplicatedFetchListElement, ReplicatedFetchInfo>;
/// List of currently processing replicated fetches
class ReplicatedFetchList final : public BackgroundProcessList<ReplicatedFetchListElement, ReplicatedFetchInfo>
{
private:
using Parent = BackgroundProcessList<ReplicatedFetchListElement, ReplicatedFetchInfo>;
public:
ReplicatedFetchList()
: Parent(CurrentMetrics::ReplicatedFetch)
{}
};
}
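As context (illustrative query, not part of this patch): this list backs the `system.replicated_fetches` table, so in-flight fetches can be inspected with:

```sql
SELECT database, table, result_part_name, progress
FROM system.replicated_fetches;
```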

View File

@ -964,13 +964,16 @@ void ReplicatedMergeTreeQueue::checkThereAreNoConflictsInRange(const MergeTreePa
}
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & log_entry_name, const String & new_part_name,
String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
{
/// Let's check if the same part is now being created by another action.
if (future_parts.count(new_part_name))
{
out_reason = "Not executing log entry for part " + new_part_name
+ " because another log entry for the same part is being processed. This shouldn't happen often.";
const char * format_str = "Not executing log entry {} for part {} "
"because another log entry for the same part is being processed. This shouldn't happen often.";
LOG_INFO(log, format_str, log_entry_name, new_part_name);
out_reason = fmt::format(format_str, log_entry_name, new_part_name);
return false;
/** When the corresponding action is completed, the next `isNotCoveredByFuturePart` check will succeed,
@ -991,8 +994,10 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_
if (future_part.contains(result_part))
{
out_reason = "Not executing log entry for part " + new_part_name + " because it is covered by part "
+ future_part_elem.first + " that is currently executing";
const char * format_str = "Not executing log entry {} for part {} "
"because it is covered by part {} that is currently executing.";
LOG_TRACE(log, format_str, log_entry_name, new_part_name, future_part_elem.first);
out_reason = fmt::format(format_str, log_entry_name, new_part_name, future_part_elem.first);
return false;
}
}
@ -1004,7 +1009,7 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
{
std::lock_guard lock(state_mutex);
if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock))
if (isNotCoveredByFuturePartsImpl(entry.znode_name, part_name, reject_reason, lock))
{
CurrentlyExecuting::setActualPartName(entry, part_name, *this);
return true;
@ -1029,12 +1034,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
for (const String & new_part_name : entry.getBlockingPartNames())
{
if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, state_lock))
{
if (!out_postpone_reason.empty())
LOG_DEBUG(log, out_postpone_reason);
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
return false;
}
}
}
@ -1059,10 +1060,11 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (future_parts.count(name))
{
String reason = "Not merging into part " + entry.new_part_name
+ " because part " + name + " is not ready yet (log entry for that part is being processed).";
LOG_TRACE(log, reason);
out_postpone_reason = reason;
const char * format_str = "Not executing log entry {} of type {} for part {} "
"because part {} is not ready yet (log entry for that part is being processed).";
LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
/// Copy-paste of the above because we need structured logging (instead of an already formatted message).
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
return false;
}
@ -1078,9 +1080,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (merger_mutator.merges_blocker.isCancelled())
{
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now.";
LOG_DEBUG(log, reason);
out_postpone_reason = reason;
const char * format_str = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
LOG_DEBUG(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}
@ -1102,17 +1104,19 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (merger_mutator.ttl_merges_blocker.isCancelled())
{
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges with TTL are cancelled now.";
LOG_DEBUG(log, reason);
out_postpone_reason = reason;
const char * format_str = "Not executing log entry {} for part {} because merges with TTL are cancelled now.";
LOG_DEBUG(log, format_str,
entry.znode_name, entry.new_part_name);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.new_part_name);
return false;
}
size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList();
if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool)
{
const char * format_str = "Not executing log entry for part {}"
const char * format_str = "Not executing log entry {} for part {}"
" because {} merges with TTL already executing, maximum {}.";
LOG_DEBUG(log, format_str, entry.new_part_name, total_merges_with_ttl,
LOG_DEBUG(log, format_str, entry.znode_name,
entry.new_part_name, total_merges_with_ttl,
data_settings->max_number_of_merges_with_ttl_in_pool);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.new_part_name, total_merges_with_ttl,
@ -1124,15 +1128,14 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
{
const char * format_str = "Not executing log entry {} for part {}"
const char * format_str = "Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({}).";
LOG_DEBUG(log, format_str,
LOG_DEBUG(log, format_str, entry.znode_name,
entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
/// Copy-paste of the above because we need structured logging (instead of an already formatted message).
out_postpone_reason = fmt::format(format_str,
out_postpone_reason = fmt::format(format_str, entry.znode_name,
entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
@ -1147,9 +1150,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
{
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version)
+ " because another alter " + std::to_string(head_alter)
+ " must be executed before";
const char * format_str = "Cannot execute alter metadata {} with version {} because another alter {} must be executed before";
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version, head_alter);
return false;
}
}
@ -1161,11 +1164,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
if (head_alter == entry.alter_version)
out_postpone_reason = "Cannot execute alter data with version: "
+ std::to_string(entry.alter_version) + " because metadata still not altered";
{
const char * format_str = "Cannot execute alter data {} with version {} because metadata still not altered";
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version);
}
else
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version)
+ " because another alter " + std::to_string(head_alter) + " must be executed before";
{
const char * format_str = "Cannot execute alter data {} with version {} because another alter {} must be executed before";
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version, head_alter);
}
return false;
}
@ -1178,7 +1187,14 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait for each other.
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
if (currently_executing_drop_or_replace_range)
{
const char * format_str = "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry are currently executing.";
LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}
}
return true;
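All of these postpone branches share one idiom: a single format_str feeds both the LOG_* macro, which needs the pattern and the arguments separately for structured logging, and fmt::format for the reason string handed back to the caller, so the two can never drift apart. A standalone sketch of just that idiom, assuming only the fmt library (the ClickHouse LOG_DEBUG macro is stood in for by a comment; names are hypothetical):

```cpp
#include <fmt/format.h>
#include <iostream>
#include <string>

/// Reuse one format string for the structured log call and the returned
/// postpone reason. Note: with fmt >= 8, a runtime pattern must be wrapped
/// as fmt::runtime(format_str).
std::string buildPostponeReason(const std::string & znode_name, const std::string & part_name)
{
    const char * format_str = "Not executing log entry {} for part {} because merges and mutations are cancelled now.";
    /// In ClickHouse this line would be: LOG_DEBUG(log, format_str, znode_name, part_name);
    std::cout << fmt::format(format_str, znode_name, part_name) << '\n';
    return fmt::format(format_str, znode_name, part_name);
}

int main()
{
    return buildPostponeReason("queue-0000000042", "all_0_0_0").empty();
}
```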

View File

@ -205,6 +205,7 @@ private:
* Should be called under state_mutex.
*/
bool isNotCoveredByFuturePartsImpl(
const String & log_entry_name,
const String & new_part_name, String & out_reason,
std::lock_guard<std::mutex> & state_lock) const;

View File

@ -0,0 +1,63 @@
#include <Storages/System/StorageSystemReplicatedFetches.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Access/ContextAccess.h>
namespace DB
{
NamesAndTypesList StorageSystemReplicatedFetches::getNamesAndTypes()
{
return {
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"elapsed", std::make_shared<DataTypeFloat64>()},
{"progress", std::make_shared<DataTypeFloat64>()},
{"result_part_name", std::make_shared<DataTypeString>()},
{"result_part_path", std::make_shared<DataTypeString>()},
{"partition_id", std::make_shared<DataTypeString>()},
{"total_size_bytes_compressed", std::make_shared<DataTypeUInt64>()},
{"bytes_read_compressed", std::make_shared<DataTypeUInt64>()},
{"source_replica_path", std::make_shared<DataTypeString>()},
{"source_replica_hostname", std::make_shared<DataTypeString>()},
{"source_replica_port", std::make_shared<DataTypeUInt16>()},
{"interserver_scheme", std::make_shared<DataTypeString>()},
{"URI", std::make_shared<DataTypeString>()},
{"to_detached", std::make_shared<DataTypeUInt8>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
};
}
void StorageSystemReplicatedFetches::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
const auto access = context.getAccess();
const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES);
for (const auto & fetch : context.getReplicatedFetchList().get())
{
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, fetch.database, fetch.table))
continue;
size_t i = 0; /// Insertion order below must match getNamesAndTypes() exactly.
res_columns[i++]->insert(fetch.database);
res_columns[i++]->insert(fetch.table);
res_columns[i++]->insert(fetch.elapsed);
res_columns[i++]->insert(fetch.progress);
res_columns[i++]->insert(fetch.result_part_name);
res_columns[i++]->insert(fetch.result_part_path);
res_columns[i++]->insert(fetch.partition_id);
res_columns[i++]->insert(fetch.total_size_bytes_compressed);
res_columns[i++]->insert(fetch.bytes_read_compressed);
res_columns[i++]->insert(fetch.source_replica_path);
res_columns[i++]->insert(fetch.source_replica_hostname);
res_columns[i++]->insert(fetch.source_replica_port);
res_columns[i++]->insert(fetch.interserver_scheme);
res_columns[i++]->insert(fetch.uri);
res_columns[i++]->insert(fetch.to_detached);
res_columns[i++]->insert(fetch.thread_id);
}
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
/// system.replicated_fetches table. Takes data from context.getReplicatedFetchList()
class StorageSystemReplicatedFetches final : public ext::shared_ptr_helper<StorageSystemReplicatedFetches>, public IStorageSystemOneBlock<StorageSystemReplicatedFetches>
{
friend struct ext::shared_ptr_helper<StorageSystemReplicatedFetches>;
public:
std::string getName() const override { return "SystemReplicatedFetches"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -19,6 +19,7 @@
#include <Storages/System/StorageSystemMacros.h>
#include <Storages/System/StorageSystemMerges.h>
#include <Storages/System/StorageSystemReplicatedFetches.h>
#include <Storages/System/StorageSystemMetrics.h>
#include <Storages/System/StorageSystemModels.h>
#include <Storages/System/StorageSystemMutations.h>
@ -137,6 +138,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
attach<StorageSystemClusters>(system_database, "clusters");
attach<StorageSystemGraphite>(system_database, "graphite_retentions");
attach<StorageSystemMacros>(system_database, "macros");
attach<StorageSystemReplicatedFetches>(system_database, "replicated_fetches");
if (has_zookeeper)
attach<StorageSystemZooKeeper>(system_database, "zookeeper");

View File

@ -7,7 +7,6 @@ PEERDIR(
contrib/libs/poco/MongoDB
)
CFLAGS(-g0)
SRCS(
AlterCommands.cpp
@ -91,6 +90,7 @@ SRCS(
MergeTree/MergeTreeWriteAheadLog.cpp
MergeTree/MergeType.cpp
MergeTree/registerStorageMergeTree.cpp
MergeTree/ReplicatedFetchList.cpp
MergeTree/ReplicatedMergeTreeAddress.cpp
MergeTree/ReplicatedMergeTreeAltersSequence.cpp
MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
@ -177,6 +177,7 @@ SRCS(
System/StorageSystemQuotasUsage.cpp
System/StorageSystemQuotaUsage.cpp
System/StorageSystemReplicas.cpp
System/StorageSystemReplicatedFetches.cpp
System/StorageSystemReplicationQueue.cpp
System/StorageSystemRoleGrants.cpp
System/StorageSystemRoles.cpp

View File

@ -6,7 +6,6 @@ PEERDIR(
contrib/libs/poco/MongoDB
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -P 'Kafka|RabbitMQ|S3|HDFS|Licenses|TimeZones' | sed 's/^\.\// /' | sort ?>

View File

@ -5,7 +5,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
ITableFunction.cpp

View File

@ -4,7 +4,6 @@ PEERDIR(
clickhouse/src/Common
)
CFLAGS(-g0)
SRCS(
<? find . -name '*.cpp' | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>

View File

@ -814,6 +814,7 @@ services:
tmpfs: {tmpfs}
cap_add:
- SYS_PTRACE
- NET_ADMIN
depends_on: {depends_on}
user: '{user}'
env_file:

View File

@ -19,6 +19,7 @@ class PartitionManager:
def __init__(self):
self._iptables_rules = []
self._netem_delayed_instances = []
_NetworkManager.get()  # ensure the singleton network manager (and its helper container) is started up front
def drop_instance_zk_connections(self, instance, action='DROP'):
@ -46,11 +47,18 @@ class PartitionManager:
self._add_rule(create_rule(left, right))
self._add_rule(create_rule(right, left))
def add_network_delay(self, instance, delay_ms):
self._add_tc_netem_delay(instance, delay_ms)
def heal_all(self):
while self._iptables_rules:
rule = self._iptables_rules.pop()
_NetworkManager.get().delete_iptables_rule(**rule)
while self._netem_delayed_instances:
instance = self._netem_delayed_instances.pop()
instance.exec_in_container(["bash", "-c", "tc qdisc del dev eth0 root netem"], user="root")
def pop_rules(self):
res = self._iptables_rules[:]
self.heal_all()
@ -73,6 +81,10 @@ class PartitionManager:
_NetworkManager.get().delete_iptables_rule(**rule)
self._iptables_rules.remove(rule)
def _add_tc_netem_delay(self, instance, delay_ms):
# Add an artificial delay to all traffic leaving the instance's eth0 via `tc netem`; heal_all() reverts it.
instance.exec_in_container(["bash", "-c", "tc qdisc add dev eth0 root netem delay {}ms".format(delay_ms)], user="root")
self._netem_delayed_instances.append(instance)
def __enter__(self):
return self

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,93 @@
#!/usr/bin/env python3
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
import random
import string
import json
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_random_string(length):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
def test_system_replicated_fetches(started_cluster):
node1.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '1') ORDER BY tuple()")
node2.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '2') ORDER BY tuple()")
with PartitionManager() as pm:
node2.query("SYSTEM STOP FETCHES t")
node1.query("INSERT INTO t SELECT number, '{}' FROM numbers(10000)".format(get_random_string(104857)))
pm.add_network_delay(node1, 80)
node2.query("SYSTEM START FETCHES t")
fetches_result = []
for _ in range(1000):
result = json.loads(node2.query("SELECT * FROM system.replicated_fetches FORMAT JSON"))
if not result["data"]:
if fetches_result:
break
time.sleep(0.1)
else:
fetches_result.append(result["data"][0])
print(fetches_result[-1])
time.sleep(0.1)
node2.query("SYSTEM SYNC REPLICA t", timeout=10)
assert node2.query("SELECT COUNT() FROM t") == "10000\n"
for elem in fetches_result:
elem['bytes_read_compressed'] = float(elem['bytes_read_compressed'])
elem['total_size_bytes_compressed'] = float(elem['total_size_bytes_compressed'])
elem['progress'] = float(elem['progress'])
elem['elapsed'] = float(elem['elapsed'])
assert len(fetches_result) > 0
first_non_empty = fetches_result[0]
assert first_non_empty['database'] == "default"
assert first_non_empty['table'] == "t"
assert first_non_empty['source_replica_hostname'] == 'node1'
assert first_non_empty['source_replica_port'] == 9009
assert first_non_empty['source_replica_path'] == '/clickhouse/test/t/replicas/1'
assert first_non_empty['interserver_scheme'] == 'http'
assert first_non_empty['result_part_name'] == 'all_0_0_0'
assert first_non_empty['result_part_path'].startswith('/var/lib/clickhouse/')
assert first_non_empty['result_part_path'].endswith('all_0_0_0/')
assert first_non_empty['partition_id'] == 'all'
assert first_non_empty['URI'].startswith('http://node1:9009/?endpoint=DataPartsExchange')
for elem in fetches_result:
assert elem['bytes_read_compressed'] <= elem['total_size_bytes_compressed'], "Bytes read ({}) is greater than total bytes ({}). It's a bug".format(elem['bytes_read_compressed'], elem['total_size_bytes_compressed'])
assert 0.0 <= elem['progress'] <= 1.0, "Progress should be between 0 and 1, got {}".format(elem['progress'])
assert 0.0 <= elem['elapsed'], "Elapsed time must not be negative, got {}".format(elem['elapsed'])
prev_progress = first_non_empty['progress']
for elem in fetches_result:
assert elem['progress'] >= prev_progress, "Progress decreasing: prev {}, next {}? It's a bug".format(prev_progress, elem['progress'])
prev_progress = elem['progress']
prev_bytes = first_non_empty['bytes_read_compressed']
for elem in fetches_result:
assert elem['bytes_read_compressed'] >= prev_bytes, "Bytes read decreasing: prev {}, next {}? It's a bug".format(prev_bytes, elem['bytes_read_compressed'])
prev_bytes = elem['bytes_read_compressed']
prev_elapsed = first_non_empty['elapsed']
for elem in fetches_result:
assert elem['elapsed'] >= prev_elapsed, "Elapsed time decreasing: prev {}, next {}? It's a bug".format(prev_elapsed, elem['elapsed'])
prev_elapsed = elem['elapsed']

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.3">
<test>
<preconditions>
<table_exists>hits_100m_single</table_exists>
</preconditions>

View File

@ -1,19 +1,6 @@
<test>
<stop_conditions>
<all_of>
<iterations>10</iterations>
</all_of>
</stop_conditions>
<query>SELECT any(-1 * (((-2 * (number * -3)) * -4) * -5)) FROM numbers(120000000)</query>
<query>SELECT anyLast(-1 * (((-2 * (number * -3)) * -4) * -5)) FROM numbers(120000000)</query>
<query>SELECT any(number * 2) as n, n * 3 FROM numbers(120000000)</query>
<query>SELECT any(number * round(toInt64(number), -2)) FROM numbers(120000000)</query>
<query>SELECT any(-1 * (((-2 * (number * -3)) * -4) * -5)) FROM numbers(500000000)</query>
<query>SELECT anyLast(-1 * (((-2 * (number * -3)) * -4) * -5)) FROM numbers(500000000)</query>
<query>SELECT any(number * 2) as n, n * 3 FROM numbers(500000000)</query>
<query>SELECT any(number * round(toInt64(number), -2)) FROM numbers(500000000)</query>
</test>

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.4">
<test>
<settings>
<max_memory_usage>30000000000</max_memory_usage>
</settings>

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.2">
<test>
<create_query>DROP TABLE IF EXISTS perf_lc_str</create_query>
<create_query>CREATE TABLE perf_lc_str(
str LowCardinality(String),

View File

@ -1,5 +1,5 @@
<!-- FIXME this instability is abysmal, investigate the unstable queries -->
<test max_ignored_relative_change="1.5">
<test>
<settings>
<allow_suspicious_codecs>1</allow_suspicious_codecs>
</settings>

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.2">
<test>
<settings>
<allow_suspicious_codecs>1</allow_suspicious_codecs>
</settings>

Some files were not shown because too many files have changed in this diff.