Merge branch 'master' into add_test_logs_level

alesapin 2021-09-04 15:48:47 +03:00
commit 36a11af351
66 changed files with 1013 additions and 442 deletions

View File

@ -17,7 +17,7 @@ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP/CMakeLists.txt")
endif ()
set (USE_AMQPCPP 1)
set (AMQPCPP_LIBRARY amqp-cpp)
set (AMQPCPP_LIBRARY amqp-cpp ${OPENSSL_LIBRARIES})
set (AMQPCPP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP/include")
list (APPEND AMQPCPP_INCLUDE_DIR

View File

@ -29,6 +29,9 @@ if (NOT USE_INTERNAL_ZLIB_LIBRARY)
endif ()
if (NOT ZLIB_FOUND AND NOT MISSING_INTERNAL_ZLIB_LIBRARY)
# https://github.com/zlib-ng/zlib-ng/pull/733
# This is disabled by default
add_compile_definitions(Z_TLS=__thread)
set (USE_INTERNAL_ZLIB_LIBRARY 1)
set (ZLIB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}" "${ClickHouse_BINARY_DIR}/contrib/${INTERNAL_ZLIB_NAME}" CACHE INTERNAL "") # generated zconf.h
set (ZLIB_INCLUDE_DIRS ${ZLIB_INCLUDE_DIR}) # for poco

View File

@ -41,6 +41,4 @@ target_compile_options (amqp-cpp
)
target_include_directories (amqp-cpp SYSTEM PUBLIC "${LIBRARY_DIR}/include")
target_link_libraries (amqp-cpp PUBLIC ssl)
target_link_libraries(amqp-cpp PUBLIC ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})

View File

@ -15,12 +15,12 @@ if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CLANG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fvisibility=hidden -fno-common -fno-exceptions -fno-rtti")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fno-common -fno-exceptions -fno-rtti")
if(APPLE AND CLANG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fvisibility=hidden -fno-common")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-common")
if((CMAKE_C_COMPILER_VERSION VERSION_GREATER "4.8.99") OR CLANG)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11")
else()

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 15883876a758bf6f407b22ea4ad0ad2f9465bee6
Subproject commit 46c80daf1b015aa10474ce82e3d24b578c6ae422

View File

@ -21,11 +21,12 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = RabbitMQ SETTINGS
rabbitmq_host_port = 'host:port',
rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'],
rabbitmq_exchange_name = 'exchange_name',
rabbitmq_format = 'data_format'[,]
[rabbitmq_exchange_type = 'exchange_type',]
[rabbitmq_routing_key_list = 'key1,key2,...',]
[rabbitmq_secure = 0,]
[rabbitmq_row_delimiter = 'delimiter_symbol',]
[rabbitmq_schema = '',]
[rabbitmq_num_consumers = N,]
@ -59,6 +60,11 @@ Optional parameters:
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
SSL connection:
Use either `rabbitmq_secure = 1` or `amqps` in the connection address: `rabbitmq_address = 'amqps://guest:guest@localhost/vhost'`.
By default, the underlying library does not check whether the created TLS connection is sufficiently secure. Whether the certificate is expired, self-signed, missing or invalid, the connection is simply permitted. Stricter certificate checking may be implemented in the future.
Format settings can also be added along with the RabbitMQ-related settings.
Example:

View File

@ -26,17 +26,17 @@ The supported formats are:
| [VerticalRaw](#verticalraw) | ✗ | ✔ |
| [JSON](#json) | ✗ | ✔ |
| [JSONAsString](#jsonasstring) | ✔ | ✗ |
| [JSONString](#jsonstring) | ✗ | ✔ |
| [JSONStrings](#jsonstrings) | ✗ | ✔ |
| [JSONCompact](#jsoncompact) | ✗ | ✔ |
| [JSONCompactString](#jsoncompactstring) | ✗ | ✔ |
| [JSONCompactStrings](#jsoncompactstrings) | ✗ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ |
| [JSONStringsEachRow](#jsonstringseachrow) | ✔ | ✔ |
| [JSONStringsEachRowWithProgress](#jsonstringseachrowwithprogress) | ✗ | ✔ |
| [JSONCompactEachRow](#jsoncompacteachrow) | ✔ | ✔ |
| [JSONCompactEachRowWithNamesAndTypes](#jsoncompacteachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringEachRow](#jsoncompactstringeachrow) | ✔ | ✔ |
| [JSONCompactStringEachRowWithNamesAndTypes](#jsoncompactstringeachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringsEachRow](#jsoncompactstringseachrow) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNamesAndTypes](#jsoncompactstringseachrowwithnamesandtypes) | ✔ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ |
| [PrettyCompact](#prettycompact) | ✗ | ✔ |
@ -464,7 +464,7 @@ ClickHouse supports [NULL](../sql-reference/syntax.md), which is displayed as `n
- [JSONEachRow](#jsoneachrow) format
- [output_format_json_array_of_rows](../operations/settings/settings.md#output-format-json-array-of-rows) setting
## JSONString {#jsonstring}
## JSONStrings {#jsonstrings}
Differs from JSON only in that data fields are output in strings, not in typed JSON values.
@ -541,7 +541,7 @@ Result:
```
## JSONCompact {#jsoncompact}
## JSONCompactString {#jsoncompactstring}
## JSONCompactStrings {#jsoncompactstrings}
Differs from JSON only in that data rows are output in arrays, not in objects.
@ -580,7 +580,7 @@ Example:
```
```
// JSONCompactString
// JSONCompactStrings
{
"meta":
[
@ -614,7 +614,7 @@ Example:
## JSONEachRow {#jsoneachrow}
## JSONStringsEachRow {#jsonstringseachrow}
## JSONCompactEachRow {#jsoncompacteachrow}
## JSONCompactStringEachRow {#jsoncompactstringeachrow}
## JSONCompactStringsEachRow {#jsoncompactstringseachrow}
When using these formats, ClickHouse outputs rows as separated, newline-delimited JSON values, but the data as a whole is not valid JSON.
@ -639,9 +639,9 @@ Differs from `JSONEachRow`/`JSONStringsEachRow` in that ClickHouse will also yie
```
## JSONCompactEachRowWithNamesAndTypes {#jsoncompacteachrowwithnamesandtypes}
## JSONCompactStringEachRowWithNamesAndTypes {#jsoncompactstringeachrowwithnamesandtypes}
## JSONCompactStringsEachRowWithNamesAndTypes {#jsoncompactstringseachrowwithnamesandtypes}
Differs from `JSONCompactEachRow`/`JSONCompactStringEachRow` in that the column names and types are written as the first two rows.
Differs from `JSONCompactEachRow`/`JSONCompactStringsEachRow` in that the column names and types are written as the first two rows.
```json
["'hello'", "multiply(42, number)", "range(5)"]

View File

@ -25,7 +25,7 @@ toc_title: Adopters
| <a href="https://badoo.com" class="favicon">Badoo</a> | Dating | Timeseries | — | — | [Slides in Russian, December 2019](https://presentations.clickhouse.tech/meetup38/forecast.pdf) |
| <a href="https://www.benocs.com/" class="favicon">Benocs</a> | Network Telemetry and Analytics | Main Product | — | — | [Slides in English, October 2017](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup9/lpm.pdf) |
| <a href="https://www.bigo.sg/" class="favicon">BIGO</a> | Video | Computing Platform | — | — | [Blog Article, August 2020](https://www.programmersought.com/article/44544895251/) |
| <a href="https://www.bloomberg.com/" class="favicon">Bloomberg</a> | Finance, Media | Monitoring | 102 servers | — | [Slides, May 2018](https://www.slideshare.net/Altinity/http-analytics-for-6m-requests-per-second-using-clickhouse-by-alexander-bocharov) |
| <a href="https://www.bloomberg.com/">Bloomberg</a> | Finance, Media | Monitoring | | — | [Slides, May 2018](https://www.slideshare.net/Altinity/http-analytics-for-6m-requests-per-second-using-clickhouse-by-alexander-bocharov) |
| <a href="https://bloxy.info" class="favicon">Bloxy</a> | Blockchain | Analytics | — | — | [Slides in Russian, August 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup17/4_bloxy.pptx) |
| <a href="https://www.bytedance.com" class="favicon">Bytedance</a> | Social platforms | — | — | — | [The ClickHouse Meetup East, October 2020](https://www.youtube.com/watch?v=ckChUkC3Pns) |
| <a href="https://cardsmobile.ru/" class="favicon">CardsMobile</a> | Finance | Analytics | — | — | [VC.ru](https://vc.ru/s/cardsmobile/143449-rukovoditel-gruppy-analiza-dannyh) |

View File

@ -390,12 +390,12 @@ This section contains the following parameters:
## keep_alive_timeout {#keep-alive-timeout}
The number of seconds that ClickHouse waits for incoming requests before closing the connection. Defaults to 3 seconds.
The number of seconds that ClickHouse waits for incoming requests before closing the connection. Defaults to 10 seconds.
**Example**
``` xml
<keep_alive_timeout>3</keep_alive_timeout>
<keep_alive_timeout>10</keep_alive_timeout>
```
## listen_host {#server_configuration_parameters-listen_host}

View File

@ -284,12 +284,12 @@ ClickHouseサーバー間でデータを交換するポート。
## keep_alive_timeout {#keep-alive-timeout}
ClickHouseが接続を閉じる前に受信要求を待機する秒数。 既定値は3秒です。
ClickHouseが接続を閉じる前に受信要求を待機する秒数。 既定値は10秒です。
**例**
``` xml
<keep_alive_timeout>3</keep_alive_timeout>
<keep_alive_timeout>10</keep_alive_timeout>
```
## listen_host {#server_configuration_parameters-listen_host}

View File

@ -25,17 +25,17 @@ ClickHouse может принимать (`INSERT`) и отдавать (`SELECT
| [VerticalRaw](#verticalraw) | ✗ | ✔ |
| [JSON](#json) | ✗ | ✔ |
| [JSONAsString](#jsonasstring) | ✔ | ✗ |
| [JSONString](#jsonstring) | ✗ | ✔ |
| [JSONStrings](#jsonstrings) | ✗ | ✔ |
| [JSONCompact](#jsoncompact) | ✗ | ✔ |
| [JSONCompactString](#jsoncompactstring) | ✗ | ✔ |
| [JSONCompactStrings](#jsoncompactstrings) | ✗ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ |
| [JSONStringEachRow](#jsonstringeachrow) | ✔ | ✔ |
| [JSONStringEachRowWithProgress](#jsonstringeachrowwithprogress) | ✗ | ✔ |
| [JSONStringsEachRow](#jsonstringseachrow) | ✔ | ✔ |
| [JSONStringsEachRowWithProgress](#jsonstringseachrowwithprogress) | ✗ | ✔ |
| [JSONCompactEachRow](#jsoncompacteachrow) | ✔ | ✔ |
| [JSONCompactEachRowWithNamesAndTypes](#jsoncompacteachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringEachRow](#jsoncompactstringeachrow) | ✔ | ✔ |
| [JSONCompactStringEachRowWithNamesAndTypes](#jsoncompactstringeachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringsEachRow](#jsoncompactstringseachrow) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNamesAndTypes](#jsoncompactstringseachrowwithnamesandtypes) | ✔ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ |
| [PrettyCompact](#prettycompact) | ✗ | ✔ |
@ -442,7 +442,7 @@ ClickHouse поддерживает [NULL](../sql-reference/syntax.md), кото
- Формат [JSONEachRow](#jsoneachrow)
- Настройка [output_format_json_array_of_rows](../operations/settings/settings.md#output-format-json-array-of-rows)
## JSONString {#jsonstring}
## JSONStrings {#jsonstrings}
Отличается от JSON только тем, что поля данных выводятся в строках, а не в типизированных значениях JSON.
@ -519,7 +519,7 @@ SELECT * FROM json_as_string;
```
## JSONCompact {#jsoncompact}
## JSONCompactString {#jsoncompactstring}
## JSONCompactStrings {#jsoncompactstrings}
Отличается от JSON только тем, что строчки данных выводятся в массивах, а не в object-ах.
@ -558,7 +558,7 @@ SELECT * FROM json_as_string;
```
```json
// JSONCompactString
// JSONCompactStrings
{
"meta":
[
@ -590,9 +590,9 @@ SELECT * FROM json_as_string;
```
## JSONEachRow {#jsoneachrow}
## JSONStringEachRow {#jsonstringeachrow}
## JSONStringsEachRow {#jsonstringseachrow}
## JSONCompactEachRow {#jsoncompacteachrow}
## JSONCompactStringEachRow {#jsoncompactstringeachrow}
## JSONCompactStringsEachRow {#jsoncompactstringseachrow}
При использовании этих форматов ClickHouse выводит каждую запись как значения JSON (каждое значение отдельной строкой), при этом данные в целом — невалидный JSON.
@ -605,9 +605,9 @@ SELECT * FROM json_as_string;
При вставке данных вы должны предоставить отдельное значение JSON для каждой строки.
## JSONEachRowWithProgress {#jsoneachrowwithprogress}
## JSONStringEachRowWithProgress {#jsonstringeachrowwithprogress}
## JSONStringsEachRowWithProgress {#jsonstringseachrowwithprogress}
Отличается от `JSONEachRow`/`JSONStringEachRow` тем, что ClickHouse будет выдавать информацию о ходе выполнения в виде значений JSON.
Отличается от `JSONEachRow`/`JSONStringsEachRow` тем, что ClickHouse будет выдавать информацию о ходе выполнения в виде значений JSON.
```json
{"row":{"'hello'":"hello","multiply(42, number)":"0","range(5)":[0,1,2,3,4]}}
@ -617,9 +617,9 @@ SELECT * FROM json_as_string;
```
## JSONCompactEachRowWithNamesAndTypes {#jsoncompacteachrowwithnamesandtypes}
## JSONCompactStringEachRowWithNamesAndTypes {#jsoncompactstringeachrowwithnamesandtypes}
## JSONCompactStringsEachRowWithNamesAndTypes {#jsoncompactstringseachrowwithnamesandtypes}
Отличается от `JSONCompactEachRow`/`JSONCompactStringEachRow` тем, что имена и типы столбцов записываются как первые две строки.
Отличается от `JSONCompactEachRow`/`JSONCompactStringsEachRow` тем, что имена и типы столбцов записываются как первые две строки.
```json
["'hello'", "multiply(42, number)", "range(5)"]

View File

@ -371,12 +371,12 @@ ClickHouse проверяет условия для `min_part_size` и `min_part
## keep_alive_timeout {#keep-alive-timeout}
Время в секундах, в течение которого ClickHouse ожидает входящих запросов прежде, чем закрыть соединение.
Время в секундах, в течение которого ClickHouse ожидает входящих запросов прежде, чем закрыть соединение.
**Пример**
``` xml
<keep_alive_timeout>3</keep_alive_timeout>
<keep_alive_timeout>10</keep_alive_timeout>
```
## listen_host {#server_configuration_parameters-listen_host}

View File

@ -26,17 +26,17 @@ ClickHouse可以接受和返回各种格式的数据。受支持的输入格式
| [VerticalRaw](#verticalraw) | ✗ | ✔ |
| [JSON](#json) | ✗ | ✔ |
| [JSONAsString](#jsonasstring) | ✔ | ✗ |
| [JSONString](#jsonstring) | ✗ | ✔ |
| [JSONStrings](#jsonstrings) | ✗ | ✔ |
| [JSONCompact](#jsoncompact) | ✗ | ✔ |
| [JSONCompactString](#jsoncompactstring) | ✗ | ✔ |
| [JSONCompactStrings](#jsoncompactstrings) | ✗ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ |
| [JSONStringsEachRow](#jsonstringseachrow) | ✔ | ✔ |
| [JSONStringsEachRowWithProgress](#jsonstringseachrowwithprogress) | ✗ | ✔ |
| [JSONCompactEachRow](#jsoncompacteachrow) | ✔ | ✔ |
| [JSONCompactEachRowWithNamesAndTypes](#jsoncompacteachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringEachRow](#jsoncompactstringeachrow) | ✔ | ✔ |
| [JSONCompactStringEachRowWithNamesAndTypes](#jsoncompactstringeachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringsEachRow](#jsoncompactstringseachrow) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNamesAndTypes](#jsoncompactstringseachrowwithnamesandtypes) | ✔ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ |
| [PrettyCompact](#prettycompact) | ✗ | ✔ |
@ -465,7 +465,7 @@ ClickHouse支持[NULL](../sql-reference/syntax.md), 在JSON输出中显示为`nu
- [JSONEachRow](#jsoneachrow)格式
- [output_format_json_array_of_rows](../operations/settings/settings.md#output-format-json-array-of-rows)设置
## JSONString {#jsonstring}
## JSONStrings {#jsonstrings}
与JSON的不同之处在于数据字段以字符串输出而不是以类型化JSON值输出。
@ -543,7 +543,7 @@ SELECT * FROM json_as_string;
```
## JSONCompact {#jsoncompact}
## JSONCompactString {#jsoncompactstring}
## JSONCompactStrings {#jsoncompactstrings}
与JSON格式不同的是它以数组的方式输出结果而不是以结构体。
@ -582,7 +582,7 @@ SELECT * FROM json_as_string;
```
```json
// JSONCompactString
// JSONCompactStrings
{
"meta":
[
@ -614,9 +614,9 @@ SELECT * FROM json_as_string;
```
## JSONEachRow {#jsoneachrow}
## JSONStringEachRow {#jsonstringeachrow}
## JSONStringsEachRow {#jsonstringseachrow}
## JSONCompactEachRow {#jsoncompacteachrow}
## JSONCompactStringEachRow {#jsoncompactstringeachrow}
## JSONCompactStringsEachRow {#jsoncompactstringseachrow}
使用这些格式时ClickHouse会将行输出为用换行符分隔的JSON值这些输出数据作为一个整体时由于没有分隔符(,)因而不是有效的JSON文档。
@ -629,9 +629,9 @@ SELECT * FROM json_as_string;
在插入数据时应该为每一行提供一个单独的JSON值。
## JSONEachRowWithProgress {#jsoneachrowwithprogress}
## JSONStringEachRowWithProgress {#jsonstringeachrowwithprogress}
## JSONStringsEachRowWithProgress {#jsonstringseachrowwithprogress}
与`JSONEachRow`/`JSONStringEachRow`不同的是ClickHouse还将生成作为JSON值的进度信息。
与`JSONEachRow`/`JSONStringsEachRow`不同的是ClickHouse还将生成作为JSON值的进度信息。
```json
{"row":{"'hello'":"hello","multiply(42, number)":"0","range(5)":[0,1,2,3,4]}}
@ -641,9 +641,9 @@ SELECT * FROM json_as_string;
```
## JSONCompactEachRowWithNamesAndTypes {#jsoncompacteachrowwithnamesandtypes}
## JSONCompactStringEachRowWithNamesAndTypes {#jsoncompactstringeachrowwithnamesandtypes}
## JSONCompactStringsEachRowWithNamesAndTypes {#jsoncompactstringseachrowwithnamesandtypes}
与`JSONCompactEachRow`/`JSONCompactStringEachRow`不同的是,列名和类型被写入前两行。
与`JSONCompactEachRow`/`JSONCompactStringsEachRow`不同的是,列名和类型被写入前两行。
```json
["'hello'", "multiply(42, number)", "range(5)"]

View File

@ -282,12 +282,12 @@ ClickHouse每x秒重新加载内置字典。 这使得编辑字典 “on the fly
## keep_alive_timeout {#keep-alive-timeout}
ClickHouse在关闭连接之前等待传入请求的秒数。 默认为3秒。
ClickHouse在关闭连接之前等待传入请求的秒数。默认为10秒。
**示例**
``` xml
<keep_alive_timeout>3</keep_alive_timeout>
<keep_alive_timeout>10</keep_alive_timeout>
```
## listen_host {#server_configuration_parameters-listen_host}

View File

@ -43,6 +43,7 @@ enum class AccessType
M(ALTER_COMMENT_COLUMN, "COMMENT COLUMN", COLUMN, ALTER_COLUMN) \
M(ALTER_CLEAR_COLUMN, "CLEAR COLUMN", COLUMN, ALTER_COLUMN) \
M(ALTER_RENAME_COLUMN, "RENAME COLUMN", COLUMN, ALTER_COLUMN) \
M(ALTER_MATERIALIZE_COLUMN, "MATERIALIZE COLUMN", COLUMN, ALTER_COLUMN) \
M(ALTER_COLUMN, "", GROUP, ALTER_TABLE) /* allow to execute ALTER {ADD|DROP|MODIFY...} COLUMN */\
\
M(ALTER_ORDER_BY, "ALTER MODIFY ORDER BY, MODIFY ORDER BY", TABLE, ALTER_INDEX) \

View File

@ -142,8 +142,22 @@ private:
struct ChangelogReadResult
{
uint64_t entries_read;
/// Total entries read from the log, including skipped ones.
/// Useful when we decide to continue writing to the same log and want to know
/// how many entries were already written to it.
uint64_t total_entries_read_from_log;
/// First index in log
uint64_t log_start_index;
/// First entry actually read from the log (not including skipped entries)
uint64_t first_read_index;
/// Last entry read from the log (the last entry in the log).
/// When nothing is skipped, last_read_index - first_read_index = total_entries_read_from_log.
/// But some entries from the start of the log may be skipped because they are not required.
uint64_t last_read_index;
/// Last offset we were able to read from the log
off_t last_position;
bool error;
};
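To make the relationship between these counters concrete, here is a small standalone sketch; the struct and the example numbers are hypothetical illustrations, not part of the ClickHouse sources.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

/// Hypothetical model of the counters in ChangelogReadResult for a changelog file
/// covering entries [log_start_index, log_end_index] when reading starts at start_log_index.
struct ReadResultModel
{
    uint64_t total_entries_read_from_log; /// everything scanned, including skipped entries
    uint64_t first_read_index;            /// first entry actually kept
    uint64_t last_read_index;             /// last entry in the log
};

ReadResultModel modelRead(uint64_t log_start_index, uint64_t log_end_index, uint64_t start_log_index)
{
    ReadResultModel result{};
    result.total_entries_read_from_log = log_end_index - log_start_index + 1;
    result.first_read_index = std::max(log_start_index, start_log_index);
    result.last_read_index = log_end_index;
    return result;
}

int main()
{
    /// The file holds entries 1..100, but only entries from 50 on are still needed.
    auto r = modelRead(1, 100, 50);
    assert(r.total_entries_read_from_log == 100); /// skipped entries are still counted
    assert(r.first_read_index == 50);
    assert(r.last_read_index == 100);
}
```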
@ -156,6 +170,7 @@ public:
, read_buf(filepath)
{}
/// start_log_index -- all entries with index < start_log_index will be skipped, but accounted for in total_entries_read_from_log
ChangelogReadResult readChangelog(IndexToLogEntry & logs, uint64_t start_log_index, IndexToOffset & index_to_offset, Poco::Logger * log)
{
uint64_t previous_index = 0;
@ -210,7 +225,7 @@ public:
if (logs.count(record.header.index) != 0)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Duplicated index id {} in log {}", record.header.index, filepath);
result.entries_read += 1;
result.total_entries_read_from_log += 1;
/// Read but skip this entry because our state is already fresher
if (record.header.index < start_log_index)
@ -224,9 +239,10 @@ public:
/// Put it into in memory structure
logs.emplace(record.header.index, log_entry);
index_to_offset[record.header.index] = result.last_position;
result.last_read_index = record.header.index;
if (result.entries_read % 50000 == 0)
LOG_TRACE(log, "Reading changelog from path {}, entries {}", filepath, result.entries_read);
if (result.total_entries_read_from_log % 50000 == 0)
LOG_TRACE(log, "Reading changelog from path {}, entries {}", filepath, result.total_entries_read_from_log);
}
}
catch (const Exception & ex)
@ -243,7 +259,7 @@ public:
tryLogCurrentException(log);
}
LOG_TRACE(log, "Totally read from changelog {} {} entries", filepath, result.entries_read);
LOG_TRACE(log, "Totally read from changelog {} {} entries", filepath, result.total_entries_read_from_log);
return result;
}
@ -280,16 +296,10 @@ Changelog::Changelog(
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
{
uint64_t total_read = 0;
std::optional<ChangelogReadResult> last_log_read_result;
/// Amount of entries in last log index
uint64_t entries_in_last = 0;
/// Log idx of the first incomplete log (key in existing_changelogs)
int64_t first_incomplete_log_start_index = -1; /// if -1 then no incomplete log exists
ChangelogReadResult result{};
/// First log index which was read from all changelogs
uint64_t first_read_index = 0;
/// Last log has some free space to write
bool last_log_is_not_complete = false;
/// We must start to read from this log index
uint64_t start_to_read_from = last_commited_log_index;
@ -300,19 +310,14 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
else
start_to_read_from = 1;
/// At least we read something
bool started = false;
/// Go through changelog files in order of start_index
for (const auto & [changelog_start_index, changelog_description] : existing_changelogs)
{
/// How many entries we have in the last changelog
entries_in_last = changelog_description.expectedEntriesCountInLog();
/// [from_log_index.>=.......start_to_read_from.....<=.to_log_index]
if (changelog_description.to_log_index >= start_to_read_from)
{
if (!started) /// still nothing was read
if (!last_log_read_result) /// still nothing was read
{
/// Our first log starts from a fresher log_id than we were required to read, and this changelog is not an empty log.
/// So we are missing something in our logs, but it's not data loss; we will receive a snapshot and the required
@ -320,8 +325,12 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
if (changelog_description.from_log_index > last_commited_log_index && (changelog_description.from_log_index - last_commited_log_index) > 1)
{
LOG_ERROR(log, "Some records was lost, last committed log index {}, smallest available log index on disk {}. Hopefully will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index);
first_incomplete_log_start_index = changelog_start_index;
break;
/// Nothing to do with our fresher logs, the leader will overwrite them, so remove everything and just start from last_commited_index
removeAllLogs();
min_log_id = last_commited_log_index;
max_log_id = last_commited_log_index == 0 ? 0 : last_commited_log_index - 1;
rotate(max_log_id + 1);
return;
}
else if (changelog_description.from_log_index > start_to_read_from)
{
@ -332,69 +341,100 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
ChangelogReader reader(changelog_description.path);
result = reader.readChangelog(logs, start_to_read_from, index_to_start_pos, log);
started = true;
last_log_read_result = reader.readChangelog(logs, start_to_read_from, index_to_start_pos, log);
/// Otherwise we have already initialized it
if (first_read_index == 0)
first_read_index = result.first_read_index;
if (min_log_id == 0)
min_log_id = last_log_read_result->first_read_index;
total_read += result.entries_read;
if (last_log_read_result->last_read_index != 0)
max_log_id = last_log_read_result->last_read_index;
last_log_read_result->log_start_index = changelog_description.from_log_index;
/// How many entries we have in the last changelog
uint64_t expected_entries_in_log = changelog_description.expectedEntriesCountInLog();
/// May happen after truncate, crash or simply unfinished log
if (result.entries_read < entries_in_last)
if (last_log_read_result->total_entries_read_from_log < expected_entries_in_log)
{
first_incomplete_log_start_index = changelog_start_index;
last_log_is_not_complete = true;
break;
}
}
}
if (first_read_index != 0)
start_index = first_read_index;
else /// We just may have no logs (only snapshot)
start_index = last_commited_log_index;
/// Found some broken or non finished logs
/// We have to remove broken data and continue to write into incomplete log.
if (first_incomplete_log_start_index != -1) /// otherwise all logs completed so just start a new one
/// We can have an empty log (with zero entries) and last_log_read_result will be initialized
if (!last_log_read_result || min_log_id == 0) /// We just may have no logs (only snapshot or nothing)
{
auto start_remove_from = existing_changelogs.begin();
if (started)
start_remove_from = existing_changelogs.upper_bound(first_incomplete_log_start_index);
/// Just to be sure they don't exist
removeAllLogs();
/// All subsequent logs shouldn't exist. But they may exist if we crashed after writeAt started. Remove them.
for (auto itr = start_remove_from; itr != existing_changelogs.end();)
{
LOG_WARNING(log, "Removing changelog {}, because it's goes after broken changelog entry", itr->second.path);
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
}
min_log_id = last_commited_log_index;
max_log_id = last_commited_log_index == 0 ? 0 : last_commited_log_index - 1;
}
else if (last_log_is_not_complete) /// if it's complete just start new one
{
assert(last_log_read_result != std::nullopt);
/// Actually they shouldn't exist, but to be sure we remove them
removeAllLogsAfter(last_log_read_result->log_start_index);
/// Continue to write into existing log
if (!existing_changelogs.empty())
{
auto description = existing_changelogs.rbegin()->second;
if (description.expectedEntriesCountInLog() != rotate_interval)
LOG_TRACE(log, "Looks like rotate_logs_interval was changed, current {}, expected entries in last log {}", rotate_interval, description.expectedEntriesCountInLog());
assert(!existing_changelogs.empty());
assert(existing_changelogs.find(last_log_read_result->log_start_index)->first == existing_changelogs.rbegin()->first);
LOG_TRACE(log, "Continue to write into {}", description.path);
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, description.from_log_index);
current_writer->setEntriesWritten(result.entries_read);
/// Continue to write into incomplete existing log
auto description = existing_changelogs[last_log_read_result->log_start_index];
/// Truncate all broken entries from log
if (result.error)
{
LOG_WARNING(log, "Read finished with error, truncating all broken log entries");
current_writer->truncateToLength(result.last_position);
}
}
if (last_log_read_result->error)
initWriter(description, last_log_read_result->total_entries_read_from_log, /* truncate_to_offset = */ last_log_read_result->last_position);
else
initWriter(description, last_log_read_result->total_entries_read_from_log);
}
/// Start new log if we don't initialize writer from previous log
/// Start new log if we don't initialize writer from previous log. All logs can be "complete".
if (!current_writer)
rotate(start_index + total_read);
rotate(max_log_id + 1);
}
void Changelog::initWriter(const ChangelogFileDescription & description, uint64_t entries_already_written, std::optional<uint64_t> truncate_to_offset)
{
if (description.expectedEntriesCountInLog() != rotate_interval)
LOG_TRACE(log, "Looks like rotate_logs_interval was changed, current {}, expected entries in last log {}", rotate_interval, description.expectedEntriesCountInLog());
LOG_TRACE(log, "Continue to write into {}", description.path);
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, description.from_log_index);
current_writer->setEntriesWritten(entries_already_written);
if (truncate_to_offset)
{
LOG_WARNING(log, "Changelog {} contain broken enties, truncating all broken log entries", description.path);
current_writer->truncateToLength(*truncate_to_offset);
}
}
void Changelog::removeAllLogsAfter(uint64_t start_to_remove_from_id)
{
auto start_to_remove_from = existing_changelogs.upper_bound(start_to_remove_from_id);
/// All subsequent logs shouldn't exist. But they may exist if we crashed after writeAt started. Remove them.
for (auto itr = start_to_remove_from; itr != existing_changelogs.end();)
{
LOG_WARNING(log, "Removing changelog {}, because it's goes after broken changelog entry", itr->second.path);
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
}
}
void Changelog::removeAllLogs()
{
LOG_WARNING(log, "Removing all changelogs");
for (auto itr = existing_changelogs.begin(); itr != existing_changelogs.end();)
{
LOG_WARNING(log, "Removing changelog {}, because it's goes after broken changelog entry", itr->second.path);
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
}
}
void Changelog::rotate(uint64_t new_start_log_index)
@ -439,7 +479,7 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before appending records");
if (logs.empty())
start_index = index;
min_log_id = index;
const auto & current_changelog_description = existing_changelogs[current_writer->getStartIndex()];
const bool log_is_complete = current_writer->getEntriesWritten() == current_changelog_description.expectedEntriesCountInLog();
@ -452,6 +492,7 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Record with index {} already exists", index);
logs[index] = makeClone(log_entry);
max_log_id = index;
}
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
@ -513,11 +554,30 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
void Changelog::compact(uint64_t up_to_log_index)
{
LOG_INFO(log, "Compact logs up to log index {}, our max log id is {}", up_to_log_index, max_log_id);
bool remove_all_logs = false;
if (up_to_log_index > max_log_id)
{
LOG_INFO(log, "Seems like this node recovers from leaders snapshot, removing all logs");
/// If we received a snapshot from the leader we may compact up to a fresher log
max_log_id = up_to_log_index;
remove_all_logs = true;
}
bool need_rotate = false;
for (auto itr = existing_changelogs.begin(); itr != existing_changelogs.end();)
{
/// Remove all completely outdated changelog files
if (itr->second.to_log_index <= up_to_log_index)
if (remove_all_logs || itr->second.to_log_index <= up_to_log_index)
{
if (current_writer && itr->second.from_log_index == current_writer->getStartIndex())
{
LOG_INFO(log, "Trying to remove log {} which is current active log for write. Possibly this node recovers from snapshot", itr->second.path);
need_rotate = true;
current_writer.reset();
}
LOG_INFO(log, "Removing changelog {} because of compaction", itr->second.path);
std::erase_if(index_to_start_pos, [right_index = itr->second.to_log_index] (const auto & item) { return item.first <= right_index; });
std::filesystem::remove(itr->second.path);
@ -526,8 +586,14 @@ void Changelog::compact(uint64_t up_to_log_index)
else /// Files are ordered, so all subsequent should exist
break;
}
start_index = up_to_log_index + 1;
/// Compaction from the past is possible, so don't make our min_log_id smaller.
min_log_id = std::max(min_log_id, up_to_log_index + 1);
std::erase_if(logs, [up_to_log_index] (const auto & item) { return item.first <= up_to_log_index; });
if (need_rotate)
rotate(up_to_log_index + 1);
LOG_INFO(log, "Compaction up to {} finished new min index {}, new max index {}", up_to_log_index, min_log_id, max_log_id);
}
LogEntryPtr Changelog::getLastEntry() const
@ -535,10 +601,11 @@ LogEntryPtr Changelog::getLastEntry() const
/// This entry is treated in a special way by NuRaft
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(uint64_t)));
const uint64_t next_index = getNextEntryIndex() - 1;
auto entry = logs.find(next_index);
auto entry = logs.find(max_log_id);
if (entry == logs.end())
{
return fake_entry;
}
return entry->second;
}

View File

@ -2,6 +2,7 @@
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <city.h>
#include <optional>
#include <IO/WriteBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
@ -87,12 +88,12 @@ public:
uint64_t getNextEntryIndex() const
{
return start_index + logs.size();
return max_log_id + 1;
}
uint64_t getStartIndex() const
{
return start_index;
return min_log_id;
}
/// Last entry in log, or fake entry with term 0 if log is empty
@ -128,6 +129,13 @@ private:
/// Starts new file [new_start_log_index, new_start_log_index + rotate_interval]
void rotate(uint64_t new_start_log_index);
/// Remove all changelogs from disk with start_index greater than start_to_remove_from_id
void removeAllLogsAfter(uint64_t start_to_remove_from_id);
/// Remove all logs from disk
void removeAllLogs();
/// Init writer for existing log with some entries already written
void initWriter(const ChangelogFileDescription & description, uint64_t entries_already_written, std::optional<uint64_t> truncate_to_offset = {});
private:
const std::string changelogs_dir;
const uint64_t rotate_interval;
@ -144,7 +152,9 @@ private:
/// Mapping log_id -> log_entry
IndexToLogEntry logs;
/// Start log_id which exists in all "active" logs
uint64_t start_index = 0;
/// min_log_id + 1 == max_log_id means empty log storage for NuRaft
uint64_t min_log_id = 0;
uint64_t max_log_id = 0;
};
}
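As a side note, the min_log_id / max_log_id pair replaces the old `start_index + logs.size()` arithmetic. Below is a minimal standalone sketch of that bookkeeping; the ToyLogStore name and the numbers are hypothetical, it is not the real Changelog class.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

/// Toy model of the bookkeeping: append sets min on the first entry and advances max,
/// compaction only ever raises min, and NuRaft sees the range [start_index, next_slot).
struct ToyLogStore
{
    std::map<uint64_t, std::string> logs;
    uint64_t min_log_id = 0;
    uint64_t max_log_id = 0;

    void append(uint64_t index, std::string entry)
    {
        if (logs.empty())
            min_log_id = index;
        logs[index] = std::move(entry);
        max_log_id = index;
    }

    void compact(uint64_t up_to)
    {
        /// Compaction from the past is possible, so never make min_log_id smaller.
        min_log_id = std::max(min_log_id, up_to + 1);
        std::erase_if(logs, [up_to](const auto & item) { return item.first <= up_to; });
    }

    uint64_t getStartIndex() const { return min_log_id; }
    uint64_t getNextEntryIndex() const { return max_log_id + 1; }
};

int main()
{
    ToyLogStore store;
    for (uint64_t i = 1; i <= 10; ++i)
        store.append(i, "entry");
    assert(store.getStartIndex() == 1 && store.getNextEntryIndex() == 11);

    store.compact(5);
    assert(store.getStartIndex() == 6 && store.getNextEntryIndex() == 11);
}
```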

View File

@ -293,10 +293,12 @@ void KeeperDispatcher::shutdown()
if (session_cleaner_thread.joinable())
session_cleaner_thread.join();
/// FIXME not the best way to notify
requests_queue->push({});
if (request_thread.joinable())
request_thread.join();
if (requests_queue)
{
requests_queue->push({});
if (request_thread.joinable())
request_thread.join();
}
responses_queue.push({});
if (responses_thread.joinable())
@ -313,7 +315,7 @@ void KeeperDispatcher::shutdown()
KeeperStorage::RequestForSession request_for_session;
/// Set session expired for all pending requests
while (requests_queue->tryPop(request_for_session))
while (requests_queue && requests_queue->tryPop(request_for_session))
{
if (request_for_session.request)
{

View File

@ -357,7 +357,7 @@ void KeeperServer::waitInit()
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");
}
std::unordered_set<int64_t> KeeperServer::getDeadSessions()
std::vector<int64_t> KeeperServer::getDeadSessions()
{
return state_machine->getDeadSessions();
}

View File

@ -71,7 +71,7 @@ public:
RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
/// Return set of the non-active sessions
std::unordered_set<int64_t> getDeadSessions();
std::vector<int64_t> getDeadSessions();
bool isLeader() const;

View File

@ -308,7 +308,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
responses_queue.push(response);
}
std::unordered_set<int64_t> KeeperStateMachine::getDeadSessions()
std::vector<int64_t> KeeperStateMachine::getDeadSessions()
{
std::lock_guard lock(storage_and_responses_lock);
return storage->getDeadSessions();

View File

@ -71,7 +71,7 @@ public:
/// Process local read request
void processReadRequest(const KeeperStorage::RequestForSession & request_for_session);
std::unordered_set<int64_t> getDeadSessions();
std::vector<int64_t> getDeadSessions();
void shutdownStorage();

View File

@ -1078,7 +1078,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
zxid = *new_last_zxid;
}
session_expiry_queue.update(session_id, session_and_timeout[session_id]);
/// ZooKeeper updates session expiry for each request, not only for heartbeats
session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{

View File

@ -123,7 +123,7 @@ public:
{
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
session_expiry_queue.update(result, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
return result;
}
@ -131,7 +131,7 @@ public:
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
session_and_timeout.emplace(session_id, session_timeout_ms);
session_expiry_queue.update(session_id, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
}
/// Process user request and return response.
@ -172,7 +172,7 @@ public:
}
/// Get all dead sessions
std::unordered_set<int64_t> getDeadSessions()
std::vector<int64_t> getDeadSessions()
{
return session_expiry_queue.getExpiredSessions();
}

View File

@ -1,82 +1,96 @@
#include <Coordination/SessionExpiryQueue.h>
#include <common/logger_useful.h>
namespace DB
{
bool SessionExpiryQueue::remove(int64_t session_id)
{
auto session_it = session_to_timeout.find(session_id);
if (session_it != session_to_timeout.end())
auto session_it = session_to_expiration_time.find(session_id);
if (session_it != session_to_expiration_time.end())
{
auto set_it = expiry_to_sessions.find(session_it->second);
if (set_it != expiry_to_sessions.end())
set_it->second.erase(session_id);
/// No more sessions in this bucket
if (set_it->second.empty())
expiry_to_sessions.erase(set_it);
session_to_expiration_time.erase(session_it);
return true;
}
return false;
}
bool SessionExpiryQueue::update(int64_t session_id, int64_t timeout_ms)
void SessionExpiryQueue::addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms)
{
auto session_it = session_to_timeout.find(session_id);
int64_t now = getNowMilliseconds();
/// round up to next interval
int64_t new_expiry_time = roundToNextInterval(now + timeout_ms);
if (session_it != session_to_timeout.end())
auto session_it = session_to_expiration_time.find(session_id);
/// We already registered this session
if (session_it != session_to_expiration_time.end())
{
if (new_expiry_time == session_it->second)
return false;
int64_t prev_expiry_time = session_it->second;
session_it->second = new_expiry_time;
/// Nothing changed, the session stays in the same bucket
if (new_expiry_time == prev_expiry_time)
return;
/// If this bucket doesn't exist yet, create it
auto set_it = expiry_to_sessions.find(new_expiry_time);
if (set_it == expiry_to_sessions.end())
std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set<int64_t>());
/// Add session to the next bucket
set_it->second.insert(session_id);
int64_t prev_expiry_time = session_it->second;
if (prev_expiry_time != new_expiry_time)
{
auto prev_set_it = expiry_to_sessions.find(prev_expiry_time);
if (prev_set_it != expiry_to_sessions.end())
prev_set_it->second.erase(session_id);
}
session_it->second = new_expiry_time;
return true;
auto prev_set_it = expiry_to_sessions.find(prev_expiry_time);
/// Remove session from previous bucket
if (prev_set_it != expiry_to_sessions.end())
prev_set_it->second.erase(session_id);
/// No more sessions in this bucket
if (prev_set_it->second.empty())
expiry_to_sessions.erase(prev_set_it);
}
else
{
session_to_timeout[session_id] = new_expiry_time;
/// Just add the session to the new bucket
session_to_expiration_time[session_id] = new_expiry_time;
auto set_it = expiry_to_sessions.find(new_expiry_time);
if (set_it == expiry_to_sessions.end())
std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set<int64_t>());
set_it->second.insert(session_id);
return false;
}
}
std::unordered_set<int64_t> SessionExpiryQueue::getExpiredSessions()
std::vector<int64_t> SessionExpiryQueue::getExpiredSessions() const
{
int64_t now = getNowMilliseconds();
if (now < next_expiration_time)
return {};
std::vector<int64_t> result;
auto set_it = expiry_to_sessions.find(next_expiration_time);
int64_t new_expiration_time = next_expiration_time + expiration_interval;
next_expiration_time = new_expiration_time;
if (set_it != expiry_to_sessions.end())
/// Check all buckets
for (const auto & [expire_time, expired_sessions] : expiry_to_sessions)
{
auto result = set_it->second;
expiry_to_sessions.erase(set_it);
return result;
if (expire_time <= now)
result.insert(result.end(), expired_sessions.begin(), expired_sessions.end());
else
break;
}
return {};
return result;
}
void SessionExpiryQueue::clear()
{
session_to_timeout.clear();
session_to_expiration_time.clear();
expiry_to_sessions.clear();
}

View File

@ -1,19 +1,32 @@
#pragma once
#include <map>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <chrono>
namespace DB
{
/// Simple class for checking expired sessions. Main idea -- to round session
/// timeouts and place all sessions into buckets keyed by their rounded expiration time.
/// So we will have not too many different buckets and can check expired
/// sessions quite fast.
/// Buckets look like this:
/// [1630580418000] -> {1, 5, 6}
/// [1630580418500] -> {2, 3}
/// ...
/// When a new session appears, it's added to an existing bucket or a new bucket is created.
class SessionExpiryQueue
{
private:
std::unordered_map<int64_t, int64_t> session_to_timeout;
std::unordered_map<int64_t, std::unordered_set<int64_t>> expiry_to_sessions;
/// Session -> expiration time in ms
std::unordered_map<int64_t, int64_t> session_to_expiration_time;
/// Expiration time -> sessions that expire near this time
std::map<int64_t, std::unordered_set<int64_t>> expiry_to_sessions;
int64_t expiration_interval;
int64_t next_expiration_time;
static int64_t getNowMilliseconds()
{
@ -21,23 +34,30 @@ private:
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}
/// Round time up to the next expiration interval. The result is used as a key in the
/// expiry_to_sessions map.
int64_t roundToNextInterval(int64_t time) const
{
return (time / expiration_interval + 1) * expiration_interval;
}
public:
/// expiration_interval -- how often we will check sessions for expiration and how small
/// the buckets will be. In ZooKeeper a normal session timeout is around 30 seconds
/// and the expiration_interval is about 500ms.
explicit SessionExpiryQueue(int64_t expiration_interval_)
: expiration_interval(expiration_interval_)
, next_expiration_time(roundToNextInterval(getNowMilliseconds()))
{
}
/// Returns true if the session was actually removed
bool remove(int64_t session_id);
bool update(int64_t session_id, int64_t timeout_ms);
/// Update session expiry time (must be called on heartbeats)
void addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms);
std::unordered_set<int64_t> getExpiredSessions();
/// Get all expired sessions
std::vector<int64_t> getExpiredSessions() const;
void clear();
};
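To illustrate the bucketing described above, here is a minimal standalone sketch; the free function, the constant and the timestamps are hypothetical and only mirror the rounding formula used by roundToNextInterval.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <unordered_set>

static constexpr int64_t expiration_interval = 500; /// ms, as in the comment above

/// Round an expiration time up to the next interval boundary (same formula as above).
int64_t roundToNextInterval(int64_t time)
{
    return (time / expiration_interval + 1) * expiration_interval;
}

int main()
{
    /// E.g. now = 1630580417800 ms, timeout = 1000 ms -> expiry bucket 1630580419000.
    assert(roundToNextInterval(1630580417800 + 1000) == 1630580419000);

    std::map<int64_t, std::unordered_set<int64_t>> expiry_to_sessions;
    expiry_to_sessions[roundToNextInterval(1630580417800 + 1000)].insert(1); /// session 1
    expiry_to_sessions[roundToNextInterval(1630580417900 + 1000)].insert(2); /// session 2, nearby timeout

    /// Both sessions land in the same bucket, so one ordered-map scan covers them.
    assert(expiry_to_sessions.size() == 1);
    assert(expiry_to_sessions.begin()->second.size() == 2);
}
```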

View File

@ -404,6 +404,7 @@ TEST(CoordinationTest, ChangelogTestCompaction)
/// And we are able to read it
DB::KeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(7, 0);
EXPECT_EQ(changelog_reader.size(), 1);
EXPECT_EQ(changelog_reader.start_index(), 7);
EXPECT_EQ(changelog_reader.next_slot(), 8);
@ -1317,7 +1318,8 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
}
}
EXPECT_TRUE(fs::exists("./logs/changelog_0_99.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin"));
DB::KeeperLogStore changelog_1("./logs", 10, true);
changelog_1.init(0, 50);
@ -1330,8 +1332,8 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
changelog_1.end_of_append_batch(0, 0);
}
EXPECT_TRUE(fs::exists("./logs/changelog_0_99.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_100_109.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin"));
DB::KeeperLogStore changelog_2("./logs", 7, true);
changelog_2.init(98, 55);
@ -1346,11 +1348,12 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
}
changelog_2.compact(105);
EXPECT_FALSE(fs::exists("./logs/changelog_0_99.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_100_109.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_110_116.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_117_123.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_124_130.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_1_100.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_111_117.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_118_124.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin"));
DB::KeeperLogStore changelog_3("./logs", 5, true);
changelog_3.init(116, 3);
@ -1364,14 +1367,31 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
}
changelog_3.compact(125);
EXPECT_FALSE(fs::exists("./logs/changelog_100_109.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_110_116.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_117_123.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_101_110.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_111_117.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_118_124.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_124_130.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_131_135.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_136_140.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_141_145.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_132_136.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_137_141.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_142_146.bin"));
}
TEST(CoordinationTest, TestSessionExpiryQueue)
{
using namespace Coordination;
SessionExpiryQueue queue(500);
queue.addNewSessionOrUpdate(1, 1000);
for (size_t i = 0; i < 2; ++i)
{
EXPECT_EQ(queue.getExpiredSessions(), std::vector<int64_t>({}));
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::this_thread::sleep_for(std::chrono::milliseconds(700));
EXPECT_EQ(queue.getExpiredSessions(), std::vector<int64_t>({1}));
}

View File

@ -141,7 +141,7 @@ NamesAndTypesList collect(const NamesAndTypesList & names_and_types)
auto nested_types = getSubcolumnsOfNested(names_and_types);
for (const auto & name_type : names_and_types)
if (!nested_types.count(splitName(name_type.name).first))
if (!isArray(name_type.type) || !nested_types.count(splitName(name_type.name).first))
res.push_back(name_type);
for (const auto & name_type : nested_types)
@ -157,6 +157,9 @@ NamesAndTypesList convertToSubcolumns(const NamesAndTypesList & names_and_types)
for (auto & name_type : res)
{
if (!isArray(name_type.type))
continue;
auto split = splitName(name_type.name);
if (name_type.isSubcolumn() || split.second.empty())
continue;

View File

@ -0,0 +1,43 @@
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNested.h>
#include <gtest/gtest.h>
using namespace DB;
GTEST_TEST(NestedUtils, collect)
{
DataTypePtr uint_type = std::make_shared<DataTypeUInt32>();
DataTypePtr array_type = std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>());
const NamesAndTypesList source_columns =
{
{"id", uint_type},
{"arr1", array_type},
{"b.id", uint_type},
{"b.arr1", array_type},
{"b.arr2", array_type}
};
auto nested_type = createNested({uint_type, uint_type}, {"arr1", "arr2"});
const NamesAndTypesList columns_with_subcolumns =
{
{"id", uint_type},
{"arr1", array_type},
{"b.id", uint_type},
{"b", "arr1", nested_type, array_type},
{"b", "arr2", nested_type, array_type}
};
const NamesAndTypesList columns_with_nested =
{
{"id", uint_type},
{"arr1", array_type},
{"b.id", uint_type},
{"b", nested_type},
};
ASSERT_EQ(Nested::convertToSubcolumns(source_columns).toString(), columns_with_subcolumns.toString());
ASSERT_EQ(Nested::collect(source_columns).toString(), columns_with_nested.toString());
}

View File

@ -1422,16 +1422,32 @@ public:
Monotonicity getMonotonicityForRange(const IDataType &, const Field & left_point, const Field & right_point) const override
{
// For simplicity, we treat null values as monotonicity breakers.
const std::string_view name_view = Name::name;
// For simplicity, we treat null values as monotonicity breakers, except for variable / non-zero constant.
if (left_point.isNull() || right_point.isNull())
{
if (name_view == "divide" || name_view == "intDiv")
{
// variable / constant
if (right.column && isColumnConst(*right.column))
{
auto constant = (*right.column)[0];
if (applyVisitor(FieldVisitorAccurateEquals(), constant, Field(0)))
return {false, true, false}; // variable / 0 is undefined, let's treat it as non-monotonic
bool is_constant_positive = applyVisitor(FieldVisitorAccurateLess(), Field(0), constant);
// division is saturated to `inf`, thus it doesn't have overflow issues.
return {true, is_constant_positive, true};
}
}
return {false, true, false};
}
// For simplicity, we treat every single value interval as positive monotonic.
if (applyVisitor(FieldVisitorAccurateEquals(), left_point, right_point))
return {true, true, false};
const std::string_view name_view = Name::name;
if (name_view == "minus" || name_view == "plus")
{
// const +|- variable
@ -1503,14 +1519,14 @@ public:
return {true, true, false}; // 0 / 0 is undefined, thus it's not always monotonic
bool is_constant_positive = applyVisitor(FieldVisitorAccurateLess(), Field(0), constant);
if (applyVisitor(FieldVisitorAccurateLess(), left_point, Field(0)) &&
applyVisitor(FieldVisitorAccurateLess(), right_point, Field(0)))
if (applyVisitor(FieldVisitorAccurateLess(), left_point, Field(0))
&& applyVisitor(FieldVisitorAccurateLess(), right_point, Field(0)))
{
return {true, is_constant_positive, false};
}
else
if (applyVisitor(FieldVisitorAccurateLess(), Field(0), left_point) &&
applyVisitor(FieldVisitorAccurateLess(), Field(0), right_point))
else if (
applyVisitor(FieldVisitorAccurateLess(), Field(0), left_point)
&& applyVisitor(FieldVisitorAccurateLess(), Field(0), right_point))
{
return {true, !is_constant_positive, false};
}
@ -1524,7 +1540,7 @@ public:
bool is_constant_positive = applyVisitor(FieldVisitorAccurateLess(), Field(0), constant);
// division is saturated to `inf`, thus it doesn't have overflow issues.
return {true, is_constant_positive, false};
return {true, is_constant_positive, true};
}
}
return {false, true, false};

View File

@ -321,21 +321,32 @@ public:
Strings attribute_names = getAttributeNamesFromColumn(arguments[1].column, arguments[1].type);
DataTypes types;
auto dictionary_structure = helper.getDictionaryStructure(dictionary_name);
DataTypes attribute_types;
attribute_types.reserve(attribute_names.size());
for (auto & attribute_name : attribute_names)
{
/// We're extracting the return type from the dictionary's config, without loading the dictionary.
auto attribute = dictionary_structure.getAttribute(attribute_name);
types.emplace_back(attribute.type);
const auto & attribute = dictionary_structure.getAttribute(attribute_name);
attribute_types.emplace_back(attribute.type);
}
if (types.size() > 1)
return std::make_shared<DataTypeTuple>(types, attribute_names);
bool key_is_nullable = arguments[2].type->isNullable();
if (attribute_types.size() > 1)
{
if (key_is_nullable)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Function {} supports nullable keys only for a single dictionary attribute", getName());
return std::make_shared<DataTypeTuple>(attribute_types, attribute_names);
}
else
return types.front();
{
if (key_is_nullable)
return makeNullable(attribute_types.front());
else
return attribute_types.front();
}
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
@ -418,7 +429,9 @@ public:
default_cols = tuple_column->getColumnsCopy();
}
else
{
default_cols.emplace_back(result);
}
}
else
{
@ -426,7 +439,16 @@ public:
default_cols.emplace_back(nullptr);
}
const auto & key_col_with_type = arguments[2];
auto key_col_with_type = arguments[2];
bool key_is_only_null = key_col_with_type.type->onlyNull();
if (key_is_only_null)
return result_type->createColumnConstWithDefaultValue(input_rows_count);
bool key_is_nullable = key_col_with_type.type->isNullable();
if (key_is_nullable)
key_col_with_type = columnGetNested(key_col_with_type);
auto key_column = key_col_with_type.column;
Columns key_columns;
@ -482,7 +504,26 @@ public:
key_types.emplace_back(range_col_type);
}
return executeDictionaryRequest(dictionary, attribute_names, key_columns, key_types, result_type, default_cols);
DataTypePtr attribute_type = result_type;
if (key_is_nullable)
{
DataTypes attribute_types;
attribute_types.reserve(attribute_names.size());
for (auto & attribute_name : attribute_names)
{
const auto & attribute = dictionary->getStructure().getAttribute(attribute_name);
attribute_types.emplace_back(attribute.type);
}
attribute_type = attribute_types.front();
}
auto result_column = executeDictionaryRequest(dictionary, attribute_names, key_columns, key_types, attribute_type, default_cols);
if (key_is_nullable)
result_column = wrapInNullable(result_column, {arguments[2]}, result_type, input_rows_count);
return result_column;
}
private:
@ -511,12 +552,14 @@ private:
result = ColumnTuple::create(std::move(result_columns));
}
else
{
result = dictionary->getColumn(
attribute_names[0],
result_type,
key_columns,
key_types,
default_cols.front());
}
return result;
}
@ -526,7 +569,9 @@ private:
Strings attribute_names;
if (const auto * name_col = checkAndGetColumnConst<ColumnString>(column.get()))
{
attribute_names.emplace_back(name_col->getValue<String>());
}
else if (const auto * tuple_col_const = checkAndGetColumnConst<ColumnTuple>(column.get()))
{
const ColumnTuple & tuple_col = assert_cast<const ColumnTuple &>(tuple_col_const->getDataColumn());
@ -551,10 +596,12 @@ private:
}
}
else
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of second argument of function {}, expected a const string or const tuple of const strings.",
type->getName(),
getName());
}
return attribute_names;
}

View File

@ -4,6 +4,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <zlib.h>

View File

@ -4,6 +4,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <zlib.h>

View File

@ -1,6 +1,7 @@
#include <unistd.h>
#include <vector>
#include <stdexcept>
#include <zlib.h>
#pragma GCC diagnostic ignored "-Wold-style-cast"

View File

@ -206,6 +206,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_COMMENT_COLUMN, database, table, column_name());
break;
}
case ASTAlterCommand::MATERIALIZE_COLUMN:
{
required_access.emplace_back(AccessType::ALTER_MATERIALIZE_COLUMN, database, table);
break;
}
case ASTAlterCommand::MODIFY_ORDER_BY:
{
required_access.emplace_back(AccessType::ALTER_ORDER_BY, database, table);

View File

@ -319,7 +319,13 @@ AccessRightsElements InterpreterKillQueryQuery::getRequiredAccessForDDLOnCluster
if (query.type == ASTKillQueryQuery::Type::Query)
required_access.emplace_back(AccessType::KILL_QUERY);
else if (query.type == ASTKillQueryQuery::Type::Mutation)
required_access.emplace_back(AccessType::ALTER_UPDATE | AccessType::ALTER_DELETE | AccessType::ALTER_MATERIALIZE_INDEX | AccessType::ALTER_MATERIALIZE_TTL);
required_access.emplace_back(
AccessType::ALTER_UPDATE
| AccessType::ALTER_DELETE
| AccessType::ALTER_MATERIALIZE_INDEX
| AccessType::ALTER_MATERIALIZE_COLUMN
| AccessType::ALTER_MATERIALIZE_TTL
);
return required_access;
}

View File

@ -2,6 +2,10 @@
#if USE_EMBEDDED_COMPILER
#include <sys/mman.h>
#include <boost/noncopyable.hpp>
#include <llvm/Analysis/TargetTransformInfo.h>
#include <llvm/IR/BasicBlock.h>
#include <llvm/IR/DataLayout.h>
@ -22,7 +26,10 @@
#include <llvm/Transforms/IPO/PassManagerBuilder.h>
#include <llvm/Support/SmallVectorMemoryBuffer.h>
#include <common/getPageSize.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
namespace DB
{
@ -31,6 +38,8 @@ namespace ErrorCodes
{
extern const int CANNOT_COMPILE_CODE;
extern const int LOGICAL_ERROR;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int CANNOT_MPROTECT;
}
/** Simple module to object file compiler.
@ -80,6 +89,161 @@ private:
llvm::TargetMachine & target_machine;
};
/** Arena that allocates all memory with the system page_size.
* All allocated pages can be protected with protection_flags using the protect method.
* During destruction the protection_flags of all allocated pages will be reset.
*/
class PageArena : private boost::noncopyable
{
public:
PageArena() : page_size(::getPageSize()) {}
char * allocate(size_t size, size_t alignment)
{
/** First check whether some already allocated page block has enough free memory to make the allocation.
* If there is no such block, create one and then allocate from it.
*/
for (size_t i = 0; i < page_blocks.size(); ++i)
{
char * result = tryAllocateFromPageBlockWithIndex(size, alignment, i);
if (result)
return result;
}
allocateNextPageBlock(size);
size_t allocated_page_index = page_blocks.size() - 1;
char * result = tryAllocateFromPageBlockWithIndex(size, alignment, allocated_page_index);
assert(result);
return result;
}
inline size_t getAllocatedSize() const { return allocated_size; }
inline size_t getPageSize() const { return page_size; }
~PageArena()
{
protect(PROT_READ | PROT_WRITE);
for (auto & page_block : page_blocks)
free(page_block.base());
}
void protect(int protection_flags)
{
/** The code is partially based on the LLVM codebase
* The LLVM Project is under the Apache License v2.0 with LLVM Exceptions.
*/
# if defined(__NetBSD__) && defined(PROT_MPROTECT)
protection_flags |= PROT_MPROTECT(PROT_READ | PROT_WRITE | PROT_EXEC);
# endif
bool invalidate_cache = (protection_flags & PROT_EXEC);
for (const auto & block : page_blocks)
{
# if defined(__arm__) || defined(__aarch64__)
/// Certain ARM implementations treat the icache clear instruction as a memory read,
/// and the CPU segfaults when trying to clear the cache on a !PROT_READ page.
/// Therefore we need to temporarily add PROT_READ for the sake of flushing the instruction caches.
if (invalidate_cache && !(protection_flags & PROT_READ))
{
int res = mprotect(block.base(), block.blockSize(), protection_flags | PROT_READ);
if (res != 0)
throwFromErrno("Cannot mprotect memory region", ErrorCodes::CANNOT_MPROTECT);
llvm::sys::Memory::InvalidateInstructionCache(block.base(), block.blockSize());
invalidate_cache = false;
}
# endif
int res = mprotect(block.base(), block.blockSize(), protection_flags);
if (res != 0)
throwFromErrno("Cannot mprotect memory region", ErrorCodes::CANNOT_MPROTECT);
if (invalidate_cache)
llvm::sys::Memory::InvalidateInstructionCache(block.base(), block.blockSize());
}
}
private:
struct PageBlock
{
public:
PageBlock(void * pages_base_, size_t pages_size_, size_t page_size_)
: pages_base(pages_base_), pages_size(pages_size_), page_size(page_size_)
{
}
inline void * base() const { return pages_base; }
inline size_t pagesSize() const { return pages_size; }
inline size_t pageSize() const { return page_size; }
inline size_t blockSize() const { return pages_size * page_size; }
private:
void * pages_base;
size_t pages_size;
size_t page_size;
};
std::vector<PageBlock> page_blocks;
std::vector<size_t> page_blocks_allocated_size;
size_t page_size = 0;
size_t allocated_size = 0;
char * tryAllocateFromPageBlockWithIndex(size_t size, size_t alignment, size_t page_block_index)
{
assert(page_block_index < page_blocks.size());
auto & pages_block = page_blocks[page_block_index];
size_t block_size = pages_block.blockSize();
size_t & block_allocated_size = page_blocks_allocated_size[page_block_index];
size_t block_free_size = block_size - block_allocated_size;
uint8_t * pages_start = static_cast<uint8_t *>(pages_block.base());
void * pages_offset = pages_start + block_allocated_size;
auto * result = std::align(alignment, size, pages_offset, block_free_size);
if (result)
{
block_allocated_size = reinterpret_cast<uint8_t *>(result) - pages_start;
block_allocated_size += size;
return static_cast<char *>(result);
}
else
{
return nullptr;
}
}
void allocateNextPageBlock(size_t size)
{
size_t pages_to_allocate_size = ((size / page_size) + 1) * 2;
size_t allocate_size = page_size * pages_to_allocate_size;
void * buf = nullptr;
int res = posix_memalign(&buf, page_size, allocate_size);
if (res != 0)
throwFromErrno(
fmt::format("Cannot allocate memory (posix_memalign) alignment {} size {}.", page_size, ReadableSize(allocate_size)),
ErrorCodes::CANNOT_ALLOCATE_MEMORY,
res);
page_blocks.emplace_back(buf, pages_to_allocate_size, page_size);
page_blocks_allocated_size.emplace_back(0);
allocated_size += allocate_size;
}
};
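For reference, a minimal standalone sketch (not part of this diff) of the std::align-based bump allocation that tryAllocateFromPageBlockWithIndex performs above; the buffer size and the allocate helper are illustrative only.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

int main()
{
    alignas(64) unsigned char buffer[4096];   // stand-in for one page block
    size_t allocated = 0;

    auto allocate = [&](size_t size, size_t alignment) -> void *
    {
        size_t free_size = sizeof(buffer) - allocated;
        void * cursor = buffer + allocated;
        // std::align bumps `cursor` up to the requested alignment and shrinks
        // `free_size` accordingly; it returns nullptr if the chunk does not fit.
        void * result = std::align(alignment, size, cursor, free_size);
        if (!result)
            return nullptr;
        allocated = static_cast<unsigned char *>(result) - buffer + size;
        return result;
    };

    void * a = allocate(100, 16);
    void * b = allocate(40, 64);
    assert(a && b);
    assert(reinterpret_cast<uintptr_t>(b) % 64 == 0);
    return 0;
}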
// class AssemblyPrinter
// {
// public:
@ -104,46 +268,43 @@ private:
/** MemoryManager for module.
* Keep total allocated size during RuntimeDyld linker execution.
* Actual compiled code memory is stored in llvm::SectionMemoryManager member, we cannot use ZeroBase optimization here
* because it is required for llvm::SectionMemoryManager::MemoryMapper to live longer than llvm::SectionMemoryManager.
*/
class JITModuleMemoryManager
class JITModuleMemoryManager : public llvm::RTDyldMemoryManager
{
class DefaultMMapper final : public llvm::SectionMemoryManager::MemoryMapper
{
public:
llvm::sys::MemoryBlock allocateMappedMemory(
llvm::SectionMemoryManager::AllocationPurpose Purpose [[maybe_unused]],
size_t NumBytes,
const llvm::sys::MemoryBlock * const NearBlock,
unsigned Flags,
std::error_code & EC) override
{
auto allocated_memory_block = llvm::sys::Memory::allocateMappedMemory(NumBytes, NearBlock, Flags, EC);
allocated_size += allocated_memory_block.allocatedSize();
return allocated_memory_block;
}
std::error_code protectMappedMemory(const llvm::sys::MemoryBlock & Block, unsigned Flags) override
{
return llvm::sys::Memory::protectMappedMemory(Block, Flags);
}
std::error_code releaseMappedMemory(llvm::sys::MemoryBlock & M) override { return llvm::sys::Memory::releaseMappedMemory(M); }
size_t allocated_size = 0;
};
public:
JITModuleMemoryManager() : manager(&mmaper) { }
inline size_t getAllocatedSize() const { return mmaper.allocated_size; }
uint8_t * allocateCodeSection(uintptr_t size, unsigned alignment, unsigned, llvm::StringRef) override
{
return reinterpret_cast<uint8_t *>(ex_page_arena.allocate(size, alignment));
}
inline llvm::SectionMemoryManager & getManager() { return manager; }
uint8_t * allocateDataSection(uintptr_t size, unsigned alignment, unsigned, llvm::StringRef, bool is_read_only) override
{
if (is_read_only)
return reinterpret_cast<uint8_t *>(ro_page_arena.allocate(size, alignment));
else
return reinterpret_cast<uint8_t *>(rw_page_arena.allocate(size, alignment));
}
bool finalizeMemory(std::string *) override
{
ro_page_arena.protect(PROT_READ);
ex_page_arena.protect(PROT_READ | PROT_EXEC);
return true;
}
inline size_t allocatedSize() const
{
size_t data_size = rw_page_arena.getAllocatedSize() + ro_page_arena.getAllocatedSize();
size_t code_size = ex_page_arena.getAllocatedSize();
return data_size + code_size;
}
private:
DefaultMMapper mmaper;
llvm::SectionMemoryManager manager;
PageArena rw_page_arena;
PageArena ro_page_arena;
PageArena ex_page_arena;
};
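For reference, a minimal standalone sketch (not ClickHouse code) of the allocate-then-protect pattern that PageArena and finalizeMemory implement above: reserve page-aligned memory while it is writable, then flip it to read-only (or read+execute for code) once its contents are final. Error handling is simplified to asserts.

#include <cassert>
#include <cstdlib>
#include <cstring>
#include <sys/mman.h>
#include <unistd.h>

int main()
{
    const size_t page_size = static_cast<size_t>(sysconf(_SC_PAGESIZE));
    const size_t block_size = page_size * 2;

    void * block = nullptr;
    int res = posix_memalign(&block, page_size, block_size);   // page-aligned, like allocateNextPageBlock
    assert(res == 0);

    std::memset(block, 0, block_size);                         // fill while still writable

    res = mprotect(block, block_size, PROT_READ);              // "finalize": read-only from now on
    assert(res == 0);

    // Restore writability before freeing, as ~PageArena does
    // (the allocator may write to the block when reusing it).
    res = mprotect(block, block_size, PROT_READ | PROT_WRITE);
    assert(res == 0);
    std::free(block);
    return 0;
}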
class JITSymbolResolver : public llvm::LegacyJITSymbolResolver
@ -249,12 +410,12 @@ CHJIT::CompiledModule CHJIT::compileModule(std::unique_ptr<llvm::Module> module)
}
std::unique_ptr<JITModuleMemoryManager> module_memory_manager = std::make_unique<JITModuleMemoryManager>();
llvm::RuntimeDyld dynamic_linker = {module_memory_manager->getManager(), *symbol_resolver};
llvm::RuntimeDyld dynamic_linker = {*module_memory_manager, *symbol_resolver};
std::unique_ptr<llvm::RuntimeDyld::LoadedObjectInfo> linked_object = dynamic_linker.loadObject(*object.get());
dynamic_linker.resolveRelocations();
module_memory_manager->getManager().finalizeMemory();
module_memory_manager->finalizeMemory(nullptr);
CompiledModule compiled_module;
@ -275,7 +436,7 @@ CHJIT::CompiledModule CHJIT::compileModule(std::unique_ptr<llvm::Module> module)
compiled_module.function_name_to_symbol.emplace(std::move(function_name), jit_symbol_address);
}
compiled_module.size = module_memory_manager->getAllocatedSize();
compiled_module.size = module_memory_manager->allocatedSize();
compiled_module.identifier = current_module_key;
module_identifier_to_memory_manager[current_module_key] = std::move(module_memory_manager);

View File

@ -543,6 +543,17 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
}
}
}
else if (command.type == MutationCommand::MATERIALIZE_COLUMN)
{
mutation_kind.set(MutationKind::MUTATE_OTHER);
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
const auto & column = columns_desc.get(command.column_name);
stages.back().column_to_updated.emplace(column.name, column.default_desc.expression->clone());
}
else if (command.type == MutationCommand::MATERIALIZE_INDEX)
{
mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION);

View File

@ -120,6 +120,17 @@ void ASTAlterCommand::formatImpl(
}
}
}
else if (type == ASTAlterCommand::MATERIALIZE_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str
<< "MATERIALIZE COLUMN " << (settings.hilite ? hilite_none : "");
column->formatImpl(settings, state, frame);
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
}
else if (type == ASTAlterCommand::COMMENT_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "COMMENT COLUMN " << (if_exists ? "IF EXISTS " : "") << (settings.hilite ? hilite_none : "");

View File

@ -31,6 +31,8 @@ public:
MODIFY_COLUMN,
COMMENT_COLUMN,
RENAME_COLUMN,
MATERIALIZE_COLUMN,
MODIFY_ORDER_BY,
MODIFY_SAMPLE_BY,
MODIFY_TTL,

View File

@ -28,6 +28,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_modify_column("MODIFY COLUMN");
ParserKeyword s_rename_column("RENAME COLUMN");
ParserKeyword s_comment_column("COMMENT COLUMN");
ParserKeyword s_materialize_column("MATERIALIZE COLUMN");
ParserKeyword s_modify_order_by("MODIFY ORDER BY");
ParserKeyword s_modify_sample_by("MODIFY SAMPLE BY");
ParserKeyword s_modify_ttl("MODIFY TTL");
@ -168,6 +170,20 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->type = ASTAlterCommand::RENAME_COLUMN;
}
else if (s_materialize_column.ignore(pos, expected))
{
if (!parser_name.parse(pos, command->column, expected))
return false;
command->type = ASTAlterCommand::MATERIALIZE_COLUMN;
command->detach = false;
if (s_in_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
}
}
else if (s_drop_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))

View File

@ -322,7 +322,7 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
verbosePrintString(in.position(), in.position() + 1, out);
out << " found instead.\n"
" It's like your file has more columns than expected.\n"
"And if your file have right number of columns, maybe it have unquoted string value with comma.\n";
"And if your file has the right number of columns, maybe it has an unquoted string value with a comma.\n";
return false;
}
@ -341,7 +341,7 @@ bool CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
{
out << "ERROR: Line feed found where delimiter (" << delimiter << ") is expected."
" It's like your file has less columns than expected.\n"
"And if your file have right number of columns, maybe it have unescaped quotes in values.\n";
"And if your file has the right number of columns, maybe it has unescaped quotes in values.\n";
}
else
{

View File

@ -284,7 +284,7 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
{
out << "ERROR: Tab found where line feed is expected."
" It's like your file has more columns than expected.\n"
"And if your file have right number of columns, maybe it have unescaped tab in value.\n";
"And if your file has the right number of columns, maybe it has an unescaped tab in a value.\n";
}
else if (*in.position() == '\r')
{
@ -313,8 +313,8 @@ bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns &
{
out << "ERROR: Line feed found where tab is expected."
" It's like your file has less columns than expected.\n"
"And if your file have right number of columns, "
"maybe it have unescaped backslash in value before tab, which cause tab has escaped.\n";
"And if your file has the right number of columns, "
"maybe it has an unescaped backslash in value before tab, which causes the tab to be escaped.\n";
}
else if (*in.position() == '\r')
{

View File

@ -192,7 +192,7 @@ struct SocketInterruptablePollWrapper
KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, log(&Poco::Logger::get("NuKeeperTCPHandler"))
, log(&Poco::Logger::get("KeeperTCPHandler"))
, global_context(Context::createCopy(server.context()))
, keeper_dispatcher(global_context->getKeeperDispatcher())
, operation_timeout(0, global_context->getConfigRef().getUInt("keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)

View File

@ -668,44 +668,34 @@ void KeyCondition::traverseAST(const ASTPtr & node, ContextPtr context, Block &
rpn.emplace_back(std::move(element));
}
bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
const ASTPtr & node,
/** The key functional expression constraint may be inferred from a plain column in the expression.
* For example, if the key contains `toStartOfHour(Timestamp)` and query contains `WHERE Timestamp >= now()`,
* it can be assumed that if `toStartOfHour()` is monotonic on [now(), inf), the `toStartOfHour(Timestamp) >= toStartOfHour(now())`
* condition also holds, so the index may be used to select only parts satisfying this condition.
*
* To check the assumption, we'd need to assert that the inverse function to this transformation is also monotonic, however the
* inversion isn't exported (or even viable for not strictly monotonic functions such as `toStartOfHour()`).
* Instead, we can qualify only functions that do not transform the range (for example rounding),
* which while not strictly monotonic, are monotonic everywhere on the input range.
*/
bool KeyCondition::transformConstantWithValidFunctions(
const String & expr_name,
size_t & out_key_column_num,
DataTypePtr & out_key_column_type,
Field & out_value,
DataTypePtr & out_type)
DataTypePtr & out_type,
std::function<bool(IFunctionBase &, const IDataType &)> always_monotonic) const
{
String expr_name = node->getColumnNameWithoutAlias();
if (array_joined_columns.count(expr_name))
return false;
if (key_subexpr_names.count(expr_name) == 0)
return false;
if (out_value.isNull())
return false;
const auto & sample_block = key_expr->getSampleBlock();
/** The key functional expression constraint may be inferred from a plain column in the expression.
* For example, if the key contains `toStartOfHour(Timestamp)` and query contains `WHERE Timestamp >= now()`,
* it can be assumed that if `toStartOfHour()` is monotonic on [now(), inf), the `toStartOfHour(Timestamp) >= toStartOfHour(now())`
* condition also holds, so the index may be used to select only parts satisfying this condition.
*
* To check the assumption, we'd need to assert that the inverse function to this transformation is also monotonic, however the
* inversion isn't exported (or even viable for not strictly monotonic functions such as `toStartOfHour()`).
* Instead, we can qualify only functions that do not transform the range (for example rounding),
* which while not strictly monotonic, are monotonic everywhere on the input range.
*/
for (const auto & dag_node : key_expr->getNodes())
for (const auto & node : key_expr->getNodes())
{
auto it = key_columns.find(dag_node.result_name);
auto it = key_columns.find(node.result_name);
if (it != key_columns.end())
{
std::stack<const ActionsDAG::Node *> chain;
const auto * cur_node = &dag_node;
const auto * cur_node = &node;
bool is_valid_chain = true;
while (is_valid_chain)
@ -715,21 +705,25 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
chain.push(cur_node);
if (cur_node->type == ActionsDAG::ActionType::FUNCTION && cur_node->children.size() == 1)
if (cur_node->type == ActionsDAG::ActionType::FUNCTION && cur_node->children.size() <= 2)
{
const auto * next_node = cur_node->children.front();
is_valid_chain = always_monotonic(*cur_node->function_base, *cur_node->result_type);
if (!cur_node->function_base->hasInformationAboutMonotonicity())
is_valid_chain = false;
else
const ActionsDAG::Node * next_node = nullptr;
for (const auto * arg : cur_node->children)
{
/// Range is irrelevant in this case.
auto monotonicity = cur_node->function_base->getMonotonicityForRange(
*next_node->result_type, Field(), Field());
if (!monotonicity.is_always_monotonic)
if (arg->column && isColumnConst(*arg->column))
continue;
if (next_node)
is_valid_chain = false;
next_node = arg;
}
if (!next_node)
is_valid_chain = false;
cur_node = next_node;
}
else if (cur_node->type == ActionsDAG::ActionType::ALIAS)
@ -738,7 +732,7 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
is_valid_chain = false;
}
if (is_valid_chain && !chain.empty())
if (is_valid_chain)
{
/// Here we cast constant to the input type.
/// It is not clear, why this works in general.
@ -761,8 +755,30 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
if (func->type != ActionsDAG::ActionType::FUNCTION)
continue;
std::tie(const_value, const_type) =
applyFunctionForFieldOfUnknownType(func->function_base, const_type, const_value);
if (func->children.size() == 1)
{
std::tie(const_value, const_type)
= applyFunctionForFieldOfUnknownType(func->function_base, const_type, const_value);
}
else if (func->children.size() == 2)
{
const auto * left = func->children[0];
const auto * right = func->children[1];
if (left->column && isColumnConst(*left->column))
{
auto left_arg_type = left->result_type;
auto left_arg_value = (*left->column)[0];
std::tie(const_value, const_type) = applyBinaryFunctionForFieldOfUnknownType(
func->function_builder, left_arg_type, left_arg_value, const_type, const_value);
}
else
{
auto right_arg_type = right->result_type;
auto right_arg_value = (*right->column)[0];
std::tie(const_value, const_type) = applyBinaryFunctionForFieldOfUnknownType(
func->function_builder, const_type, const_value, right_arg_type, right_arg_value);
}
}
}
out_key_column_num = it->second;
@ -773,10 +789,43 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
}
}
}
return false;
}
bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
const ASTPtr & node,
size_t & out_key_column_num,
DataTypePtr & out_key_column_type,
Field & out_value,
DataTypePtr & out_type)
{
String expr_name = node->getColumnNameWithoutAlias();
if (array_joined_columns.count(expr_name))
return false;
if (key_subexpr_names.count(expr_name) == 0)
return false;
if (out_value.isNull())
return false;
return transformConstantWithValidFunctions(
expr_name, out_key_column_num, out_key_column_type, out_value, out_type, [](IFunctionBase & func, const IDataType & type)
{
if (!func.hasInformationAboutMonotonicity())
return false;
else
{
/// Range is irrelevant in this case.
auto monotonicity = func.getMonotonicityForRange(type, Field(), Field());
if (!monotonicity.is_always_monotonic)
return false;
}
return true;
});
}
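For reference, a minimal standalone sketch (not ClickHouse code) of the property the always_monotonic check above relies on: if f is monotonically non-decreasing over the whole input range, then `x >= c` implies `f(x) >= f(c)`, so a condition on a plain column can be turned into a condition on the key expression. The to_start_of_hour function below is a toy stand-in for toStartOfHour.

#include <cassert>
#include <cstdint>

// Round a Unix timestamp down to the start of its hour (monotonic, but not strictly).
static int64_t to_start_of_hour(int64_t ts) { return ts - ts % 3600; }

int main()
{
    const int64_t c = 1630751000;                      // constant from `WHERE Timestamp >= c`
    const int64_t key_bound = to_start_of_hour(c);     // constant pushed through the key expression

    // Every x satisfying the original predicate also satisfies the transformed one,
    // so parts whose key maximum is below key_bound can be skipped safely.
    for (int64_t x = c; x < c + 10000; x += 7)
        assert(to_start_of_hour(x) >= key_bound);
    return 0;
}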
/// Looking for possible transformation of `column = constant` into `partition_expr = function(constant)`
bool KeyCondition::canConstantBeWrappedByFunctions(
const ASTPtr & ast, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type)
@ -805,106 +854,14 @@ bool KeyCondition::canConstantBeWrappedByFunctions(
return false;
}
const auto & sample_block = key_expr->getSampleBlock();
if (out_value.isNull())
return false;
for (const auto & node : key_expr->getNodes())
{
auto it = key_columns.find(node.result_name);
if (it != key_columns.end())
return transformConstantWithValidFunctions(
expr_name, out_key_column_num, out_key_column_type, out_value, out_type, [](IFunctionBase & func, const IDataType &)
{
std::stack<const ActionsDAG::Node *> chain;
const auto * cur_node = &node;
bool is_valid_chain = true;
while (is_valid_chain)
{
if (cur_node->result_name == expr_name)
break;
chain.push(cur_node);
if (cur_node->type == ActionsDAG::ActionType::FUNCTION && cur_node->children.size() <= 2)
{
if (!cur_node->function_base->isDeterministic())
is_valid_chain = false;
const ActionsDAG::Node * next_node = nullptr;
for (const auto * arg : cur_node->children)
{
if (arg->column && isColumnConst(*arg->column))
continue;
if (next_node)
is_valid_chain = false;
next_node = arg;
}
if (!next_node)
is_valid_chain = false;
cur_node = next_node;
}
else if (cur_node->type == ActionsDAG::ActionType::ALIAS)
cur_node = cur_node->children.front();
else
is_valid_chain = false;
}
if (is_valid_chain)
{
/// This CAST is the same as in canConstantBeWrappedByMonotonicFunctions (see comment).
auto const_type = cur_node->result_type;
auto const_column = out_type->createColumnConst(1, out_value);
auto const_value = (*castColumn({const_column, out_type, ""}, const_type))[0];
while (!chain.empty())
{
const auto * func = chain.top();
chain.pop();
if (func->type != ActionsDAG::ActionType::FUNCTION)
continue;
if (func->children.size() == 1)
{
std::tie(const_value, const_type) = applyFunctionForFieldOfUnknownType(func->function_base, const_type, const_value);
}
else if (func->children.size() == 2)
{
const auto * left = func->children[0];
const auto * right = func->children[1];
if (left->column && isColumnConst(*left->column))
{
auto left_arg_type = left->result_type;
auto left_arg_value = (*left->column)[0];
std::tie(const_value, const_type) = applyBinaryFunctionForFieldOfUnknownType(
func->function_builder, left_arg_type, left_arg_value, const_type, const_value);
}
else
{
auto right_arg_type = right->result_type;
auto right_arg_value = (*right->column)[0];
std::tie(const_value, const_type) = applyBinaryFunctionForFieldOfUnknownType(
func->function_builder, const_type, const_value, right_arg_type, right_arg_value);
}
}
}
out_key_column_num = it->second;
out_key_column_type = sample_block.getByName(it->first).type;
out_value = const_value;
out_type = const_type;
return true;
}
}
}
return false;
return func.isDeterministic();
});
}
bool KeyCondition::tryPrepareSetIndex(

View File

@ -375,6 +375,14 @@ private:
DataTypePtr & out_key_column_type,
std::vector<const ASTFunction *> & out_functions_chain);
bool transformConstantWithValidFunctions(
const String & expr_name,
size_t & out_key_column_num,
DataTypePtr & out_key_column_type,
Field & out_value,
DataTypePtr & out_type,
std::function<bool(IFunctionBase &, const IDataType &)> always_monotonic) const;
bool canConstantBeWrappedByMonotonicFunctions(
const ASTPtr & node,
size_t & out_key_column_num,

View File

@ -1551,6 +1551,7 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::MATERIALIZE_INDEX
|| command.type == MutationCommand::Type::MATERIALIZE_COLUMN
|| command.type == MutationCommand::Type::MATERIALIZE_PROJECTION
|| command.type == MutationCommand::Type::MATERIALIZE_TTL
|| command.type == MutationCommand::Type::DELETE
@ -1559,6 +1560,9 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
for_interpreter.push_back(command);
for (const auto & [column_name, expr] : command.column_to_update_expression)
mutated_columns.emplace(column_name);
if (command.type == MutationCommand::Type::MATERIALIZE_COLUMN)
mutated_columns.emplace(command.column_name);
}
else if (command.type == MutationCommand::Type::DROP_INDEX || command.type == MutationCommand::Type::DROP_PROJECTION)
{
@ -1596,6 +1600,7 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::MATERIALIZE_INDEX
|| command.type == MutationCommand::Type::MATERIALIZE_COLUMN
|| command.type == MutationCommand::Type::MATERIALIZE_PROJECTION
|| command.type == MutationCommand::Type::MATERIALIZE_TTL
|| command.type == MutationCommand::Type::DELETE

View File

@ -75,6 +75,15 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command,
res.projection_name = command->projection->as<ASTIdentifier &>().name();
return res;
}
else if (command->type == ASTAlterCommand::MATERIALIZE_COLUMN)
{
MutationCommand res;
res.ast = command->ptr();
res.type = MATERIALIZE_COLUMN;
res.partition = command->partition;
res.column_name = getIdentifierName(command->column);
return res;
}
else if (parse_alter_commands && command->type == ASTAlterCommand::MODIFY_COLUMN)
{
MutationCommand res;

View File

@ -35,6 +35,7 @@ struct MutationCommand
DROP_PROJECTION,
MATERIALIZE_TTL,
RENAME_COLUMN,
MATERIALIZE_COLUMN,
};
Type type = EMPTY;

View File

@ -20,6 +20,8 @@ namespace DB
M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
M(String, rabbitmq_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \
M(Bool, rabbitmq_persistent, false, "For INSERT queries, messages will be made 'persistent' (durable).", 0) \
M(Bool, rabbitmq_secure, false, "Use SSL connection.", 0) \
M(String, rabbitmq_address, "", "Address for connection.", 0) \
M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \
M(UInt64, rabbitmq_max_block_size, 0, "Number of rows collected before flushing data from RabbitMQ.", 0) \
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \

View File

@ -97,12 +97,16 @@ StorageRabbitMQ::StorageRabbitMQ(
getContext()->getConfigRef().getString("rabbitmq.username"),
getContext()->getConfigRef().getString("rabbitmq.password")))
, vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value))
, connection_string(rabbitmq_settings->rabbitmq_address)
, secure(rabbitmq_settings->rabbitmq_secure.value)
, semaphore(0, num_consumers)
, unique_strbase(getRandomName())
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
, milliseconds_to_wait(RESCHEDULE_MS)
{
event_handler = std::make_shared<RabbitMQHandler>(loop.getLoop(), log);
if (secure)
SSL_library_init();
restoreConnection(false);
StorageInMemoryMetadata storage_metadata;
@ -528,10 +532,10 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
LOG_TRACE(log, "Trying to restore connection to " + address);
}
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
AMQP::Address(
parsed_address.first, parsed_address.second,
AMQP::Login(login_password.first, login_password.second), vhost));
auto amqp_address = connection_string.empty() ? AMQP::Address(parsed_address.first, parsed_address.second,
AMQP::Login(login_password.first, login_password.second), vhost, secure)
: AMQP::Address(connection_string);
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), amqp_address);
cnt_retries = 0;
while (!connection->ready() && !stream_cancelled && cnt_retries++ != RETRIES_MAX)
@ -1053,50 +1057,20 @@ void registerStorageRabbitMQ(StorageFactory & factory)
{
auto creator_fn = [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size();
bool has_settings = args.storage_def->settings;
auto rabbitmq_settings = std::make_unique<RabbitMQSettings>();
if (has_settings)
rabbitmq_settings->loadFromQuery(*args.storage_def);
if (!args.storage_def->settings)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "RabbitMQ engine must have settings");
// Check arguments and settings
#define CHECK_RABBITMQ_STORAGE_ARGUMENT(ARG_NUM, ARG_NAME) \
/* One of the three required arguments is not specified */ \
if (args_count < (ARG_NUM) && (ARG_NUM) <= 2 && !rabbitmq_settings->ARG_NAME.changed) \
{ \
throw Exception("Required parameter '" #ARG_NAME "' for storage RabbitMQ not specified", \
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
} \
if (args_count >= (ARG_NUM)) \
{ \
if (rabbitmq_settings->ARG_NAME.changed) /* The same argument is given in two places */ \
{ \
throw Exception("The argument №" #ARG_NUM " of storage RabbitMQ " \
"and the parameter '" #ARG_NAME "' is duplicated", ErrorCodes::BAD_ARGUMENTS); \
} \
}
rabbitmq_settings->loadFromQuery(*args.storage_def);
CHECK_RABBITMQ_STORAGE_ARGUMENT(1, rabbitmq_host_port)
CHECK_RABBITMQ_STORAGE_ARGUMENT(2, rabbitmq_format)
CHECK_RABBITMQ_STORAGE_ARGUMENT(3, rabbitmq_exchange_name)
CHECK_RABBITMQ_STORAGE_ARGUMENT(4, rabbitmq_exchange_type)
CHECK_RABBITMQ_STORAGE_ARGUMENT(5, rabbitmq_routing_key_list)
CHECK_RABBITMQ_STORAGE_ARGUMENT(6, rabbitmq_row_delimiter)
CHECK_RABBITMQ_STORAGE_ARGUMENT(7, rabbitmq_schema)
CHECK_RABBITMQ_STORAGE_ARGUMENT(8, rabbitmq_num_consumers)
CHECK_RABBITMQ_STORAGE_ARGUMENT(9, rabbitmq_num_queues)
CHECK_RABBITMQ_STORAGE_ARGUMENT(10, rabbitmq_queue_base)
CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_persistent)
CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_skip_broken_messages)
CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_max_block_size)
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_flush_interval_ms)
CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_vhost)
CHECK_RABBITMQ_STORAGE_ARGUMENT(16, rabbitmq_queue_settings_list)
CHECK_RABBITMQ_STORAGE_ARGUMENT(17, rabbitmq_queue_consume)
if (!rabbitmq_settings->rabbitmq_host_port.changed
&& !rabbitmq_settings->rabbitmq_address.changed)
throw Exception("You must specify either `rabbitmq_host_port` or `rabbitmq_address` settings",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
#undef CHECK_RABBITMQ_STORAGE_ARGUMENT
if (!rabbitmq_settings->rabbitmq_format.changed)
throw Exception("You must specify `rabbitmq_format` setting", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageRabbitMQ::create(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings));
};

View File

@ -107,6 +107,8 @@ private:
std::pair<String, UInt16> parsed_address;
std::pair<String, String> login_password;
String vhost;
String connection_string;
bool secure;
UVLoop loop;
std::shared_ptr<RabbitMQHandler> event_handler;

View File

@ -101,7 +101,7 @@ public:
void flush() override;
void drop() override;
bool storesDataOnDisk() const override { return true; }
bool storesDataOnDisk() const override { return data_volume != nullptr; }
Strings getDataPaths() const override;
ActionLock getActionLock(StorageActionBlockType type) override;

View File

@ -32,7 +32,7 @@ public:
setInMemoryMetadata(cached_metadata);
}
StoragePtr getNested() const override
StoragePtr getNestedImpl() const
{
std::lock_guard lock{nested_mutex};
if (nested)
@ -46,6 +46,20 @@ public:
return nested;
}
StoragePtr getNested() const override
{
StoragePtr nested_storage = getNestedImpl();
assert(!nested_storage->getStoragePolicy());
assert(!nested_storage->storesDataOnDisk());
return nested_storage;
}
/// Table functions cannot have a storage policy and cannot store data on disk.
/// We may check whether a table is read-only or stores data on disk on DROP TABLE.
/// Avoid loading the nested table by returning nullptr/false for all table functions.
StoragePolicyPtr getStoragePolicy() const override { return nullptr; }
bool storesDataOnDisk() const override { return false; }
String getName() const override
{
std::lock_guard lock{nested_mutex};

View File

@ -7,6 +7,7 @@ import time
import logging
import io
import string
import ast
import avro.schema
import avro.io
@ -2792,7 +2793,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
# broken message
"(0,'BAD','AM',0.5,1)",
],
'expected':r'''{"raw_message":"(0,'BAD','AM',0.5,1)","error":"Cannot parse string 'BAD' as UInt16: syntax error at begin of string. Note: there are toUInt16OrZero and toUInt16OrNull functions, which returns zero\/NULL instead of throwing exception.: while executing 'FUNCTION _CAST(assumeNotNull(_dummy_0) :: 2, 'UInt16' :: 1) -> _CAST(assumeNotNull(_dummy_0), 'UInt16') UInt16 : 4'"}''',
'expected':r'''{"raw_message":"(0,'BAD','AM',0.5,1)","error":"Cannot parse string 'BAD' as UInt16: syntax error at begin of string. Note: there are toUInt16OrZero and toUInt16OrNull functions, which returns zero\/NULL instead of throwing exception"}''',
'supports_empty_value': True,
'printable':True,
},
@ -2934,11 +2935,13 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
'''.format(topic_name=topic_name, offset_0=offsets[0], offset_1=offsets[1], offset_2=offsets[2])
# print(('Checking result\n {result} \n expected \n {expected}\n'.format(result=str(result), expected=str(expected))))
assert TSV(result) == TSV(expected), 'Proper result for format: {}'.format(format_name)
errors_result = instance.query('SELECT raw_message, error FROM test.kafka_errors_{format_name}_mv format JSONEachRow'.format(format_name=format_name))
errors_expected = format_opts['expected']
errors_result = ast.literal_eval(instance.query('SELECT raw_message, error FROM test.kafka_errors_{format_name}_mv format JSONEachRow'.format(format_name=format_name)))
errors_expected = ast.literal_eval(format_opts['expected'])
# print(errors_result.strip())
# print(errors_expected.strip())
assert errors_result.strip() == errors_expected.strip(), 'Proper errors for format: {}'.format(format_name)
assert errors_result['raw_message'] == errors_expected['raw_message'], 'Proper raw_message for format: {}'.format(format_name)
# Error text can change; just check that the expected message is contained in the actual one
assert errors_expected['error'] in errors_result['error'], 'Proper error for format: {}'.format(format_name)
kafka_delete_topic(admin_client, topic_name)
def wait_for_new_data(table_name, prev_count = 0, max_retries = 120):

View File

@ -13,6 +13,7 @@ ALTER DROP COLUMN ['DROP COLUMN'] COLUMN ALTER COLUMN
ALTER COMMENT COLUMN ['COMMENT COLUMN'] COLUMN ALTER COLUMN
ALTER CLEAR COLUMN ['CLEAR COLUMN'] COLUMN ALTER COLUMN
ALTER RENAME COLUMN ['RENAME COLUMN'] COLUMN ALTER COLUMN
ALTER MATERIALIZE COLUMN ['MATERIALIZE COLUMN'] COLUMN ALTER COLUMN
ALTER COLUMN [] \N ALTER TABLE
ALTER ORDER BY ['ALTER MODIFY ORDER BY','MODIFY ORDER BY'] TABLE ALTER INDEX
ALTER SAMPLE BY ['ALTER MODIFY SAMPLE BY','MODIFY SAMPLE BY'] TABLE ALTER INDEX

View File

@ -43,3 +43,13 @@ DROP TABLE IF EXISTS binary_op_mono5;
DROP TABLE IF EXISTS binary_op_mono6;
DROP TABLE IF EXISTS binary_op_mono7;
DROP TABLE IF EXISTS binary_op_mono8;
drop table if exists x;
create table x (i int, j int) engine MergeTree order by i / 10 settings index_granularity = 1;
insert into x values (10, 1), (20, 2), (30, 3), (40, 4);
set max_rows_to_read = 3;
select * from x where i > 30; -- converted to i / 10 >= 3, thus needs to read 3 granules.
drop table x;
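For reference, a minimal standalone sketch (not part of the test) of why the rewrite noted in the comment above is valid: integer division by a positive constant is non-decreasing, so `i > 30` (i.e. i >= 31) implies `i / 10 >= 31 / 10`, which is `i / 10 >= 3`.

#include <cassert>

int main()
{
    const int bound = 31 / 10;              // constant pushed through the key expression
    assert(bound == 3);
    for (int i = 31; i <= 1000; ++i)
        assert(i / 10 >= bound);            // every matching row lands in granules with key >= 3
    return 0;
}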

View File

@ -1,6 +1,6 @@
AlterQuery t1 (children 1)
ExpressionList (children 1)
AlterCommand 31 (children 1)
AlterCommand 32 (children 1)
Function equals (children 1)
ExpressionList (children 2)
Identifier date

View File

@ -0,0 +1,8 @@
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19] ['0','1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19']
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19] ['1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20']
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19] ['1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20']
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19] ['2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20','21']
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19] ['0','1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19']
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19] ['1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20']
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19] ['1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20']
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19] ['2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20','21']

View File

@ -0,0 +1,39 @@
DROP TABLE IF EXISTS tmp;
SET mutations_sync = 2;
CREATE TABLE tmp (x Int64) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY tuple();
INSERT INTO tmp SELECT * FROM system.numbers LIMIT 20;
ALTER TABLE tmp ADD COLUMN s String DEFAULT toString(x);
SELECT groupArray(x), groupArray(s) FROM tmp;
ALTER TABLE tmp MODIFY COLUMN s String DEFAULT toString(x+1);
SELECT groupArray(x), groupArray(s) FROM tmp;
ALTER TABLE tmp MATERIALIZE COLUMN s;
ALTER TABLE tmp MODIFY COLUMN s String DEFAULT toString(x+2);
SELECT groupArray(x), groupArray(s) FROM tmp;
ALTER TABLE tmp MATERIALIZE COLUMN s;
ALTER TABLE tmp MODIFY COLUMN s String DEFAULT toString(x+3);
SELECT groupArray(x), groupArray(s) FROM tmp;
ALTER TABLE tmp DROP COLUMN s;
ALTER TABLE tmp ADD COLUMN s String MATERIALIZED toString(x);
SELECT groupArray(x), groupArray(s) FROM tmp;
ALTER TABLE tmp MODIFY COLUMN s String MATERIALIZED toString(x+1);
SELECT groupArray(x), groupArray(s) FROM tmp;
ALTER TABLE tmp MATERIALIZE COLUMN s;
ALTER TABLE tmp MODIFY COLUMN s String MATERIALIZED toString(x+2);
SELECT groupArray(x), groupArray(s) FROM tmp;
ALTER TABLE tmp MATERIALIZE COLUMN s;
ALTER TABLE tmp MODIFY COLUMN s String MATERIALIZED toString(x+3);
SELECT groupArray(x), groupArray(s) FROM tmp;
ALTER TABLE tmp DROP COLUMN s;
DROP TABLE tmp;

View File

@ -0,0 +1,3 @@
1 [0,0] 2 [1,1,3]
1 [0,0] 2 [1,1,3]
1 [0,0] 2 [1,1,3]

View File

@ -0,0 +1,24 @@
DROP TABLE IF EXISTS t_with_dots;
CREATE TABLE t_with_dots (id UInt32, arr Array(UInt32), `b.id` UInt32, `b.arr` Array(UInt32)) ENGINE = Log;
INSERT INTO t_with_dots VALUES (1, [0, 0], 2, [1, 1, 3]);
SELECT * FROM t_with_dots;
DROP TABLE t_with_dots;
CREATE TABLE t_with_dots (id UInt32, arr Array(UInt32), `b.id` UInt32, `b.arr` Array(UInt32))
ENGINE = MergeTree ORDER BY id;
INSERT INTO t_with_dots VALUES (1, [0, 0], 2, [1, 1, 3]);
SELECT * FROM t_with_dots;
DROP TABLE t_with_dots;
CREATE TABLE t_with_dots (id UInt32, arr Array(UInt32), `b.id` UInt32, `b.arr` Array(UInt32))
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO t_with_dots VALUES (1, [0, 0], 2, [1, 1, 3]);
SELECT * FROM t_with_dots;
DROP TABLE t_with_dots;

View File

@ -0,0 +1,13 @@
Non nullable value only null key
\N
Non nullable value nullable key
Test
\N
Nullable value only null key
\N
Nullable value nullable key
Test
\N
\N
\N

View File

@ -0,0 +1,29 @@
DROP TABLE IF EXISTS dictionary_non_nullable_source_table;
CREATE TABLE dictionary_non_nullable_source_table (id UInt64, value String) ENGINE=TinyLog;
INSERT INTO dictionary_non_nullable_source_table VALUES (0, 'Test');
DROP DICTIONARY IF EXISTS test_dictionary_non_nullable;
CREATE DICTIONARY test_dictionary_non_nullable (id UInt64, value String) PRIMARY KEY id LAYOUT(DIRECT()) SOURCE(CLICKHOUSE(TABLE 'dictionary_non_nullable_source_table'));
SELECT 'Non nullable value only null key ';
SELECT dictGet('test_dictionary_non_nullable', 'value', NULL);
SELECT 'Non nullable value nullable key';
SELECT dictGet('test_dictionary_non_nullable', 'value', arrayJoin([toUInt64(0), NULL, 1]));
DROP DICTIONARY test_dictionary_non_nullable;
DROP TABLE dictionary_non_nullable_source_table;
DROP TABLE IF EXISTS dictionary_nullable_source_table;
CREATE TABLE dictionary_nullable_source_table (id UInt64, value Nullable(String)) ENGINE=TinyLog;
INSERT INTO dictionary_nullable_source_table VALUES (0, 'Test'), (1, NULL);
DROP DICTIONARY IF EXISTS test_dictionary_nullable;
CREATE DICTIONARY test_dictionary_nullable (id UInt64, value Nullable(String)) PRIMARY KEY id LAYOUT(DIRECT()) SOURCE(CLICKHOUSE(TABLE 'dictionary_nullable_source_table'));
SELECT 'Nullable value only null key ';
SELECT dictGet('test_dictionary_nullable', 'value', NULL);
SELECT 'Nullable value nullable key';
SELECT dictGet('test_dictionary_nullable', 'value', arrayJoin([toUInt64(0), NULL, 1, 2]));
DROP DICTIONARY test_dictionary_nullable;
DROP TABLE dictionary_nullable_source_table;

View File

@ -9,6 +9,7 @@ v21.7.5.29-stable 2021-07-28
v21.7.4.18-stable 2021-07-17
v21.7.3.14-stable 2021-07-13
v21.7.2.7-stable 2021-07-09
v21.6.9.7-stable 2021-09-02
v21.6.8.62-stable 2021-07-13
v21.6.7.57-stable 2021-07-09
v21.6.6.51-stable 2021-07-02
@ -25,6 +26,7 @@ v21.4.6.55-stable 2021-04-30
v21.4.5.46-stable 2021-04-24
v21.4.4.30-stable 2021-04-16
v21.4.3.21-stable 2021-04-12
v21.3.16.5-lts 2021-09-03
v21.3.15.4-stable 2021-07-10
v21.3.14.1-lts 2021-07-01
v21.3.13.9-lts 2021-06-22
