Merge branch 'master' into jepsen_for_ci

This commit is contained in:
alesapin 2021-03-31 11:16:49 +03:00
commit 7c04b17ed0
98 changed files with 1479 additions and 504 deletions

View File

@ -248,19 +248,27 @@ if (ARCH_NATIVE)
set (COMPILER_FLAGS "${COMPILER_FLAGS} -march=native")
endif ()
if (COMPILER_GCC OR COMPILER_CLANG)
# to make numeric_limits<__int128> work with GCC
set (_CXX_STANDARD "gnu++2a")
else()
set (_CXX_STANDARD "c++2a")
endif()
if (${CMAKE_VERSION} VERSION_LESS "3.12.4")
# CMake < 3.12 doesn't support setting 20 as a C++ standard version.
# We will add C++ standard controlling flag in CMAKE_CXX_FLAGS manually for now.
# CMake < 3.12 doesn't support C++20. We'll set -std via CMAKE_CXX_FLAGS for now
# set (CMAKE_CXX_STANDARD 20)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=${_CXX_STANDARD}")
if (COMPILER_GCC OR COMPILER_CLANG)
# to make numeric_limits<__int128> work with GCC
set (_CXX_STANDARD "gnu++2a")
else ()
set (_CXX_STANDARD "c++2a")
endif ()
set (CMAKE_CXX_EXTENSIONS 0) # https://cmake.org/cmake/help/latest/prop_tgt/CXX_EXTENSIONS.html#prop_tgt:CXX_EXTENSIONS
set (CMAKE_CXX_STANDARD_REQUIRED ON)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=${_CXX_STANDARD}")
else ()
set (CMAKE_CXX_STANDARD 20)
set (CMAKE_CXX_EXTENSIONS ON) # Same as gnu++2a (ON) vs c++2a (OFF): https://cmake.org/cmake/help/latest/prop_tgt/CXX_EXTENSIONS.html
set (CMAKE_CXX_STANDARD_REQUIRED ON)
endif ()
set (CMAKE_C_STANDARD 11)
set (CMAKE_C_EXTENSIONS ON)
set (CMAKE_C_STANDARD_REQUIRED ON)
if (COMPILER_GCC OR COMPILER_CLANG)
# Enable C++14 sized global deallocation functions. It should be enabled by setting -std=c++14 but I'm not sure.
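A minimal sketch of what the gnu++ dialect buys here (my illustration, not part of the build files): with libstdc++, the `std::numeric_limits<__int128>` specialization is only visible when GNU extensions are enabled, so this snippet compiles under `-std=gnu++2a` but fails the asserts under strict `-std=c++2a`.

``` cpp
#include <limits>

// Compiles with -std=gnu++2a (GNU extensions enable the __int128 specialization in libstdc++);
// fails in strict ISO mode (-std=c++2a), where numeric_limits<__int128> is not specialized.
static_assert(std::numeric_limits<__int128>::is_specialized, "requires the gnu++ dialect with libstdc++");
static_assert(std::numeric_limits<__int128>::digits == 127, "signed 128-bit integer has 127 value bits");

int main() {}
```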

View File

@ -215,15 +215,17 @@ if (USE_EMBEDDED_COMPILER AND USE_INTERNAL_LLVM_LIBRARY)
set (LLVM_ENABLE_RTTI 1 CACHE INTERNAL "")
set (LLVM_ENABLE_PIC 0 CACHE INTERNAL "")
set (LLVM_TARGETS_TO_BUILD "X86;AArch64" CACHE STRING "")
# Yes it is set globally, but this is not enough, since llvm will add -std=c++11 after default
# And c++2a cannot be used, due to ambiguous operator !=
if (COMPILER_GCC OR COMPILER_CLANG)
set (_CXX_STANDARD "gnu++17")
else()
set (_CXX_STANDARD "c++17")
endif()
set (LLVM_CXX_STD ${_CXX_STANDARD} CACHE STRING "" FORCE)
# Need to use C++17 since the compilation is not possible with C++20 currently, due to ambiguous operator != etc.
# LLVM project will set its default value for the -std=... but our global setting from CMake will override it.
set (CMAKE_CXX_STANDARD_bak ${CMAKE_CXX_STANDARD})
set (CMAKE_CXX_STANDARD 17)
add_subdirectory (llvm/llvm)
set (CMAKE_CXX_STANDARD ${CMAKE_CXX_STANDARD_bak})
unset (CMAKE_CXX_STANDARD_bak)
target_include_directories(LLVMSupport SYSTEM BEFORE PRIVATE ${ZLIB_INCLUDE_DIR})
endif ()
@ -280,7 +282,14 @@ if (USE_AMQPCPP)
add_subdirectory (amqpcpp-cmake)
endif()
if (USE_CASSANDRA)
# Need to use C++17 since the compilation is not possible with C++20 currently.
set (CMAKE_CXX_STANDARD_bak ${CMAKE_CXX_STANDARD})
set (CMAKE_CXX_STANDARD 17)
add_subdirectory (cassandra)
set (CMAKE_CXX_STANDARD ${CMAKE_CXX_STANDARD_bak})
unset (CMAKE_CXX_STANDARD_bak)
endif()
# Should go before:

contrib/grpc vendored

@ -1 +1 @@
Subproject commit 7436366ceb341ba5c00ea29f1645e02a2b70bf93
Subproject commit 8d558f03fe370240081424fafa76cdc9301ea14b

View File

@ -14,12 +14,8 @@ RUN apt-get update \
lsb-release \
wget \
--yes --no-install-recommends --verbose-versions \
&& cat /etc/resolv.conf \
&& echo "nameserver 1.1.1.1" >> /etc/resolv.conf \
&& nslookup -debug apt.llvm.org \
&& ping -c1 apt.llvm.org \
&& wget -nv --retry-connrefused --tries=10 -O /tmp/llvm-snapshot.gpg.key https://apt.llvm.org/llvm-snapshot.gpg.key \
&& export LLVM_PUBKEY_HASH="bda960a8da687a275a2078d43c111d66b1c6a893a3275271beedf266c1ff4a0cdecb429c7a5cccf9f486ea7aa43fd27f" \
&& wget -nv -O /tmp/llvm-snapshot.gpg.key https://apt.llvm.org/llvm-snapshot.gpg.key \
&& echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \
&& apt-key add /tmp/llvm-snapshot.gpg.key \
&& export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
@ -36,10 +32,7 @@ RUN apt-get update \
software-properties-common \
--yes --no-install-recommends
RUN cat /etc/resolv.conf \
&& echo "nameserver 1.1.1.1" >> /etc/resolv.conf \
&& nslookup -debug apt.llvm.org \
&& apt-get update \
RUN apt-get update \
&& apt-get install \
bash \
cmake \

View File

@ -4,9 +4,8 @@ FROM ubuntu:20.04
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=11
RUN apt-get update \
&& apt-get install apt-utils ca-certificates lsb-release wget gnupg apt-transport-https \
&& apt-get install ca-certificates lsb-release wget gnupg apt-transport-https \
--yes --no-install-recommends --verbose-versions \
&& echo "nameserver 1.1.1.1" >> /etc/resolv.conf \
&& export LLVM_PUBKEY_HASH="bda960a8da687a275a2078d43c111d66b1c6a893a3275271beedf266c1ff4a0cdecb429c7a5cccf9f486ea7aa43fd27f" \
&& wget -nv -O /tmp/llvm-snapshot.gpg.key https://apt.llvm.org/llvm-snapshot.gpg.key \
&& echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \
@ -32,8 +31,7 @@ RUN curl -O https://clickhouse-builds.s3.yandex.net/utils/1/dpkg-deb \
&& chmod +x dpkg-deb \
&& cp dpkg-deb /usr/bin
RUN echo "nameserver 1.1.1.1" >> /etc/resolv.conf \
&& apt-get update \
RUN apt-get update \
&& apt-get install \
clang-${LLVM_VERSION} \
debhelper \

View File

@ -51,6 +51,7 @@ function run_tests()
# Skip these tests, because they fail when we rerun them multiple times
if [ "$NUM_TRIES" -gt "1" ]; then
ADDITIONAL_OPTIONS+=('--order=random')
ADDITIONAL_OPTIONS+=('--skip')
ADDITIONAL_OPTIONS+=('00000_no_tests_to_skip')
ADDITIONAL_OPTIONS+=('--jobs')
@ -74,7 +75,13 @@ timeout "$MAX_RUN_TIME" bash -c run_tests ||:
./process_functional_tests_result.py || echo -e "failure\tCannot parse results" > /test_output/check_status.tsv
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz ||:
clickhouse-client -q "system flush logs" ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
clickhouse-client -q "select * from system.query_log format TSVWithNamesAndTypes" | pigz > /test_output/query-log.tsv.gz &
clickhouse-client -q "select * from system.query_thread_log format TSVWithNamesAndTypes" | pigz > /test_output/query-thread-log.tsv.gz &
wait ||:
mv /var/log/clickhouse-server/stderr.log /test_output/ ||:
if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then
tar -chf /test_output/clickhouse_coverage.tar.gz /profraw ||:

View File

@ -769,6 +769,38 @@ Example:
log_query_threads=1
```
## log_comment {#settings-log-comment}
Specifies the value for the `log_comment` field of the [system.query_log](../system-tables/query_log.md) table and comment text for the server log.
It can be used to improve the readability of server logs. Additionally, it helps to select queries related to the test from the `system.query_log` after running [clickhouse-test](../../development/tests.md).
Possible values:
- Any string no longer than [max_query_size](#settings-max_query_size). If the length is exceeded, the server throws an exception.
Default value: empty string.
**Example**
Query:
``` sql
SET log_comment = 'log_comment test', log_queries = 1;
SELECT 1;
SYSTEM FLUSH LOGS;
SELECT type, query FROM system.query_log WHERE log_comment = 'log_comment test' AND event_date >= yesterday() ORDER BY event_time DESC LIMIT 2;
```
Result:
``` text
┌─type────────┬─query─────┐
│ QueryStart │ SELECT 1; │
│ QueryFinish │ SELECT 1; │
└─────────────┴───────────┘
```
## max_insert_block_size {#settings-max_insert_block_size}
The size of blocks (in a count of rows) to form for insertion into a table.

View File

@ -253,7 +253,7 @@ windowFunnel(window, [mode, [mode, ... ]])(timestamp, cond1, cond2, ..., condN)
**Parameters**
- `window` — Length of the sliding window. The unit of `window` depends on the `timestamp` itself and varies. Determined using the expression `timestamp of cond2 <= timestamp of cond1 + window`.
- `window` — Length of the sliding window; it is the time interval between the first and the last condition. The unit of `window` depends on the `timestamp` itself and varies. Determined using the expression `timestamp of cond1 <= timestamp of cond2 <= ... <= timestamp of condN <= timestamp of cond1 + window`.
- `mode` — It is an optional argument. One or more modes can be set.
- `'strict'` — If the same condition holds for a sequence of events, such non-unique events are skipped.
- `'strict_order'` — Don't allow other events to intervene. E.g. in the case of `A->B->D->C`, it stops finding `A->B->C` at the `D` and the max event level is 2.

View File

@ -394,3 +394,55 @@ Result:
└──────────────────┴────────────────────┘
```
## isIPAddressInRange {#isipaddressinrange}
Determines if an IP address is contained in a network represented in the [CIDR](https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing) notation. Returns `1` if true, or `0` otherwise.
**Syntax**
``` sql
isIPAddressInRange(address, prefix)
```
This function accepts both IPv4 and IPv6 addresses (and networks) represented as strings. It returns `0` if the IP version of the address and the CIDR don't match.
**Arguments**
- `address` — An IPv4 or IPv6 address. [String](../../sql-reference/data-types/string.md).
- `prefix` — An IPv4 or IPv6 network prefix in CIDR. [String](../../sql-reference/data-types/string.md).
**Returned value**
- `1` or `0`.
Type: [UInt8](../../sql-reference/data-types/int-uint.md).
**Example**
Query:
``` sql
SELECT isIPAddressInRange('127.0.0.1', '127.0.0.0/8')
```
Result:
``` text
┌─isIPAddressInRange('127.0.0.1', '127.0.0.0/8')─┐
│ 1 │
└────────────────────────────────────────────────┘
```
Query:
``` sql
SELECT isIPAddressInRange('127.0.0.1', 'ffff::/16')
```
Result:
``` text
┌─isIPAddressInRange('127.0.0.1', 'ffff::/16')─┐
│ 0 │
└──────────────────────────────────────────────┘
```

View File

@ -759,6 +759,38 @@ log_queries_min_type='EXCEPTION_WHILE_PROCESSING'
log_query_threads=1
```
## log_comment {#settings-log-comment}
Specifies the value of the `log_comment` field of the [system.query_log](../system-tables/query_log.md) table and the comment text for the server log.
It can be used to improve the readability of server logs. It also helps to quickly select the queries related to a test from `system.query_log` after running [clickhouse-test](../../development/tests.md).
Possible values:
- Any string no longer than [max_query_size](#settings-max_query_size). If the length is exceeded, the server throws an exception.
Default value: empty string.
**Example**
Query:
``` sql
SET log_comment = 'log_comment test', log_queries = 1;
SELECT 1;
SYSTEM FLUSH LOGS;
SELECT type, query FROM system.query_log WHERE log_comment = 'log_comment test' AND event_date >= yesterday() ORDER BY event_time DESC LIMIT 2;
```
Result:
``` text
┌─type────────┬─query─────┐
│ QueryStart │ SELECT 1; │
│ QueryFinish │ SELECT 1; │
└─────────────┴───────────┘
```
## max_insert_block_size {#settings-max_insert_block_size}
Forms blocks of the specified size (in a count of rows) when inserting into a table.
@ -2655,7 +2687,6 @@ SELECT * FROM test2;
Default value: `0`.
## live_view_heartbeat_interval {#live-view-heartbeat-interval}
Sets the interval in seconds for periodic checks that the [LIVE VIEW](../../sql-reference/statements/create/view.md#live-view) still exists.

View File

@ -395,3 +395,54 @@ SELECT addr, isIPv6String(addr) FROM ( SELECT ['::', '1111::ffff', '::ffff:127.0
└──────────────────┴────────────────────┘
```
## isIPAddressInRange {#isipaddressinrange}
Checks whether an IP address falls within a range specified in [CIDR](https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing) notation.
**Syntax**
``` sql
isIPAddressInRange(address, prefix)
```
The function accepts an IPv4 or IPv6 address as a string. Returns `0` if the IP version of the address and the range do not match.
**Arguments**
- `address` — An IPv4 or IPv6 address. [String](../../sql-reference/data-types/string.md).
- `prefix` — An IPv4 or IPv6 subnet specified in CIDR notation. [String](../../sql-reference/data-types/string.md).
**Returned value**
- `1` or `0`.
Type: [UInt8](../../sql-reference/data-types/int-uint.md).
**Examples**
Query:
``` sql
SELECT isIPAddressInRange('127.0.0.1', '127.0.0.0/8')
```
Result:
``` text
┌─isIPAddressInRange('127.0.0.1', '127.0.0.0/8')─┐
│ 1 │
└────────────────────────────────────────────────┘
```
Query:
``` sql
SELECT isIPAddressInRange('127.0.0.1', 'ffff::/16')
```
Result:
``` text
┌─isIPAddressInRange('127.0.0.1', 'ffff::/16')─┐
│ 0 │
└──────────────────────────────────────────────┘
```

View File

@ -599,11 +599,13 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
toString(current_piece_number));
Settings settings_push = task_cluster->settings_push;
/// It is important that ALTER ATTACH PARTITION be done synchronously,
/// and we will execute this ALTER query on each replica of a shard.
/// This is correct because the query is idempotent.
settings_push.replication_alter_partitions_sync = 2;
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_NODE;
UInt64 max_successful_executions_per_shard = 0;
if (settings_push.replication_alter_partitions_sync == 1)
{
execution_mode = ClusterExecutionMode::ON_EACH_SHARD;
max_successful_executions_per_shard = 1;
}
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
@ -613,14 +615,33 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
try
{
size_t num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_alter_ast_string,
settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
/// Try attach partition on each shard
UInt64 num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_alter_ast_string,
task_cluster->settings_push,
PoolMode::GET_MANY,
execution_mode,
max_successful_executions_per_shard);
LOG_INFO(log, "Number of nodes that executed ALTER query successfully : {}", toString(num_nodes));
if (settings_push.replication_alter_partitions_sync == 1)
{
LOG_INFO(
log,
"Destination tables {} have been executed alter query successfully on {} shards of {}",
getQuotedTable(task_table.table_push),
num_nodes,
task_table.cluster_push->getShardCount());
if (num_nodes != task_table.cluster_push->getShardCount())
{
return TaskStatus::Error;
}
}
else
{
LOG_INFO(log, "Number of nodes that executed ALTER query successfully : {}", toString(num_nodes));
}
}
catch (...)
{
@ -856,6 +877,16 @@ bool ClusterCopier::tryDropPartitionPiece(
bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
{
/// Create destination table
TaskStatus task_status = TaskStatus::Error;
task_status = tryCreateDestinationTable(timeouts, task_table);
/// Exit if the destination table could not be created
if (task_status != TaskStatus::Finished)
{
LOG_WARNING(log, "Create destination Tale Failed ");
return false;
}
/// A heuristic: if the previous shard is already done, then check the next one without sleeps due to the max_workers constraint
bool previous_shard_is_instantly_finished = false;
@ -932,7 +963,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
/// Do not sleep if there is a sequence of already processed shards, to speed up startup
bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
TaskStatus task_status = TaskStatus::Error;
task_status = TaskStatus::Error;
bool was_error = false;
has_shard_to_process = true;
for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
@ -1050,6 +1081,44 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
return table_is_done;
}
TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
{
/// Try to create the original table (if it does not exist) on each shard
//TaskTable & task_table = task_shard.task_table;
const TaskShardPtr task_shard = task_table.all_shards.at(0);
/// We need to update table definitions for each part, since they could have been changed by ALTER
task_shard->current_pull_table_create_query = getCreateTableForPullShard(timeouts, *task_shard);
try
{
auto create_query_push_ast
= rewriteCreateQueryStorage(task_shard->current_pull_table_create_query, task_table.table_push, task_table.engine_push_ast);
auto & create = create_query_push_ast->as<ASTCreateQuery &>();
create.if_not_exists = true;
InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
LOG_INFO(
log,
"Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push),
shards,
task_table.cluster_push->getShardCount());
if (shards != task_table.cluster_push->getShardCount())
{
return TaskStatus::Error;
}
}
catch (...)
{
tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
}
return TaskStatus::Finished;
}
/// Job for copying partition from particular shard.
TaskStatus ClusterCopier::tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
{
@ -1366,8 +1435,17 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
LOG_INFO(
log,
"Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push),
shards,
task_table.cluster_push->getShardCount());
if (shards != task_table.cluster_push->getShardCount())
{
return TaskStatus::Error;
}
}
/// Do the copying
@ -1477,26 +1555,6 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
LOG_INFO(log, "Partition {} piece {} copied. But not moved to original destination table.", task_partition.name, toString(current_piece_number));
/// Try create original table (if not exists) on each shard
try
{
auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
task_table.table_push, task_table.engine_push_ast);
auto & create = create_query_push_ast->as<ASTCreateQuery &>();
create.if_not_exists = true;
InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
}
catch (...)
{
tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
}
/// Finalize the processing, change state of current partition task (and also check is_dirty flag)
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
@ -1538,33 +1596,36 @@ void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_na
interpreter.execute();
}
void ClusterCopier::dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number)
{
LOG_DEBUG(log, "Removing helping tables piece {}", current_piece_number);
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table
= DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);
const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster(cluster_push, query, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
}
void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
{
LOG_DEBUG(log, "Removing helping tables");
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);
const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster(
cluster_push, query,
settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
LOG_DEBUG(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
dropHelpingTablesByPieceNumber(task_table, current_piece_number);
}
}
void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
{
LOG_DEBUG(log, "Try drop partition partition from all helping tables.");
@ -1586,7 +1647,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
LOG_DEBUG(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
LOG_INFO(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
}
LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name);
}

View File

@ -123,12 +123,13 @@ protected:
bool tryDropPartitionPiece(ShardPartition & task_partition, const size_t current_piece_number,
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
static constexpr UInt64 max_table_tries = 1000;
static constexpr UInt64 max_shard_partition_tries = 600;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 100;
static constexpr UInt64 max_table_tries = 3;
static constexpr UInt64 max_shard_partition_tries = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 3;
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
TaskStatus tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
/// Job for copying partition from particular shard.
TaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts,
ShardPartition & task_partition,
@ -149,6 +150,8 @@ protected:
void dropHelpingTables(const TaskTable & task_table);
void dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number);
/// Used to reduce disk space usage.
/// After all pieces have been successfully moved to the original destination
/// table, we can get rid of the partition pieces (partitions in the helping tables).

View File

@ -98,6 +98,7 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
set_default_value(settings_push.replication_alter_partitions_sync, 2);
}
}

View File

@ -19,15 +19,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
struct ComparePair final
{
template <typename T1, typename T2>
bool operator()(const std::pair<T1, T2> & lhs, const std::pair<T1, T2> & rhs) const
{
return lhs.first == rhs.first ? lhs.second < rhs.second : lhs.first < rhs.first;
}
};
static constexpr auto max_events = 32;
template <typename T>
@ -35,7 +26,6 @@ struct AggregateFunctionWindowFunnelData
{
using TimestampEvent = std::pair<T, UInt8>;
using TimestampEvents = PODArrayWithStackMemory<TimestampEvent, 64>;
using Comparator = ComparePair;
bool sorted = true;
TimestampEvents events_list;
@ -69,7 +59,7 @@ struct AggregateFunctionWindowFunnelData
/// either sort whole container or do so partially merging ranges afterwards
if (!sorted && !other.sorted)
std::stable_sort(std::begin(events_list), std::end(events_list), Comparator{});
std::stable_sort(std::begin(events_list), std::end(events_list));
else
{
const auto begin = std::begin(events_list);
@ -77,12 +67,12 @@ struct AggregateFunctionWindowFunnelData
const auto end = std::end(events_list);
if (!sorted)
std::stable_sort(begin, middle, Comparator{});
std::stable_sort(begin, middle);
if (!other.sorted)
std::stable_sort(middle, end, Comparator{});
std::stable_sort(middle, end);
std::inplace_merge(begin, middle, end, Comparator{});
std::inplace_merge(begin, middle, end);
}
sorted = true;
@ -92,7 +82,7 @@ struct AggregateFunctionWindowFunnelData
{
if (!sorted)
{
std::stable_sort(std::begin(events_list), std::end(events_list), Comparator{});
std::stable_sort(std::begin(events_list), std::end(events_list));
sorted = true;
}
}
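For context on why the custom `Comparator` can be dropped (my note, not part of the change): for arithmetic pair members, `std::pair`'s built-in lexicographic `operator<` produces exactly the ordering the deleted `ComparePair` implemented, first by timestamp, then by event index. A small standalone check:

``` cpp
#include <cassert>
#include <cstdint>
#include <utility>

int main()
{
    using TimestampEvent = std::pair<uint64_t, uint8_t>;

    // std::pair compares lexicographically: by timestamp first, then by event index,
    // matching the order produced by the removed ComparePair functor.
    assert((TimestampEvent{1, 2} < TimestampEvent{2, 0}));   // earlier timestamp wins
    assert((TimestampEvent{1, 1} < TimestampEvent{1, 2}));   // ties broken by event index
    assert(!(TimestampEvent{2, 0} < TimestampEvent{1, 5}));
    return 0;
}
```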

View File

@ -13,8 +13,7 @@ namespace DB
/// The result array can be indexed with all possible uint8 values without an extra check.
/// For values greater than 128 we store the same value as for 128 (all bits set).
constexpr size_t IPV6_MASKS_COUNT = 256;
using RawMaskArray = std::array<uint8_t, IPV6_BINARY_LENGTH>;
using RawMaskArrayV6 = std::array<uint8_t, IPV6_BINARY_LENGTH>;
void IPv6ToRawBinary(const Poco::Net::IPAddress & address, char * res)
{
@ -41,33 +40,86 @@ std::array<char, 16> IPv6ToBinary(const Poco::Net::IPAddress & address)
return res;
}
static constexpr RawMaskArray generateBitMask(size_t prefix)
template <typename RawMaskArrayT>
static constexpr RawMaskArrayT generateBitMask(size_t prefix)
{
if (prefix >= 128)
prefix = 128;
RawMaskArray arr{0};
RawMaskArrayT arr{0};
if (prefix >= arr.size() * 8)
prefix = arr.size() * 8;
size_t i = 0;
for (; prefix >= 8; ++i, prefix -= 8)
arr[i] = 0xff;
if (prefix > 0)
arr[i++] = ~(0xff >> prefix);
while (i < 16)
while (i < arr.size())
arr[i++] = 0x00;
return arr;
}
static constexpr std::array<RawMaskArray, IPV6_MASKS_COUNT> generateBitMasks()
template <typename RawMaskArrayT, size_t masksCount>
static constexpr std::array<RawMaskArrayT, masksCount> generateBitMasks()
{
std::array<RawMaskArray, IPV6_MASKS_COUNT> arr{};
for (size_t i = 0; i < IPV6_MASKS_COUNT; ++i)
arr[i] = generateBitMask(i);
std::array<RawMaskArrayT, masksCount> arr{};
for (size_t i = 0; i < masksCount; ++i)
arr[i] = generateBitMask<RawMaskArrayT>(i);
return arr;
}
const uint8_t * getCIDRMaskIPv6(UInt8 prefix_len)
const std::array<uint8_t, 16> & getCIDRMaskIPv6(UInt8 prefix_len)
{
static constexpr std::array<RawMaskArray, IPV6_MASKS_COUNT> IPV6_RAW_MASK_ARRAY = generateBitMasks();
return IPV6_RAW_MASK_ARRAY[prefix_len].data();
static constexpr auto IPV6_RAW_MASK_ARRAY = generateBitMasks<RawMaskArrayV6, IPV6_MASKS_COUNT>();
return IPV6_RAW_MASK_ARRAY[prefix_len];
}
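To make the parametrised mask generation concrete, here is an illustrative re-statement (hypothetical `generateBitMaskSketch` name, not the code above): a /20 prefix yields the byte pattern ff ff f0 00 … 00, and `getCIDRMaskIPv6` simply precomputes such masks for every prefix length 0..255.

``` cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Standalone sketch of the constexpr mask generation, specialised to a 16-byte IPv6 mask.
template <typename RawMaskArrayT>
static constexpr RawMaskArrayT generateBitMaskSketch(size_t prefix)
{
    RawMaskArrayT arr{};
    if (prefix >= arr.size() * 8)
        prefix = arr.size() * 8;
    size_t i = 0;
    for (; prefix >= 8; ++i, prefix -= 8)
        arr[i] = 0xff;                                      // whole bytes covered by the prefix
    if (prefix > 0)
        arr[i++] = static_cast<uint8_t>(~(0xff >> prefix)); // partial byte at the boundary
    while (i < arr.size())
        arr[i++] = 0x00;                                    // the rest stays zero
    return arr;
}

// A /20 mask is ff ff f0 00 ... 00.
static_assert(generateBitMaskSketch<std::array<uint8_t, 16>>(20)[0] == 0xff);
static_assert(generateBitMaskSketch<std::array<uint8_t, 16>>(20)[2] == 0xf0);
static_assert(generateBitMaskSketch<std::array<uint8_t, 16>>(20)[3] == 0x00);

int main() {}
```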
bool matchIPv4Subnet(UInt32 addr, UInt32 cidr_addr, UInt8 prefix)
{
UInt32 mask = (prefix >= 32) ? 0xffffffffu : ~(0xffffffffu >> prefix);
return (addr & mask) == (cidr_addr & mask);
}
#if defined(__SSE2__)
#include <emmintrin.h>
bool matchIPv6Subnet(const uint8_t * addr, const uint8_t * cidr_addr, UInt8 prefix)
{
uint16_t mask = _mm_movemask_epi8(_mm_cmpeq_epi8(
_mm_loadu_si128(reinterpret_cast<const __m128i *>(addr)),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(cidr_addr))));
mask = ~mask;
if (mask)
{
auto offset = __builtin_ctz(mask);
if (prefix / 8 != offset)
return prefix / 8 < offset;
auto cmpmask = ~(0xff >> (prefix % 8));
return (addr[offset] & cmpmask) == (cidr_addr[offset] & cmpmask);
}
return true;
}
# else
bool matchIPv6Subnet(const uint8_t * addr, const uint8_t * cidr_addr, UInt8 prefix)
{
if (prefix > IPV6_BINARY_LENGTH * 8U)
prefix = IPV6_BINARY_LENGTH * 8U;
size_t i = 0;
for (; prefix >= 8; ++i, prefix -= 8)
{
if (addr[i] != cidr_addr[i])
return false;
}
if (prefix == 0)
return true;
auto mask = ~(0xff >> prefix);
return (addr[i] & mask) == (cidr_addr[i] & mask);
}
#endif // __SSE2__
}
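The IPv4 matcher above boils down to a single mask-and-compare; a self-contained sketch (hypothetical `matchIPv4SubnetSketch` name), assuming addresses in host byte order, which is what `parseIPv4` produces:

``` cpp
#include <cassert>
#include <cstdint>

// Same logic as matchIPv4Subnet above: build a mask of `prefix` leading ones and
// compare the masked address with the masked network address.
static bool matchIPv4SubnetSketch(uint32_t addr, uint32_t cidr_addr, uint8_t prefix)
{
    uint32_t mask = (prefix >= 32) ? 0xffffffffu : ~(0xffffffffu >> prefix);
    return (addr & mask) == (cidr_addr & mask);
}

int main()
{
    // 192.168.1.17 is inside 192.168.0.0/16 but not inside 127.0.0.0/8.
    assert(matchIPv4SubnetSketch(0xC0A80111u, 0xC0A80000u, 16));
    assert(!matchIPv4SubnetSketch(0xC0A80111u, 0x7F000000u, 8));
    return 0;
}
```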

View File

@ -14,9 +14,13 @@ void IPv6ToRawBinary(const Poco::Net::IPAddress & address, char * res);
/// Convert IP address to 16-byte array with IPv6 data (big endian). If it's an IPv4, map it to IPv6.
std::array<char, 16> IPv6ToBinary(const Poco::Net::IPAddress & address);
/// Returns pointer to 16-byte array containing mask with first `prefix_len` bits set to `1` and `128 - prefix_len` to `0`.
/// Pointer is valid during all program execution time and doesn't require freeing.
/// Returns a reference to a 16-byte array containing a mask with the first `prefix_len` bits set to `1` and the remaining `128 - prefix_len` bits set to `0`.
/// The reference is valid for the whole program execution time.
/// Values of prefix_len greater than 128 are interpreted as exactly 128.
const uint8_t * getCIDRMaskIPv6(UInt8 prefix_len);
const std::array<uint8_t, 16> & getCIDRMaskIPv6(UInt8 prefix_len);
/// Check that the address is contained in the CIDR range
bool matchIPv4Subnet(UInt32 addr, UInt32 cidr_addr, UInt8 prefix);
bool matchIPv6Subnet(const uint8_t * addr, const uint8_t * cidr_addr, UInt8 prefix);
}

View File

@ -25,7 +25,7 @@ void formatIPv6(const unsigned char * src, char *& dst, uint8_t zeroed_tail_byte
/** Unsafe (no bounds-checking for src nor dst), optimized version of parsing IPv4 string.
*
* Parses the input string `src` and stores binary BE value into buffer pointed by `dst`,
* Parses the input string `src` and stores the binary host-endian value into the buffer pointed to by `dst`,
* which should be long enough.
* That is "127.0.0.1" becomes 0x7f000001.
*
@ -63,7 +63,7 @@ inline bool parseIPv4(const char * src, unsigned char * dst)
/** Unsafe (no bounds-checking for src nor dst), optimized version of parsing IPv6 string.
*
* Slightly altered implementation from http://svn.apache.org/repos/asf/apr/apr/trunk/network_io/unix/inet_pton.c
* Parses the input string `src` and stores binary LE value into buffer pointed by `dst`,
* Parses the input string `src` and stores the binary big-endian value into the buffer pointed to by `dst`,
* which should be long enough. In case of failure zeroes
* IPV6_BINARY_LENGTH bytes of buffer pointed by `dst`.
*
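A minimal usage sketch of the documented host-endian behaviour (assuming `parseIPv4` is declared in `Common/formatIPv6.h`, the header included by the new `isIPAddressContainedIn.cpp`):

``` cpp
#include <cassert>
#include <cstdint>
#include <Common/formatIPv6.h>   // assumed location of parseIPv4/parseIPv6

int main()
{
    uint32_t v4 = 0;
    // parseIPv4 writes the address in host byte order, so "127.0.0.1" becomes 0x7f000001.
    const bool ok = DB::parseIPv4("127.0.0.1", reinterpret_cast<unsigned char *>(&v4));
    assert(ok);
    assert(v4 == 0x7f000001u);
    return 0;
}
```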

View File

@ -470,8 +470,8 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year2010,
::testing::ValuesIn(allTimezones()),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
// Values from tests/date_lut3.cpp
{YYYYMMDDToDay(20101031), YYYYMMDDToDay(20101101), 15 * 60},
{YYYYMMDDToDay(20100328), YYYYMMDDToDay(20100330), 15 * 60}
{YYYYMMDDToDay(20101031), YYYYMMDDToDay(20101101), 10 * 15 * 60},
{YYYYMMDDToDay(20100328), YYYYMMDDToDay(20100330), 10 * 15 * 60}
}))
);
@ -481,7 +481,7 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year1970_WHOLE,
::testing::ValuesIn(allTimezones(false)),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
// Values from tests/date_lut3.cpp
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19701231), 3191 /*53m 11s*/},
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19701231), 10 * 3191 /*53m 11s*/},
}))
);
@ -491,7 +491,7 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year2010_WHOLE,
::testing::ValuesIn(allTimezones(false)),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
// Values from tests/date_lut3.cpp
{YYYYMMDDToDay(20100101), YYYYMMDDToDay(20101231), 3191 /*53m 11s*/},
{YYYYMMDDToDay(20100101), YYYYMMDDToDay(20101231), 10 * 3191 /*53m 11s*/},
}))
);
@ -501,7 +501,7 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year2020_WHOLE,
::testing::ValuesIn(allTimezones()),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
// Values from tests/date_lut3.cpp
{YYYYMMDDToDay(20200101), YYYYMMDDToDay(20201231), 3191 /*53m 11s*/},
{YYYYMMDDToDay(20200101), YYYYMMDDToDay(20201231), 10 * 3191 /*53m 11s*/},
}))
);
@ -510,8 +510,8 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_PreEpoch,
::testing::Combine(
::testing::ValuesIn(allTimezones(false)),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
{YYYYMMDDToDay(19500101), YYYYMMDDToDay(19600101), 15 * 60},
{YYYYMMDDToDay(19300101), YYYYMMDDToDay(19350101), 11 * 15 * 60}
{YYYYMMDDToDay(19500101), YYYYMMDDToDay(19600101), 10 * 15 * 60},
{YYYYMMDDToDay(19300101), YYYYMMDDToDay(19350101), 10 * 11 * 15 * 60}
}))
);
@ -520,8 +520,8 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year1970,
::testing::Combine(
::testing::ValuesIn(allTimezones(false)),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19700201), 15 * 60},
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19701231), 11 * 13 * 17}
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19700201), 10 * 15 * 60},
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19701231), 10 * 11 * 13 * 17}
// // 11 was chosen as a number which can't divide the product of 2-combinations of (7, 24, 60),
// // to reduce the likelihood of hitting the same hour/minute/second values for different days.
// // + 12 is just to make sure that the last day is covered fully.

View File

@ -141,7 +141,7 @@ class IColumn;
M(UInt64, optimize_min_equality_disjunction_chain_length, 3, "The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization ", 0) \
\
M(UInt64, min_bytes_to_use_direct_io, 0, "The minimum number of bytes for reading the data with O_DIRECT option during SELECT queries execution. 0 - disabled.", 0) \
M(UInt64, min_bytes_to_use_mmap_io, 0, "The minimum number of bytes for reading the data with mmap option during SELECT queries execution. 0 - disabled.", 0) \
M(UInt64, min_bytes_to_use_mmap_io, (64 * 1024 * 1024), "The minimum number of bytes for reading the data with mmap option during SELECT queries execution. 0 - disabled.", 0) \
M(Bool, checksum_on_read, true, "Validate checksums on reading. It is enabled by default and should be always enabled in production. Please do not expect any benefits in disabling this setting. It may only be used for experiments and benchmarks. The setting only applicable for tables of MergeTree family. Checksums are always validated for other table engines and when receiving data over network.", 0) \
\
M(Bool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.", 0) \
@ -240,6 +240,7 @@ class IColumn;
M(Bool, metrics_perf_events_enabled, false, "If enabled, some of the perf events will be measured throughout queries' execution.", 0) \
M(String, metrics_perf_events_list, "", "Comma separated list of perf metrics that will be measured throughout queries' execution. Empty means all events. See PerfEventInfo in sources for the available events.", 0) \
M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \
M(Bool, prefer_column_name_to_alias, false, "Prefer using column names instead of aliases if possible.", 0) \
\
\
/** Limits during query execution are part of the settings. \

View File

@ -188,8 +188,6 @@ private:
size_t fetched_columns_index = 0;
size_t keys_size = keys.size();
std::chrono::seconds max_lifetime_seconds(configuration.strict_max_lifetime_seconds);
PaddedPODArray<FetchedKey> fetched_keys;
fetched_keys.resize_fill(keys_size);

View File

@ -4,19 +4,17 @@
#include <Common/assert_cast.h>
#include <Common/IPv6ToBinary.h>
#include <Common/memcmpSmall.h>
#include <Common/memcpySmall.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesDecimal.h>
#include <IO/WriteIntText.h>
#include <Poco/ByteOrder.h>
#include <Common/formatIPv6.h>
#include <common/itoa.h>
#include <ext/map.h>
#include <ext/range.h>
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
#include <Dictionaries/DictionaryBlockInputStream.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Functions/FunctionHelpers.h>
namespace DB
@ -191,57 +189,6 @@ inline static void mapIPv4ToIPv6(UInt32 addr, uint8_t * buf)
memcpy(&buf[12], &addr, 4);
}
static bool matchIPv4Subnet(UInt32 target, UInt32 addr, UInt8 prefix)
{
UInt32 mask = (prefix >= 32) ? 0xffffffffu : ~(0xffffffffu >> prefix);
return (target & mask) == addr;
}
#if defined(__SSE2__)
#include <emmintrin.h>
static bool matchIPv6Subnet(const uint8_t * target, const uint8_t * addr, UInt8 prefix)
{
uint16_t mask = _mm_movemask_epi8(_mm_cmpeq_epi8(
_mm_loadu_si128(reinterpret_cast<const __m128i *>(target)),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(addr))));
mask = ~mask;
if (mask)
{
auto offset = __builtin_ctz(mask);
if (prefix / 8 != offset)
return prefix / 8 < offset;
auto cmpmask = ~(0xff >> (prefix % 8));
return (target[offset] & cmpmask) == addr[offset];
}
return true;
}
# else
static bool matchIPv6Subnet(const uint8_t * target, const uint8_t * addr, UInt8 prefix)
{
if (prefix > IPV6_BINARY_LENGTH * 8U)
prefix = IPV6_BINARY_LENGTH * 8U;
size_t i = 0;
for (; prefix >= 8; ++i, prefix -= 8)
{
if (target[i] != addr[i])
return false;
}
if (prefix == 0)
return true;
auto mask = ~(0xff >> prefix);
return (target[i] & mask) == addr[i];
}
#endif // __SSE2__
IPAddressDictionary::IPAddressDictionary(
const StorageID & dict_id_,
const DictionaryStructure & dict_struct_,

View File

@ -271,7 +271,7 @@ std::vector<IPolygonDictionary::Point> IPolygonDictionary::extractPoints(const C
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PolygonDictionary input point component must not be NaN");
if (isinf(x) || isinf(y))
if (std::isinf(x) || std::isinf(y))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PolygonDictionary input point component must not be infinite");

View File

@ -1645,7 +1645,7 @@ private:
static inline void applyCIDRMask(const UInt8 * __restrict src, UInt8 * __restrict dst_lower, UInt8 * __restrict dst_upper, UInt8 bits_to_keep)
{
__m128i mask = _mm_loadu_si128(reinterpret_cast<const __m128i *>(getCIDRMaskIPv6(bits_to_keep)));
__m128i mask = _mm_loadu_si128(reinterpret_cast<const __m128i *>(getCIDRMaskIPv6(bits_to_keep).data()));
__m128i lower = _mm_and_si128(_mm_loadu_si128(reinterpret_cast<const __m128i *>(src)), mask);
_mm_storeu_si128(reinterpret_cast<__m128i *>(dst_lower), lower);
@ -1659,7 +1659,7 @@ private:
/// NOTE IPv6 is stored in memory in big endian format that makes some difficulties.
static void applyCIDRMask(const UInt8 * __restrict src, UInt8 * __restrict dst_lower, UInt8 * __restrict dst_upper, UInt8 bits_to_keep)
{
const auto * mask = getCIDRMaskIPv6(bits_to_keep);
const auto & mask = getCIDRMaskIPv6(bits_to_keep);
for (size_t i = 0; i < 16; ++i)
{

View File

@ -22,6 +22,11 @@ namespace ErrorCodes
}
/** Generates array
* range(size): [0, size)
* range(start, end): [start, end)
* range(start, end, step): [start, end) with step increments.
*/
class FunctionRange : public IFunction
{
public:
@ -40,9 +45,9 @@ private:
{
if (arguments.size() > 3 || arguments.empty())
{
throw Exception{"Function " + getName() + " needs 1..3 arguments; passed "
+ std::to_string(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} needs 1..3 arguments; passed {}.",
getName(), arguments.size());
}
for (const auto & arg : arguments)
@ -339,6 +344,18 @@ private:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
DataTypePtr elem_type = checkAndGetDataType<DataTypeArray>(result_type.get())->getNestedType();
WhichDataType which(elem_type);
if (!which.isUInt8()
&& !which.isUInt16()
&& !which.isUInt32()
&& !which.isUInt64())
{
throw Exception{"Illegal columns of arguments of function " + getName()
+ ", the function only implemented for unsigned integers up to 64 bit", ErrorCodes::ILLEGAL_COLUMN};
}
ColumnPtr res;
if (arguments.size() == 1)
{
@ -356,22 +373,24 @@ private:
Columns columns_holder(3);
ColumnRawPtrs column_ptrs(3);
const auto return_type = checkAndGetDataType<DataTypeArray>(result_type.get())->getNestedType();
for (size_t i = 0; i < arguments.size(); ++i)
{
if (i == 1)
columns_holder[i] = castColumn(arguments[i], return_type)->convertToFullColumnIfConst();
columns_holder[i] = castColumn(arguments[i], elem_type)->convertToFullColumnIfConst();
else
columns_holder[i] = castColumn(arguments[i], return_type);
columns_holder[i] = castColumn(arguments[i], elem_type);
column_ptrs[i] = columns_holder[i].get();
}
// for step column, defaults to 1
/// Step is one by default.
if (arguments.size() == 2)
{
columns_holder[2] = return_type->createColumnConst(input_rows_count, 1);
/// Convert a column with constant 1 to the result type.
columns_holder[2] = castColumn(
{DataTypeUInt8().createColumnConst(input_rows_count, 1), std::make_shared<DataTypeUInt8>(), {}},
elem_type);
column_ptrs[2] = columns_holder[2].get();
}
@ -385,7 +404,9 @@ private:
if ((res = executeConstStartStep<UInt8>(column_ptrs[1], start, step, input_rows_count)) ||
(res = executeConstStartStep<UInt16>(column_ptrs[1], start, step, input_rows_count)) ||
(res = executeConstStartStep<UInt32>(column_ptrs[1], start, step, input_rows_count)) ||
(res = executeConstStartStep<UInt64>(column_ptrs[1], start, step, input_rows_count))) {}
(res = executeConstStartStep<UInt64>(column_ptrs[1], start, step, input_rows_count)))
{
}
}
else if (is_start_const && !is_step_const)
{
@ -394,7 +415,9 @@ private:
if ((res = executeConstStart<UInt8>(column_ptrs[1], column_ptrs[2], start, input_rows_count)) ||
(res = executeConstStart<UInt16>(column_ptrs[1], column_ptrs[2], start, input_rows_count)) ||
(res = executeConstStart<UInt32>(column_ptrs[1], column_ptrs[2], start, input_rows_count)) ||
(res = executeConstStart<UInt64>(column_ptrs[1], column_ptrs[2], start, input_rows_count))) {}
(res = executeConstStart<UInt64>(column_ptrs[1], column_ptrs[2], start, input_rows_count)))
{
}
}
else if (!is_start_const && is_step_const)
{
@ -403,14 +426,18 @@ private:
if ((res = executeConstStep<UInt8>(column_ptrs[0], column_ptrs[1], step, input_rows_count)) ||
(res = executeConstStep<UInt16>(column_ptrs[0], column_ptrs[1], step, input_rows_count)) ||
(res = executeConstStep<UInt32>(column_ptrs[0], column_ptrs[1], step, input_rows_count)) ||
(res = executeConstStep<UInt64>(column_ptrs[0], column_ptrs[1], step, input_rows_count))) {}
(res = executeConstStep<UInt64>(column_ptrs[0], column_ptrs[1], step, input_rows_count)))
{
}
}
else
{
if ((res = executeGeneric<UInt8>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count)) ||
(res = executeGeneric<UInt16>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count)) ||
(res = executeGeneric<UInt32>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count)) ||
(res = executeGeneric<UInt64>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count))) {}
(res = executeGeneric<UInt64>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count)))
{
}
}
if (!res)

View File

@ -7,15 +7,15 @@
namespace DB
{
/// Get the connection ID. It's used for MySQL handler only.
class FunctionConnectionID : public IFunction
/// Get the connection Id. It's used for MySQL handler only.
class FunctionConnectionId : public IFunction
{
public:
static constexpr auto name = "connectionID";
static constexpr auto name = "connectionId";
explicit FunctionConnectionID(const Context & context_) : context(context_) {}
explicit FunctionConnectionId(const Context & context_) : context(context_) {}
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionConnectionID>(context); }
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionConnectionId>(context); }
String getName() const override { return name; }
@ -32,9 +32,9 @@ private:
const Context & context;
};
void registerFunctionConnectionID(FunctionFactory & factory)
void registerFunctionConnectionId(FunctionFactory & factory)
{
factory.registerFunction<FunctionConnectionID>(FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionConnectionId>(FunctionFactory::CaseInsensitive);
factory.registerAlias("connection_id", "connectionID", FunctionFactory::CaseInsensitive);
}

View File

@ -76,7 +76,7 @@ struct ColumnToPointsConverter
if (isNaN(first) || isNaN(second))
throw Exception("Point's component must not be NaN", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (isinf(first) || isinf(second))
if (std::isinf(first) || std::isinf(second))
throw Exception("Point's component must not be infinite", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
answer[i] = Point(first, second);

View File

@ -0,0 +1,259 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/IPv6ToBinary.h>
#include <Common/formatIPv6.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunctionImpl.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <variant>
#include <charconv>
#include <common/logger_useful.h>
namespace DB::ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
class IPAddressVariant
{
public:
explicit IPAddressVariant(const StringRef & address_str)
{
/// IP address parser functions require that the input is
/// NULL-terminated so we need to copy it.
const auto address_str_copy = std::string(address_str);
UInt32 v4;
if (DB::parseIPv4(address_str_copy.c_str(), reinterpret_cast<unsigned char *>(&v4)))
{
addr = v4;
}
else
{
addr = IPv6AddrType();
bool success = DB::parseIPv6(address_str_copy.c_str(), std::get<IPv6AddrType>(addr).data());
if (!success)
throw DB::Exception("Neither IPv4 nor IPv6 address: '" + address_str_copy + "'",
DB::ErrorCodes::CANNOT_PARSE_TEXT);
}
}
UInt32 asV4() const
{
if (const auto * val = std::get_if<IPv4AddrType>(&addr))
return *val;
return 0;
}
const uint8_t * asV6() const
{
if (const auto * val = std::get_if<IPv6AddrType>(&addr))
return val->data();
return nullptr;
}
private:
using IPv4AddrType = UInt32;
using IPv6AddrType = std::array<uint8_t, IPV6_BINARY_LENGTH>;
std::variant<IPv4AddrType, IPv6AddrType> addr;
};
struct IPAddressCIDR
{
IPAddressVariant address;
UInt8 prefix;
};
IPAddressCIDR parseIPWithCIDR(const StringRef cidr_str)
{
std::string_view cidr_str_view(cidr_str);
size_t pos_slash = cidr_str_view.find('/');
if (pos_slash == 0)
throw DB::Exception("Error parsing IP address with prefix: " + std::string(cidr_str), DB::ErrorCodes::CANNOT_PARSE_TEXT);
if (pos_slash == std::string_view::npos)
throw DB::Exception("The text does not contain '/': " + std::string(cidr_str), DB::ErrorCodes::CANNOT_PARSE_TEXT);
std::string_view addr_str = cidr_str_view.substr(0, pos_slash);
IPAddressVariant addr(StringRef{addr_str.data(), addr_str.size()});
uint8_t prefix = 0;
auto prefix_str = cidr_str_view.substr(pos_slash+1);
const auto * prefix_str_end = prefix_str.data() + prefix_str.size();
auto [parse_end, parse_error] = std::from_chars(prefix_str.data(), prefix_str_end, prefix);
uint8_t max_prefix = (addr.asV6() ? IPV6_BINARY_LENGTH : IPV4_BINARY_LENGTH) * 8;
bool has_error = parse_error != std::errc() || parse_end != prefix_str_end || prefix > max_prefix;
if (has_error)
throw DB::Exception("The CIDR has a malformed prefix bits: " + std::string(cidr_str), DB::ErrorCodes::CANNOT_PARSE_TEXT);
return {addr, static_cast<UInt8>(prefix)};
}
inline bool isAddressInRange(const IPAddressVariant & address, const IPAddressCIDR & cidr)
{
if (const auto * cidr_v6 = cidr.address.asV6())
{
if (const auto * addr_v6 = address.asV6())
return DB::matchIPv6Subnet(addr_v6, cidr_v6, cidr.prefix);
}
else
{
if (!address.asV6())
return DB::matchIPv4Subnet(address.asV4(), cidr.address.asV4(), cidr.prefix);
}
return false;
}
}
namespace DB
{
class FunctionIsIPAddressContainedIn : public IFunction
{
public:
static constexpr auto name = "isIPAddressInRange";
String getName() const override { return name; }
static FunctionPtr create(const Context &) { return std::make_shared<FunctionIsIPAddressContainedIn>(); }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & /* return_type */, size_t input_rows_count) const override
{
const IColumn * col_addr = arguments[0].column.get();
const IColumn * col_cidr = arguments[1].column.get();
if (const auto * col_addr_const = checkAndGetAnyColumnConst(col_addr))
{
if (const auto * col_cidr_const = checkAndGetAnyColumnConst(col_cidr))
return executeImpl(*col_addr_const, *col_cidr_const, input_rows_count);
else
return executeImpl(*col_addr_const, *col_cidr, input_rows_count);
}
else
{
if (const auto * col_cidr_const = checkAndGetAnyColumnConst(col_cidr))
return executeImpl(*col_addr, *col_cidr_const, input_rows_count);
else
return executeImpl(*col_addr, *col_cidr, input_rows_count);
}
}
virtual DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.size() != 2)
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size()) + ", should be 2",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypePtr & addr_type = arguments[0];
const DataTypePtr & prefix_type = arguments[1];
if (!isString(addr_type) || !isString(prefix_type))
throw Exception("The arguments of function " + getName() + " must be String",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt8>();
}
virtual size_t getNumberOfArguments() const override { return 2; }
bool useDefaultImplementationForNulls() const override { return false; }
private:
/// Like checkAndGetColumnConst() but this function doesn't
/// care about the type of data column.
static const ColumnConst * checkAndGetAnyColumnConst(const IColumn * column)
{
if (!column || !isColumnConst(*column))
return nullptr;
return assert_cast<const ColumnConst *>(column);
}
/// Both columns are constant.
static ColumnPtr executeImpl(
const ColumnConst & col_addr_const,
const ColumnConst & col_cidr_const,
size_t input_rows_count)
{
const auto & col_addr = col_addr_const.getDataColumn();
const auto & col_cidr = col_cidr_const.getDataColumn();
const auto addr = IPAddressVariant(col_addr.getDataAt(0));
const auto cidr = parseIPWithCIDR(col_cidr.getDataAt(0));
ColumnUInt8::MutablePtr col_res = ColumnUInt8::create(1);
ColumnUInt8::Container & vec_res = col_res->getData();
vec_res[0] = isAddressInRange(addr, cidr) ? 1 : 0;
return ColumnConst::create(std::move(col_res), input_rows_count);
}
/// Address is constant.
static ColumnPtr executeImpl(const ColumnConst & col_addr_const, const IColumn & col_cidr, size_t input_rows_count)
{
const auto & col_addr = col_addr_const.getDataColumn();
const auto addr = IPAddressVariant(col_addr.getDataAt(0));
ColumnUInt8::MutablePtr col_res = ColumnUInt8::create(input_rows_count);
ColumnUInt8::Container & vec_res = col_res->getData();
for (size_t i = 0; i < input_rows_count; i++)
{
const auto cidr = parseIPWithCIDR(col_cidr.getDataAt(i));
vec_res[i] = isAddressInRange(addr, cidr) ? 1 : 0;
}
return col_res;
}
/// CIDR is constant.
static ColumnPtr executeImpl(const IColumn & col_addr, const ColumnConst & col_cidr_const, size_t input_rows_count)
{
const auto & col_cidr = col_cidr_const.getDataColumn();
const auto cidr = parseIPWithCIDR(col_cidr.getDataAt(0));
ColumnUInt8::MutablePtr col_res = ColumnUInt8::create(input_rows_count);
ColumnUInt8::Container & vec_res = col_res->getData();
for (size_t i = 0; i < input_rows_count; i++)
{
const auto addr = IPAddressVariant(col_addr.getDataAt(i));
vec_res[i] = isAddressInRange(addr, cidr) ? 1 : 0;
}
return col_res;
}
/// Neither are constant.
static ColumnPtr executeImpl(const IColumn & col_addr, const IColumn & col_cidr, size_t input_rows_count)
{
ColumnUInt8::MutablePtr col_res = ColumnUInt8::create(input_rows_count);
ColumnUInt8::Container & vec_res = col_res->getData();
for (size_t i = 0; i < input_rows_count; i++)
{
const auto addr = IPAddressVariant(col_addr.getDataAt(i));
const auto cidr = parseIPWithCIDR(col_cidr.getDataAt(i));
vec_res[i] = isAddressInRange(addr, cidr) ? 1 : 0;
}
return col_res;
}
};
void registerFunctionIsIPAddressContainedIn(FunctionFactory & factory)
{
factory.registerFunction<FunctionIsIPAddressContainedIn>();
}
}
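For readers skimming the dispatch above, a standalone sketch of the `std::variant` storage pattern used by `IPAddressVariant` (hypothetical `AddrSketch` name, not the class itself): IPv4 is kept as a host-order 32-bit value, IPv6 as 16 raw bytes, and `asV6()` returning `nullptr` is what lets `isAddressInRange` detect a version mismatch.

``` cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <variant>

// Minimal sketch of the variant-based address storage used by IPAddressVariant.
struct AddrSketch
{
    using V4 = uint32_t;
    using V6 = std::array<uint8_t, 16>;

    std::variant<V4, V6> addr;

    uint32_t asV4() const
    {
        if (const auto * val = std::get_if<V4>(&addr))
            return *val;
        return 0;
    }

    const uint8_t * asV6() const
    {
        if (const auto * val = std::get_if<V6>(&addr))
            return val->data();
        return nullptr;   // not an IPv6 address
    }
};

int main()
{
    AddrSketch a{uint32_t{0x7f000001u}};   // 127.0.0.1 as a host-order value
    assert(a.asV4() == 0x7f000001u);
    assert(a.asV6() == nullptr);           // an IPv4/IPv6 mismatch shows up as nullptr
    return 0;
}
```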

View File

@ -0,0 +1,70 @@
#include <memory>
#include <Columns/ColumnString.h>
#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunctionImpl.h>
#include <Storages/MergeTree/MergeTreePartition.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/** partitionId(x, y, ...) is a function that computes partition ids of arguments.
* The function is slow and should not be called for a large number of rows.
*/
class FunctionPartitionId : public IFunction
{
public:
static constexpr auto name = "partitionId";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionPartitionId>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool isInjective(const ColumnsWithTypeAndName & /*sample_columns*/) const override { return true; }
bool useDefaultImplementationForNulls() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.empty())
throw Exception("Function " + getName() + " requires at least one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<DataTypeString>();
}
virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
Block sample_block(arguments);
size_t size = arguments.size();
auto result_column = ColumnString::create();
for (size_t j = 0; j < input_rows_count; ++j)
{
Row row(size);
for (size_t i = 0; i < size; ++i)
arguments[i].column->get(j, row[i]);
MergeTreePartition partition(std::move(row));
result_column->insert(partition.getID(sample_block));
}
return result_column;
}
};
void registerFunctionPartitionId(FunctionFactory & factory)
{
factory.registerFunction<FunctionPartitionId>();
}
}

View File

@ -70,7 +70,9 @@ void registerFunctionErrorCodeToName(FunctionFactory &);
void registerFunctionTcpPort(FunctionFactory &);
void registerFunctionByteSize(FunctionFactory &);
void registerFunctionFile(FunctionFactory & factory);
void registerFunctionConnectionID(FunctionFactory & factory);
void registerFunctionConnectionId(FunctionFactory & factory);
void registerFunctionPartitionId(FunctionFactory & factory);
void registerFunctionIsIPAddressContainedIn(FunctionFactory &);
#if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &);
@ -141,7 +143,9 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionTcpPort(factory);
registerFunctionByteSize(factory);
registerFunctionFile(factory);
registerFunctionConnectionID(factory);
registerFunctionConnectionId(factory);
registerFunctionPartitionId(factory);
registerFunctionIsIPAddressContainedIn(factory);
#if USE_ICU
registerFunctionConvertCharset(factory);

View File

@ -210,7 +210,7 @@ SRCS(
cbrt.cpp
coalesce.cpp
concat.cpp
connectionID.cpp
connectionId.cpp
convertCharset.cpp
cos.cpp
cosh.cpp
@ -309,6 +309,7 @@ SRCS(
isConstant.cpp
isDecimalOverflow.cpp
isFinite.cpp
isIPAddressContainedIn.cpp
isInfinite.cpp
isNaN.cpp
isNotNull.cpp
@ -373,6 +374,7 @@ SRCS(
now.cpp
now64.cpp
nullIf.cpp
partitionId.cpp
pi.cpp
plus.cpp
pointInEllipses.cpp

View File

@ -37,13 +37,6 @@ bool ReadBufferFromPocoSocket::nextImpl()
while (async_callback && !socket.poll(0, Poco::Net::Socket::SELECT_READ))
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), socket_description);
/// receiveBytes in SecureStreamSocket throws TimeoutException after max(receive_timeout, send_timeout),
/// but we want to get this exception exactly after receive_timeout. So, set send_timeout = receive_timeout
/// before receiveBytes.
std::unique_ptr<TimeoutSetter> timeout_setter = nullptr;
if (socket.secure())
timeout_setter = std::make_unique<TimeoutSetter>(dynamic_cast<Poco::Net::StreamSocket &>(socket), socket.getReceiveTimeout(), socket.getReceiveTimeout());
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size());
}
catch (const Poco::Net::NetException & e)

View File

@ -41,13 +41,6 @@ void WriteBufferFromPocoSocket::nextImpl()
/// Add more details to exceptions.
try
{
/// sendBytes in SecureStreamSocket throws TimeoutException after max(receive_timeout, send_timeout),
/// but we want to get this exception exactly after send_timeout. So, set receive_timeout = send_timeout
/// before sendBytes.
std::unique_ptr<TimeoutSetter> timeout_setter = nullptr;
if (socket.secure())
timeout_setter = std::make_unique<TimeoutSetter>(dynamic_cast<Poco::Net::StreamSocket &>(socket), socket.getSendTimeout(), socket.getSendTimeout());
res = socket.impl()->sendBytes(working_buffer.begin() + bytes_written, offset() - bytes_written);
}
catch (const Poco::Net::NetException & e)

View File

@ -201,7 +201,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
void ExecuteScalarSubqueriesMatcher::visit(const ASTFunction & func, ASTPtr & ast, Data & data)
{
/// Don't descend into subqueries in arguments of IN operator.
/// But if an argument is not subquery, than deeper may be scalar subqueries and we need to descend in them.
/// But if an argument is not subquery, then deeper may be scalar subqueries and we need to descend in them.
std::vector<ASTPtr *> out;
if (checkFunctionIsInOrGlobalInOperator(func))

View File

@ -299,7 +299,7 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global)
}
void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name)
void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name, const SelectQueryOptions & query_options)
{
auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
@ -335,7 +335,7 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
prepared_sets[set_key] = std::move(set);
}
SetPtr SelectQueryExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name)
SetPtr ExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name)
{
const auto * table = subquery_or_table_name->as<ASTIdentifier>();
if (!table)
@ -381,7 +381,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
{
if (settings.use_index_for_in_with_subqueries)
tryMakeSetForIndexFromSubquery(arg);
tryMakeSetForIndexFromSubquery(arg, query_options);
}
else
{
@ -1334,9 +1334,9 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool proje
}
ExpressionActionsPtr ExpressionAnalyzer::getConstActions()
ExpressionActionsPtr ExpressionAnalyzer::getConstActions(const ColumnsWithTypeAndName & constant_inputs)
{
auto actions = std::make_shared<ActionsDAG>(NamesAndTypesList());
auto actions = std::make_shared<ActionsDAG>(constant_inputs);
getRootActions(query, true, actions, true);
return std::make_shared<ExpressionActions>(actions, ExpressionActionsSettings::fromContext(context));

View File

@ -111,7 +111,7 @@ public:
/// Actions that can be performed on an empty block: adding constants and applying functions that depend only on constants.
/// Does not execute subqueries.
ExpressionActionsPtr getConstActions();
ExpressionActionsPtr getConstActions(const ColumnsWithTypeAndName & constant_inputs = {});
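A hedged sketch of the extended signature in use: callers may now feed constant columns (for instance, a virtual column materialized as a constant) into the constant actions; `analyzer` stands for any already constructed ExpressionAnalyzer and the column name is an assumption:

auto type = std::make_shared<DataTypeString>();
ColumnsWithTypeAndName constant_inputs{{type->createColumnConst(1, Field(String("all"))), type, "_partition_id"}};
ExpressionActionsPtr const_actions = analyzer.getConstActions(constant_inputs);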
/** Sets that require a subquery to be created.
* Only the sets needed to perform actions returned from already executed `append*` or `getActions`.
@ -128,6 +128,19 @@ public:
void makeWindowDescriptions(ActionsDAGPtr actions);
/**
* Create Set from a subquery or a table expression in the query. The created set is suitable for using the index.
* The set will not be created if its size hits the limit.
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name, const SelectQueryOptions & query_options = {});
/**
* Checks if subquery is not a plain StorageSet.
* Because while making set we will read data from StorageSet which is not allowed.
* Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise.
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name);
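With these helpers public, code outside the analyzer can build index sets for IN subqueries, much like the buildSets helper added to VirtualColumnUtils below. A minimal sketch, assuming `expression_ast`, `columns` (a NamesAndTypesList) and `context` exist at the call site:

auto syntax_result = TreeRewriter(context).analyze(expression_ast, columns);
ExpressionAnalyzer analyzer(expression_ast, syntax_result, context);
const auto * func = expression_ast->as<ASTFunction>();
if (func && functionIsInOrGlobalInOperator(func->name))
    analyzer.tryMakeSetForIndexFromSubquery(func->arguments->children.at(1));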
protected:
ExpressionAnalyzer(
const ASTPtr & query_,
@ -299,19 +312,6 @@ private:
NameSet required_result_columns;
SelectQueryOptions query_options;
/**
* Create Set from a subquery or a table expression in the query. The created set is suitable for using the index.
* The set will not be created if its size hits the limit.
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name);
/**
* Checks if subquery is not a plain StorageSet.
* Because while making set we will read data from StorageSet which is not allowed.
* Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise.
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name);
/// Create Set-s that we make from IN section to use index on them.
void makeSetsForIndex(const ASTPtr & node);

View File

@ -7,6 +7,7 @@
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/MySQL/ASTAlterCommand.h>
#include <Parsers/MySQL/ASTDeclareColumn.h>
@ -411,13 +412,26 @@ ASTs InterpreterCreateImpl::getRewrittenQueries(
return column_declaration;
};
/// Add _sign and _version column.
/// Add _sign and _version columns.
String sign_column_name = getUniqueColumnName(columns_name_and_type, "_sign");
String version_column_name = getUniqueColumnName(columns_name_and_type, "_version");
columns->set(columns->columns, InterpreterCreateQuery::formatColumns(columns_name_and_type));
columns->columns->children.emplace_back(create_materialized_column_declaration(sign_column_name, "Int8", UInt64(1)));
columns->columns->children.emplace_back(create_materialized_column_declaration(version_column_name, "UInt64", UInt64(1)));
/// Add minmax skipping index for _version column.
auto version_index = std::make_shared<ASTIndexDeclaration>();
version_index->name = version_column_name;
auto index_expr = std::make_shared<ASTIdentifier>(version_column_name);
auto index_type = makeASTFunction("minmax");
index_type->no_empty_args = true;
version_index->set(version_index->expr, index_expr);
version_index->set(version_index->type, index_type);
version_index->granularity = 1;
ASTPtr indices = std::make_shared<ASTExpressionList>();
indices->children.push_back(version_index);
columns->set(columns->indices, indices);
auto storage = std::make_shared<ASTStorage>();
/// The `partition by` expression must use primary keys, otherwise the primary keys will not be merged.

View File

@ -28,6 +28,10 @@ static inline ASTPtr tryRewrittenCreateQuery(const String & query, const Context
context, "test_database", "test_database")[0];
}
static const char MATERIALIZEMYSQL_TABLE_COLUMNS[] = ", `_sign` Int8() MATERIALIZED 1"
", `_version` UInt64() MATERIALIZED 1"
", INDEX _version _version TYPE minmax GRANULARITY 1";
TEST(MySQLCreateRewritten, ColumnsDataType)
{
tryRegisterFunctions();
@ -45,46 +49,46 @@ TEST(MySQLCreateRewritten, ColumnsDataType)
{
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + ")", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(" + mapped_type + ")"
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` " + mapped_type +
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " COMMENT 'test_comment' NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` " + mapped_type +
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
if (Poco::toUpper(test_type).find("INT") != std::string::npos)
{
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " UNSIGNED)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")"
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " COMMENT 'test_comment' UNSIGNED)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")"
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " NOT NULL UNSIGNED)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` U" + mapped_type +
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " COMMENT 'test_comment' UNSIGNED NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` U" + mapped_type +
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
}
}
@ -109,13 +113,15 @@ TEST(MySQLCreateRewritten, PartitionPolicy)
{
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " PRIMARY KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " NOT NULL PRIMARY KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
}
}
@ -138,23 +144,27 @@ TEST(MySQLCreateRewritten, OrderbyPolicy)
{
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " PRIMARY KEY, `key2` " + test_type + " UNIQUE KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` Nullable(" + mapped_type + "), `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, assumeNotNull(key2))");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` Nullable(" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, assumeNotNull(key2))");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " NOT NULL PRIMARY KEY, `key2` " + test_type + " NOT NULL UNIQUE KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " KEY UNIQUE KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + ", `key2` " + test_type + " UNIQUE KEY, PRIMARY KEY(`key`, `key2`))", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
}
}
@ -165,23 +175,27 @@ TEST(MySQLCreateRewritten, RewrittenQueryWithPrimaryKey)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL PRIMARY KEY) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version) "
"PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL, PRIMARY KEY (`key`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version) "
"PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key_1` int NOT NULL, key_2 INT NOT NULL, PRIMARY KEY (`key_1`, `key_2`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key_1` Int32, `key_2` Int32, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key_1, 4294967) ORDER BY (key_1, key_2)");
"CREATE TABLE test_database.test_table_1 (`key_1` Int32, `key_2` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key_1, 4294967) ORDER BY (key_1, key_2)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key_1` BIGINT NOT NULL, key_2 INT NOT NULL, PRIMARY KEY (`key_1`, `key_2`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key_1` Int64, `key_2` Int32, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key_2, 4294967) ORDER BY (key_1, key_2)");
"CREATE TABLE test_database.test_table_1 (`key_1` Int64, `key_2` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key_2, 4294967) ORDER BY (key_1, key_2)");
}
TEST(MySQLCreateRewritten, RewrittenQueryWithPrefixKey)
@ -191,7 +205,8 @@ TEST(MySQLCreateRewritten, RewrittenQueryWithPrefixKey)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL PRIMARY KEY, `prefix_key` varchar(200) NOT NULL, KEY prefix_key_index(prefix_key(2))) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `prefix_key` String, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"CREATE TABLE test_database.test_table_1 (`key` Int32, `prefix_key` String" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY (key, prefix_key)");
}
@ -204,6 +219,7 @@ TEST(MySQLCreateRewritten, UniqueKeysConvert)
"CREATE TABLE `test_database`.`test_table_1` (code varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,name varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,"
" id bigint NOT NULL AUTO_INCREMENT, tenant_id bigint NOT NULL, PRIMARY KEY (id), UNIQUE KEY code_id (code, tenant_id), UNIQUE KEY name_id (name, tenant_id))"
" ENGINE=InnoDB AUTO_INCREMENT=100 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`code` String, `name` String, `id` Int64, `tenant_id` Int64, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1)"
" ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(id, 18446744073709551) ORDER BY (code, name, tenant_id, id)");
"CREATE TABLE test_database.test_table_1 (`code` String, `name` String, `id` Int64, `tenant_id` Int64" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(id, 18446744073709551) ORDER BY (code, name, tenant_id, id)");
}

View File

@ -455,7 +455,10 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
}
if (get_settings && query_context)
{
res.query_settings = std::make_shared<Settings>(query_context->getSettings());
res.current_database = query_context->getCurrentDatabase();
}
return res;
}

View File

@ -68,6 +68,7 @@ struct QueryStatusInfo
std::vector<UInt64> thread_ids;
std::shared_ptr<ProfileEvents::Counters> profile_counters;
std::shared_ptr<Settings> query_settings;
std::string current_database;
};
/// Query and information about its execution.

View File

@ -72,6 +72,12 @@ void QueryNormalizer::visit(ASTIdentifier & node, ASTPtr & ast, Data & data)
if (!IdentifierSemantic::getColumnName(node))
return;
if (data.settings.prefer_column_name_to_alias)
{
if (data.source_columns_set.find(node.name()) != data.source_columns_set.end())
return;
}
/// If it is an alias, but not a parent alias (for constructs like "SELECT column + 1 AS column").
auto it_alias = data.aliases.find(node.name());
if (it_alias != data.aliases.end() && current_alias != node.name())
@ -131,8 +137,20 @@ static bool needVisitChild(const ASTPtr & child)
void QueryNormalizer::visit(ASTSelectQuery & select, const ASTPtr &, Data & data)
{
for (auto & child : select.children)
if (needVisitChild(child))
{
if (child == select.groupBy() || child == select.orderBy() || child == select.having())
{
bool old_setting = data.settings.prefer_column_name_to_alias;
data.settings.prefer_column_name_to_alias = false;
visit(child, data);
data.settings.prefer_column_name_to_alias = old_setting;
}
else
{
if (needVisitChild(child))
visit(child, data);
}
}
/// If the WHERE clause or HAVING consists of a single alias, the reference must be replaced not only in children,
/// but also in where_expression and having_expression.

View File

@ -4,6 +4,7 @@
#include <Parsers/IAST.h>
#include <Interpreters/Aliases.h>
#include <Core/Names.h>
namespace DB
{
@ -21,12 +22,15 @@ class QueryNormalizer
{
const UInt64 max_ast_depth;
const UInt64 max_expanded_ast_elements;
bool prefer_column_name_to_alias;
template <typename T>
ExtractedSettings(const T & settings)
: max_ast_depth(settings.max_ast_depth),
max_expanded_ast_elements(settings.max_expanded_ast_elements)
{}
: max_ast_depth(settings.max_ast_depth)
, max_expanded_ast_elements(settings.max_expanded_ast_elements)
, prefer_column_name_to_alias(settings.prefer_column_name_to_alias)
{
}
};
public:
@ -36,7 +40,8 @@ public:
using MapOfASTs = std::map<ASTPtr, ASTPtr>;
const Aliases & aliases;
const ExtractedSettings settings;
const NameSet & source_columns_set;
ExtractedSettings settings;
/// tmp data
size_t level;
@ -44,8 +49,9 @@ public:
SetOfASTs current_asts; /// vertices in the current call stack of this method
std::string current_alias; /// the alias referencing to the ancestor of ast (the deepest ancestor with aliases)
Data(const Aliases & aliases_, ExtractedSettings && settings_)
Data(const Aliases & aliases_, const NameSet & source_columns_set_, ExtractedSettings && settings_)
: aliases(aliases_)
, source_columns_set(source_columns_set_)
, settings(settings_)
, level(0)
{}

View File

@ -662,7 +662,10 @@ void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
const auto & partition_desc = metadata_snapshot->getPartitionKey();
if (partition_desc.expression)
{
const auto & partition_source_columns = partition_desc.expression->getRequiredColumns();
auto partition_source_columns = partition_desc.expression->getRequiredColumns();
partition_source_columns.push_back("_part");
partition_source_columns.push_back("_partition_id");
partition_source_columns.push_back("_part_uuid");
optimize_trivial_count = true;
for (const auto & required_column : required)
{
@ -813,7 +816,14 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
/// Optimizes logical expressions.
LogicalExpressionsOptimizer(select_query, settings.optimize_min_equality_disjunction_chain_length.value).perform();
normalize(query, result.aliases, settings);
NameSet all_source_columns_set = source_columns_set;
if (table_join)
{
for (const auto & [name, _] : table_join->columns_from_joined_table)
all_source_columns_set.insert(name);
}
normalize(query, result.aliases, all_source_columns_set, settings);
/// Remove unneeded columns according to 'required_result_columns'.
/// Leave all selected columns in case of DISTINCT; columns that contain arrayJoin function inside.
@ -871,7 +881,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
TreeRewriterResult result(source_columns, storage, metadata_snapshot, false);
normalize(query, result.aliases, settings);
normalize(query, result.aliases, result.source_columns_set, settings);
/// Executing scalar subqueries. Column defaults could be a scalar subquery.
executeScalarSubqueries(query, context, 0, result.scalars, false);
@ -896,7 +906,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
return std::make_shared<const TreeRewriterResult>(result);
}
void TreeRewriter::normalize(ASTPtr & query, Aliases & aliases, const Settings & settings)
void TreeRewriter::normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, const Settings & settings)
{
CustomizeCountDistinctVisitor::Data data_count_distinct{settings.count_distinct_implementation};
CustomizeCountDistinctVisitor(data_count_distinct).visit(query);
@ -945,7 +955,7 @@ void TreeRewriter::normalize(ASTPtr & query, Aliases & aliases, const Settings &
FunctionNameNormalizer().visit(query.get());
/// Common subexpression elimination. Rewrite rules.
QueryNormalizer::Data normalizer_data(aliases, settings);
QueryNormalizer::Data normalizer_data(aliases, source_columns_set, settings);
QueryNormalizer(normalizer_data).visit(query);
}

View File

@ -119,7 +119,7 @@ public:
private:
const Context & context;
static void normalize(ASTPtr & query, Aliases & aliases, const Settings & settings);
static void normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, const Settings & settings);
};
}

View File

@ -20,6 +20,6 @@ TEST(QueryNormalizer, SimpleCycleAlias)
aliases["b"] = parseQuery(parser, "a as b", 0, 0)->children[0];
Settings settings;
QueryNormalizer::Data normalizer_data(aliases, settings);
QueryNormalizer::Data normalizer_data(aliases, {}, settings);
EXPECT_THROW(QueryNormalizer(normalizer_data).visit(ast), Exception);
}
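A hypothetical companion sketch for the new source_columns_set parameter, reusing the same `parser`/`ast` setup as the test above; the alias expression and column names are made up:

Aliases aliases;
aliases["a"] = parseQuery(parser, "x + 1 AS a", 0, 0)->children[0];
NameSet source_columns{"a"};
Settings settings;
settings.prefer_column_name_to_alias = true;
QueryNormalizer::Data data(aliases, source_columns, settings);
QueryNormalizer(data).visit(ast);
/// Identifiers named `a` that refer to a real source column keep the column meaning;
/// inside GROUP BY / ORDER BY / HAVING the alias is still preferred.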

View File

@ -42,6 +42,7 @@
#include <Storages/MergeTree/localBackup.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Increment.h>
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
@ -681,6 +682,41 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
}
std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
const SelectQueryInfo & query_info, const Context & context, const DataPartsVector & parts) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
ASTPtr expression_ast;
Block virtual_columns_block = MergeTreeDataSelectExecutor::getSampleBlockWithVirtualPartColumns();
// Generate valid expressions for filtering
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, context, virtual_columns_block, expression_ast);
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless() && !valid)
return {};
std::unordered_set<String> part_values;
if (valid && expression_ast)
{
MergeTreeDataSelectExecutor::fillBlockWithVirtualPartColumns(parts, virtual_columns_block);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
if (part_values.empty())
return 0;
}
// At this point, empty `part_values` means all parts.
size_t res = 0;
for (const auto & part : parts)
{
if ((part_values.empty() || part_values.find(part->name) != part_values.end()) && !partition_pruner.canBePruned(part))
res += part->rows_count;
}
return res;
}
String MergeTreeData::MergingParams::getModeName() const
{
switch (mode)

View File

@ -893,6 +893,9 @@ protected:
return {begin, end};
}
std::optional<UInt64> totalRowsByPartitionPredicateImpl(
const SelectQueryInfo & query_info, const Context & context, const DataPartsVector & parts) const;
static decltype(auto) getStateModifier(DataPartState state)
{
return [state] (const DataPartPtr & part) { part->setState(state); };

View File

@ -39,6 +39,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/VirtualColumnUtils.h>
#include <DataStreams/materializeBlock.h>
namespace ProfileEvents
{
@ -71,28 +72,30 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & d
}
/// Construct a block consisting only of possible values of virtual columns
static Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool with_uuid)
Block MergeTreeDataSelectExecutor::getSampleBlockWithVirtualPartColumns()
{
auto part_column = ColumnString::create();
auto part_uuid_column = ColumnUUID::create();
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_partition_id"),
ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared<DataTypeUUID>(), "_part_uuid")});
}
void MergeTreeDataSelectExecutor::fillBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, Block & block)
{
MutableColumns columns = block.mutateColumns();
auto & part_column = columns[0];
auto & partition_id_column = columns[1];
auto & part_uuid_column = columns[2];
for (const auto & part : parts)
{
part_column->insert(part->name);
if (with_uuid)
part_uuid_column->insert(part->uuid);
partition_id_column->insert(part->info.partition_id);
part_uuid_column->insert(part->uuid);
}
if (with_uuid)
{
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(std::move(part_uuid_column), std::make_shared<DataTypeUUID>(), "_part_uuid"),
});
}
return Block{ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part")};
block.setColumns(std::move(columns));
}
@ -176,8 +179,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
Names real_column_names;
size_t total_parts = parts.size();
bool part_column_queried = false;
bool part_uuid_column_queried = false;
bool sample_factor_column_queried = false;
Float64 used_sample_factor = 1;
@ -186,7 +187,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
if (name == "_part")
{
part_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_part_index")
@ -199,7 +199,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
}
else if (name == "_part_uuid")
{
part_uuid_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_sample_factor")
@ -219,12 +218,23 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
/// If `_part` or `_part_uuid` virtual columns are requested, we try to filter out data by them.
Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, part_uuid_column_queried);
if (part_column_queried || part_uuid_column_queried)
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);
std::unordered_set<String> part_values;
ASTPtr expression_ast;
auto virtual_columns_block = getSampleBlockWithVirtualPartColumns();
auto part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
// Generate valid expressions for filtering
VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, context, virtual_columns_block, expression_ast);
// If there is still something left, fill the virtual block and do the filtering.
if (expression_ast)
{
fillBlockWithVirtualPartColumns(parts, virtual_columns_block);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
if (part_values.empty())
return std::make_unique<QueryPlan>();
}
// At this point, empty `part_values` means all parts.
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID());
@ -373,7 +383,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
LOG_DEBUG(log, "Will use no data on this replica because parallel replicas processing has been requested"
" (the setting 'max_parallel_replicas') but the table does not support sampling and this replica is not the first.");
return {};
return std::make_unique<QueryPlan>();
}
bool use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && data.supportsSampling());
@ -1894,7 +1904,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
if (!part_values.empty() && part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())
@ -1945,7 +1955,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
if (!part_values.empty() && part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())

View File

@ -44,6 +44,12 @@ public:
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
/// Construct a sample block consisting only of possible virtual columns for part pruning.
static Block getSampleBlockWithVirtualPartColumns();
/// Fill in values of possible virtual columns for part pruning.
static void fillBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, Block & block);
private:
const MergeTreeData & data;

View File

@ -206,18 +206,18 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
/// Try to fetch part from S3 without copy and fallback to default copy
/// if it's not possible
moving_part.part->assertOnDisk();
String path_to_clone = data->getRelativeDataPath() + directory_to_move + '/';
String path_to_clone = data->getRelativeDataPath() + directory_to_move + "/";
String relative_path = part->relative_path;
if (disk->exists(path_to_clone + relative_path))
{
LOG_WARNING(log, "Path " + fullPath(disk, path_to_clone + relative_path) + " already exists. Will remove it and clone again.");
disk->removeRecursive(path_to_clone + relative_path + '/');
disk->removeRecursive(path_to_clone + relative_path + "/");
}
disk->createDirectories(path_to_clone);
bool is_fetched = data->tryToFetchIfShared(*part, disk, path_to_clone + "/" + part->name);
if (!is_fetched)
part->volume->getDisk()->copy(data->getRelativeDataPath() + relative_path, disk, path_to_clone);
part->volume->getDisk()->removeFileIfExists(path_to_clone + '/' + IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
part->volume->getDisk()->copy(data->getRelativeDataPath() + relative_path + "/", disk, path_to_clone);
part->volume->getDisk()->removeFileIfExists(path_to_clone + "/" + IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
}
else
{

View File

@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include "PostgreSQLConnectionPool.h"
#include <mutex>
namespace DB

View File

@ -219,8 +219,8 @@ Pipe StorageMerge::read(
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
*/
StorageListWithLocks selected_tables = getSelectedTables(
query_info.query, has_table_virtual_column, context.getCurrentQueryId(), context.getSettingsRef());
StorageListWithLocks selected_tables
= getSelectedTables(query_info, has_table_virtual_column, context.getCurrentQueryId(), context.getSettingsRef());
if (selected_tables.empty())
/// FIXME: do we support sampling in this case?
@ -409,8 +409,9 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
const ASTPtr & query, bool has_virtual_column, const String & query_id, const Settings & settings) const
const SelectQueryInfo & query_info, bool has_virtual_column, const String & query_id, const Settings & settings) const
{
const ASTPtr & query = query_info.query;
StorageListWithLocks selected_tables;
DatabaseTablesIteratorPtr iterator = getDatabaseIterator(global_context);
@ -438,7 +439,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
if (has_virtual_column)
{
Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared<DataTypeString>(), "_table")};
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, global_context);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, global_context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
/// Remove unused tables from the list

View File

@ -59,7 +59,7 @@ private:
StorageListWithLocks getSelectedTables(const String & query_id, const Settings & settings) const;
StorageMerge::StorageListWithLocks getSelectedTables(
const ASTPtr & query, bool has_virtual_column, const String & query_id, const Settings & settings) const;
const SelectQueryInfo & query_info, bool has_virtual_column, const String & query_id, const Settings & settings) const;
template <typename F>
StoragePtr getFirstTable(F && predicate) const;

View File

@ -212,18 +212,8 @@ std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
size_t res = 0;
auto lock = lockParts();
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
{
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
}
return res;
auto parts = getDataPartsVector({DataPartState::Committed});
return totalRowsByPartitionPredicateImpl(query_info, context, parts);
}
std::optional<UInt64> StorageMergeTree::totalBytes(const Settings &) const

View File

@ -1663,9 +1663,10 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
if (source_part->name != source_part_name)
{
throw Exception("Part " + source_part_name + " is covered by " + source_part->name
+ " but should be mutated to " + entry.new_part_name + ". This is a bug.",
ErrorCodes::LOGICAL_ERROR);
LOG_WARNING(log, "Part " + source_part_name + " is covered by " + source_part->name
+ " but should be mutated to " + entry.new_part_name + ". "
+ "Possibly the mutation of this part is not needed and will be skipped. This shouldn't happen often.");
return false;
}
/// TODO - some better heuristic?
@ -4115,17 +4116,9 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalRows(const Settings & set
std::optional<UInt64> StorageReplicatedMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
size_t res = 0;
foreachCommittedParts([&](auto & part)
{
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
}, context.getSettingsRef().select_sequential_consistency);
return res;
DataPartsVector parts;
foreachCommittedParts([&](auto & part) { parts.push_back(part); }, context.getSettingsRef().select_sequential_consistency);
return totalRowsByPartitionPredicateImpl(query_info, context, parts);
}
std::optional<UInt64> StorageReplicatedMergeTree::totalBytes(const Settings & settings) const

View File

@ -64,6 +64,8 @@ NamesAndTypesList StorageSystemProcesses::getNamesAndTypes()
{"ProfileEvents.Values", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
{"Settings.Names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"Settings.Values", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"current_database", std::make_shared<DataTypeString>()},
};
}
@ -149,6 +151,8 @@ void StorageSystemProcesses::fillData(MutableColumns & res_columns, const Contex
column_settings_values->insertDefault();
}
}
res_columns[i++]->insert(process.current_database);
}
}

View File

@ -62,7 +62,7 @@ StorageSystemTables::StorageSystemTables(const StorageID & table_id_)
}
static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & context)
static ColumnPtr getFilteredDatabases(const SelectQueryInfo & query_info, const Context & context)
{
MutableColumnPtr column = ColumnString::create();
@ -76,7 +76,7 @@ static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & cont
}
Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database") };
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block, context);
return block.getByPosition(0).column;
}
@ -525,7 +525,7 @@ Pipe StorageSystemTables::read(
}
}
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info.query, context);
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info, context);
return Pipe(std::make_shared<TablesBlockSource>(
std::move(columns_mask), std::move(res_block), max_block_size, std::move(filtered_databases_column), context));

View File

@ -5,12 +5,14 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
@ -19,6 +21,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <IO/WriteHelpers.h>
#include <Common/typeid_cast.h>
#include <Interpreters/ActionsVisitor.h>
namespace DB
@ -28,31 +31,36 @@ namespace
{
/// Verifying that the function depends only on the specified columns
bool isValidFunction(const ASTPtr & expression, const NameSet & columns)
bool isValidFunction(const ASTPtr & expression, const std::function<bool(const ASTPtr &)> & is_constant)
{
for (const auto & child : expression->children)
if (!isValidFunction(child, columns))
return false;
if (auto opt_name = IdentifierSemantic::getColumnName(expression))
return columns.count(*opt_name);
return true;
const auto * function = expression->as<ASTFunction>();
if (function && functionIsInOrGlobalInOperator(function->name))
{
// Second argument of IN can be a scalar subquery
return isValidFunction(function->arguments->children[0], is_constant);
}
else
return is_constant(expression);
}
/// Extract all subfunctions of the main conjunction, but depending only on the specified columns
void extractFunctions(const ASTPtr & expression, const NameSet & columns, std::vector<ASTPtr> & result)
bool extractFunctions(const ASTPtr & expression, const std::function<bool(const ASTPtr &)> & is_constant, std::vector<ASTPtr> & result)
{
const auto * function = expression->as<ASTFunction>();
if (function && function->name == "and")
if (function && (function->name == "and" || function->name == "indexHint"))
{
bool ret = true;
for (const auto & child : function->arguments->children)
extractFunctions(child, columns, result);
ret &= extractFunctions(child, is_constant, result);
return ret;
}
else if (isValidFunction(expression, columns))
else if (isValidFunction(expression, is_constant))
{
result.push_back(expression->clone());
return true;
}
else
return false;
}
/// Construct a conjunction from given functions
@ -65,6 +73,25 @@ ASTPtr buildWhereExpression(const ASTs & functions)
return makeASTFunction("and", functions);
}
void buildSets(const ASTPtr & expression, ExpressionAnalyzer & analyzer)
{
const auto * func = expression->as<ASTFunction>();
if (func && functionIsInOrGlobalInOperator(func->name))
{
const IAST & args = *func->arguments;
const ASTPtr & arg = args.children.at(1);
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
{
analyzer.tryMakeSetForIndexFromSubquery(arg);
}
}
else
{
for (const auto & child : expression->children)
buildSets(child, analyzer);
}
}
}
namespace VirtualColumnUtils
@ -76,7 +103,6 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
if (!select.with())
select.setExpression(ASTSelectQuery::Expression::WITH, std::make_shared<ASTExpressionList>());
if (func.empty())
{
auto literal = std::make_shared<ASTLiteral>(value);
@ -96,30 +122,63 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
}
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context)
bool prepareFilterBlockWithQuery(const ASTPtr & query, const Context & context, Block block, ASTPtr & expression_ast)
{
bool unmodified = true;
const auto & select = query->as<ASTSelectQuery &>();
if (!select.where() && !select.prewhere())
return;
return unmodified;
NameSet columns;
for (const auto & it : block.getNamesAndTypesList())
columns.insert(it.name);
ASTPtr condition_ast;
if (select.prewhere() && select.where())
condition_ast = makeASTFunction("and", select.prewhere()->clone(), select.where()->clone());
else
condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
/// We will create an expression that evaluates the expressions in WHERE and PREWHERE, depending only on the existing columns.
// Prepare a constant block with valid expressions
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).type->createColumnConstWithDefaultValue(1);
// Provide input columns as constant columns to check if an expression is constant.
std::function<bool(const ASTPtr &)> is_constant = [&block, &context](const ASTPtr & node)
{
auto actions = std::make_shared<ActionsDAG>(block.getColumnsWithTypeAndName());
PreparedSets prepared_sets;
SubqueriesForSets subqueries_for_sets;
ActionsVisitor::Data visitor_data(
context, SizeLimits{}, 1, {}, std::move(actions), prepared_sets, subqueries_for_sets, true, true, true, false);
ActionsVisitor(visitor_data).visit(node);
actions = visitor_data.getActions();
auto expression_actions = std::make_shared<ExpressionActions>(actions);
auto block_with_constants = block;
expression_actions->execute(block_with_constants);
auto column_name = node->getColumnName();
return block_with_constants.has(column_name) && isColumnConst(*block_with_constants.getByName(column_name).column);
};
/// Create an expression that evaluates the expressions in WHERE and PREWHERE, depending only on the existing columns.
std::vector<ASTPtr> functions;
if (select.where())
extractFunctions(select.where(), columns, functions);
unmodified &= extractFunctions(select.where(), is_constant, functions);
if (select.prewhere())
extractFunctions(select.prewhere(), columns, functions);
unmodified &= extractFunctions(select.prewhere(), is_constant, functions);
expression_ast = buildWhereExpression(functions);
return unmodified;
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context, ASTPtr expression_ast)
{
if (!expression_ast)
prepareFilterBlockWithQuery(query, context, block, expression_ast);
ASTPtr expression_ast = buildWhereExpression(functions);
if (!expression_ast)
return;
/// Let's analyze and calculate the expression.
/// Let's analyze and calculate the prepared expression.
auto syntax_result = TreeRewriter(context).analyze(expression_ast, block.getNamesAndTypesList());
ExpressionAnalyzer analyzer(expression_ast, syntax_result, context);
buildSets(expression_ast, analyzer);
ExpressionActionsPtr actions = analyzer.getActions(false);
Block block_with_filter = block;

View File

@ -4,6 +4,7 @@
#include <Core/Block.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
@ -24,9 +25,14 @@ namespace VirtualColumnUtils
/// - `WITH toUInt16(9000) as _port`.
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value, const String & func = "");
/// Prepare `expression_ast` to filter a block. Returns true if `expression_ast` is not trimmed, that is,
/// `block` provides all the columns needed by `expression_ast`; otherwise returns false.
bool prepareFilterBlockWithQuery(const ASTPtr & query, const Context & context, Block block, ASTPtr & expression_ast);
/// Leave in the block only the rows that fit under the WHERE clause and the PREWHERE clause of the query.
/// Only elements of the outer conjunction are considered, depending only on the columns present in the block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context);
/// If `expression_ast` is passed, use it to filter the block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context, ASTPtr expression_ast = {});
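A minimal sketch of the intended two-step usage, mirroring how MergeTreeData::totalRowsByPartitionPredicateImpl and readFromParts call these functions elsewhere in this patch; `query_info`, `context` and `parts` are assumed to be available at the call site:

Block virtual_columns_block = MergeTreeDataSelectExecutor::getSampleBlockWithVirtualPartColumns();
ASTPtr expression_ast;
VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, context, virtual_columns_block, expression_ast);
if (expression_ast)
{
    MergeTreeDataSelectExecutor::fillBlockWithVirtualPartColumns(parts, virtual_columns_block);
    VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast);
    auto part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
    /// An empty part_values set here means no part can match the filter.
}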
/// Extract from the input stream a set of `name` column values
template <typename T>

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
import shutil
import sys
import os
import os.path
@ -112,13 +113,14 @@ def get_db_engine(args, database_name):
return " ENGINE=" + args.db_engine
return "" # Will use default engine
def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file):
def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file, suite_tmp_dir):
# print(client_options)
start_time = datetime.now()
if args.database:
database = args.database
os.environ.setdefault("CLICKHOUSE_DATABASE", database)
os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
else:
# If --database is not specified, we will create temporary database with unique name
@ -136,6 +138,12 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
return clickhouse_proc_create, "", "Timeout creating database {} before test".format(database), total_time
os.environ["CLICKHOUSE_DATABASE"] = database
# Set temporary directory to match the randomly generated database,
# because .sh tests also use it for temporary files and we want to avoid
# collisions.
test_tmp_dir = os.path.join(suite_tmp_dir, database)
os.mkdir(test_tmp_dir)
os.environ.setdefault("CLICKHOUSE_TMP", test_tmp_dir)
# This is for .sh tests
os.environ["CLICKHOUSE_LOG_COMMENT"] = case_file
@ -170,7 +178,7 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
if need_drop_database:
clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 10)
seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20)
try:
clickhouse_proc_create.communicate(("DROP DATABASE " + database), timeout=seconds_left)
except TimeoutExpired:
@ -185,6 +193,8 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
total_time = (datetime.now() - start_time).total_seconds()
return clickhouse_proc_create, "", "Timeout dropping database {} after test".format(database), total_time
shutil.rmtree(test_tmp_dir)
total_time = (datetime.now() - start_time).total_seconds()
# Normalize randomized database names in stdout, stderr files.
@ -207,7 +217,7 @@ def need_retry(stderr):
def get_processlist(args):
try:
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
(stdout, _) = clickhouse_proc.communicate((b"SHOW PROCESSLIST FORMAT Vertical"), timeout=10)
(stdout, _) = clickhouse_proc.communicate((b"SHOW PROCESSLIST FORMAT Vertical"), timeout=20)
return False, stdout.decode('utf-8')
except Exception as ex:
print("Exception", ex)
@ -352,7 +362,7 @@ def run_tests_array(all_tests_with_params):
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
failed_to_check = False
try:
clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)), timeout=10)
clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)), timeout=20)
except:
failed_to_check = True
@ -367,7 +377,7 @@ def run_tests_array(all_tests_with_params):
stdout_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stdout'
stderr_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stderr'
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file, suite_tmp_dir)
if proc.returncode is None:
try:
@ -385,7 +395,7 @@ def run_tests_array(all_tests_with_params):
else:
counter = 1
while proc.returncode != 0 and need_retry(stderr):
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file, suite_tmp_dir)
sleep(2**counter)
counter += 1
if counter > 6:
@ -448,7 +458,7 @@ def run_tests_array(all_tests_with_params):
failures_chain += 1
status += MSG_FAIL
status += print_test_time(total_time)
status += " - Long test not marked as 'long'"
status += " - Test runs too long (> 30s). Make it faster."
else:
passed_total += 1
failures_chain = 0
@ -649,7 +659,6 @@ def main(args):
os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)
if args.configclient:
os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)
os.environ.setdefault("CLICKHOUSE_TMP", tmp_dir)
# Force to print server warnings in stderr
# Shell scripts could change logging level

View File

@ -17,6 +17,16 @@
</main>
</volumes>
</s3>
<hybrid>
<volumes>
<main>
<disk>default</disk>
</main>
<external>
<disk>s31</disk>
</external>
</volumes>
</hybrid>
</policies>
</storage_configuration>

View File

@ -88,3 +88,46 @@ def test_s3_zero_copy_replication(cluster, policy):
node1.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node2.query("DROP TABLE IF EXISTS s3_test NO DELAY")
def test_s3_zero_copy_on_hybrid_storage(cluster):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query(
"""
CREATE TABLE hybrid_test ON CLUSTER test_cluster (id UInt32, value String)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/s3_test', '{}')
ORDER BY id
SETTINGS storage_policy='hybrid'
"""
.format('{replica}')
)
node1.query("INSERT INTO hybrid_test VALUES (0,'data'),(1,'data')")
time.sleep(1)
assert node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
assert node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
node1.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','s31')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
# Total objects in S3
s3_objects = get_large_objects_count(cluster, 0)
node2.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")
assert node1.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','s31')"
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','s31')"
# Check that after moving the partition on node2 no new objects appear on s3
assert get_large_objects_count(cluster, 0) == s3_objects
assert node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
assert node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"

View File

@ -59,12 +59,12 @@ def test(started_cluster):
start = time.time()
NODES['node1'].query_and_get_error('SELECT * FROM distributed_table settings receive_timeout=5, use_hedged_requests=0, async_socket_for_remote=0;')
NODES['node1'].query_and_get_error('SELECT * FROM distributed_table settings receive_timeout=5, send_timeout=5, use_hedged_requests=0, async_socket_for_remote=0;')
end = time.time()
assert end - start < 10
start = time.time()
error = NODES['node1'].query_and_get_error('SELECT * FROM distributed_table settings receive_timeout=5, use_hedged_requests=0;')
error = NODES['node1'].query_and_get_error('SELECT * FROM distributed_table settings receive_timeout=5, send_timeout=5, use_hedged_requests=0;')
end = time.time()
assert end - start < 10
@ -73,7 +73,7 @@ def test(started_cluster):
assert error.find('DB::ReadBufferFromPocoSocket::nextImpl()') == -1
start = time.time()
error = NODES['node1'].query_and_get_error('SELECT * FROM distributed_table settings receive_timeout=5;')
error = NODES['node1'].query_and_get_error('SELECT * FROM distributed_table settings receive_timeout=5, send_timeout=5;')
end = time.time()
assert end - start < 10

View File

@ -8,11 +8,6 @@ fun:tolower
# Suppress some failures in contrib so that we can enable MSan in CI.
# Ideally, we should report these upstream.
src:*/contrib/zlib-ng/*
src:*/contrib/simdjson/*
src:*/contrib/lz4/*
# Hyperscan
fun:roseRunProgram
# mariadbclient, NSS functions from libc
fun:_nss_files_parse_servent

View File

@ -11,7 +11,7 @@
<fill_query>INSERT INTO hits_none SELECT WatchID FROM test.hits</fill_query>
<fill_query>OPTIMIZE TABLE hits_none FINAL</fill_query>
<query><![CDATA[SELECT sum(WatchID) FROM hits_none]]></query>
<query short="1"><![CDATA[SELECT sum(WatchID) FROM hits_none]]></query>
<drop_query>DROP TABLE hits_none</drop_query>
</test>

View File

@ -48,7 +48,7 @@ SELECT
threads_realtime >= threads_time_user_system_io,
any(length(thread_ids)) >= 1
FROM
(SELECT * FROM system.query_log PREWHERE query='$heavy_cpu_query' WHERE event_date >= today()-1 AND current_database = currentDatabase() AND type=2 ORDER BY event_time DESC LIMIT 1)
(SELECT * FROM system.query_log PREWHERE query='$heavy_cpu_query' WHERE event_date >= today()-2 AND current_database = currentDatabase() AND type=2 ORDER BY event_time DESC LIMIT 1)
ARRAY JOIN ProfileEvents.Names AS PN, ProfileEvents.Values AS PV"
# Clean

View File

@ -65,7 +65,7 @@ echo '5.1'
# wait until the background query has started (max: 10 seconds, as set by sleepEachRow)
for _ in {1..100}; do
$CLICKHOUSE_CLIENT --query="SHOW PROCESSLIST" --log_queries=0 >"$tmp_file" 2>&1
$CLICKHOUSE_CLIENT --query="SELECT * FROM system.processes WHERE current_database = currentDatabase()" --log_queries=0 >"$tmp_file" 2>&1
grep -q -F 'fwerkh_that_magic_string_make_me_unique' "$tmp_file" && break
sleep 0.1
done
@ -97,7 +97,7 @@ echo 7
# and finally the query log
$CLICKHOUSE_CLIENT \
--server_logs_file=/dev/null \
--query="select * from system.query_log where current_database = currentDatabase() AND event_time > now() - 10 and query like '%TOPSECRET%';"
--query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and query like '%TOPSECRET%';"
rm -f "$tmp_file" >/dev/null 2>&1
@ -118,8 +118,8 @@ $CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS" --server_logs_file=/dev/null
echo 9
$CLICKHOUSE_CLIENT \
--server_logs_file=/dev/null \
--query="SELECT if( count() > 0, 'text_log non empty', 'text_log empty') FROM system.text_log WHERE event_time>now() - 60 and message like '%find_me%';
select * from system.text_log where event_time > now() - 60 and message like '%TOPSECRET=TOPSECRET%';" --ignore-error --multiquery
--query="SELECT if( count() > 0, 'text_log non empty', 'text_log empty') FROM system.text_log WHERE event_date >= yesterday() and message like '%find_me%';
select * from system.text_log where event_date >= yesterday() and message like '%TOPSECRET=TOPSECRET%';" --ignore-error --multiquery
echo 'finish'
rm -f "$tmp_file" >/dev/null 2>&1

View File

@ -10,7 +10,8 @@ SELECT count() FROM merge_tree;
SET max_rows_to_read = 900000;
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
-- A constant ignore() would be pruned by the part pruner, so ignore(*) is used instead.
SELECT count() FROM merge_tree WHERE not ignore(*); -- { serverError 158 }
SELECT count() FROM merge_tree WHERE not ignore(*); -- { serverError 158 }
DROP TABLE merge_tree;

View File

@ -2,26 +2,33 @@ set log_queries=1;
select '01231_log_queries_min_type/QUERY_START';
system flush logs;
select count() from system.query_log where current_database = currentDatabase() and query like '%01231_log_queries_min_type/QUERY_START%' and query not like '%system.query_log%' and event_date = today() and event_time >= now() - interval 1 minute;
select count() from system.query_log where current_database = currentDatabase()
and query like 'select \'01231_log_queries_min_type/QUERY_START%'
and event_date >= yesterday();
set log_queries_min_type='EXCEPTION_BEFORE_START';
select '01231_log_queries_min_type/EXCEPTION_BEFORE_START';
system flush logs;
select count() from system.query_log where current_database = currentDatabase() and query like '%01231_log_queries_min_type/EXCEPTION_BEFORE_START%' and query not like '%system.query_log%' and event_date = today() and event_time >= now() - interval 1 minute;
select count() from system.query_log where current_database = currentDatabase()
and query like 'select \'01231_log_queries_min_type/EXCEPTION_BEFORE_START%'
and event_date >= yesterday();
set max_rows_to_read='100K';
set log_queries_min_type='EXCEPTION_WHILE_PROCESSING';
select '01231_log_queries_min_type/EXCEPTION_WHILE_PROCESSING', max(number) from system.numbers limit 1e6; -- { serverError 158; }
set max_rows_to_read=0;
system flush logs;
select count() from system.query_log where current_database = currentDatabase() and query like '%01231_log_queries_min_type/EXCEPTION_WHILE_PROCESSING%' and query not like '%system.query_log%' and event_date = today() and event_time >= now() - interval 1 minute and type = 'ExceptionWhileProcessing';
select count() from system.query_log where current_database = currentDatabase()
and query like 'select \'01231_log_queries_min_type/EXCEPTION_WHILE_PROCESSING%'
and event_date >= yesterday() and type = 'ExceptionWhileProcessing';
set max_rows_to_read='100K';
select '01231_log_queries_min_type w/ Settings/EXCEPTION_WHILE_PROCESSING', max(number) from system.numbers limit 1e6; -- { serverError 158; }
system flush logs;
set max_rows_to_read=0;
select count() from system.query_log where
current_database = currentDatabase() and
query like '%01231_log_queries_min_type w/ Settings/EXCEPTION_WHILE_PROCESSING%' and
query not like '%system.query_log%' and
event_date = today() and
event_time >= now() - interval 1 minute and
query like 'select \'01231_log_queries_min_type w/ Settings/EXCEPTION_WHILE_PROCESSING%' and
event_date >= yesterday() and
type = 'ExceptionWhileProcessing' and
has(Settings.Names, 'max_rows_to_read');
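
This and the following tests all switch to the same probing pattern: anchor the LIKE pattern at the start of the logged query text (so the probe never matches itself) and filter by event_date >= yesterday() instead of a tight event_time window. A minimal sketch of the pattern, assuming only a clickhouse-client binary on PATH pointed at the test server (the ch() helper and the marker are illustrative, not part of the test suite):

import subprocess

def ch(query):
    # Assumption: clickhouse-client on PATH, connected to the test server.
    return subprocess.run(["clickhouse-client", "--query", query],
                          capture_output=True, text=True, check=True).stdout.strip()

marker = "probe_01231_hypothetical"
ch(f"select '{marker}'")          # the query we want to find in query_log
ch("system flush logs")           # force query_log to be written out
# Prefix-anchored LIKE: the probe below starts with 'select count()', so it can
# never match its own text, and event_date >= yesterday() tolerates slow CI
# machines and clock skew better than event_time >= now() - interval 1 minute.
print(ch(
    "select count() from system.query_log"
    f" where query like 'select \\'{marker}%'"
    " and current_database = currentDatabase()"
    " and event_date >= yesterday()"
))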

View File

@ -9,12 +9,12 @@ $CLICKHOUSE_CLIENT --multiquery --query "
CREATE TABLE bug (UserID UInt64, Date Date) ENGINE = MergeTree ORDER BY Date;
INSERT INTO bug SELECT rand64(), '2020-06-07' FROM numbers(50000000);
OPTIMIZE TABLE bug FINAL;"
LOG="$CLICKHOUSE_TMP/err-$CLICKHOUSE_DATABASE"
$CLICKHOUSE_BENCHMARK --iterations 10 --max_threads 100 --min_bytes_to_use_direct_io 1 <<< "SELECT sum(UserID) FROM bug PREWHERE NOT ignore(Date)" 1>/dev/null 2>"$LOG"
cat "$LOG" | grep Exception
cat "$LOG" | grep Loaded
$CLICKHOUSE_BENCHMARK --iterations 10 --max_threads 100 --min_bytes_to_use_direct_io 1 <<< "SELECT sum(UserID) FROM bug PREWHERE NOT ignore(Date)" 1>/dev/null 2>"$CLICKHOUSE_TMP"/err
cat "$CLICKHOUSE_TMP"/err | grep Exception
cat "$CLICKHOUSE_TMP"/err | grep Loaded
rm "$CLICKHOUSE_TMP"/err
rm "$LOG"
$CLICKHOUSE_CLIENT --multiquery --query "
DROP TABLE bug;"

View File

@ -6,6 +6,6 @@ SET min_bytes_to_use_mmap_io = 1;
SELECT * FROM test_01344 WHERE x = 'Hello, world';
SYSTEM FLUSH LOGS;
SELECT PE.Values FROM system.query_log ARRAY JOIN ProfileEvents AS PE WHERE current_database = currentDatabase() AND event_date >= yesterday() AND event_time >= now() - 300 AND query LIKE 'SELECT * FROM test_01344 WHERE x = ''Hello, world''%' AND PE.Names = 'CreatedReadBufferMMap' AND type = 2 ORDER BY event_time DESC LIMIT 1;
SELECT PE.Values FROM system.query_log ARRAY JOIN ProfileEvents AS PE WHERE current_database = currentDatabase() AND event_date >= yesterday() AND query LIKE 'SELECT * FROM test_01344 WHERE x = ''Hello, world''%' AND PE.Names = 'CreatedReadBufferMMap' AND type = 2 ORDER BY event_time DESC LIMIT 1;
DROP TABLE test_01344;

View File

@ -17,7 +17,7 @@ CREATE MATERIALIZED VIEW slow_log Engine=Memory AS
extract(query,'/\\*\\s*QUERY_GROUP_ID:(.*?)\\s*\\*/') as QUERY_GROUP_ID,
*
FROM system.query_log
WHERE type<>1 and event_date >= yesterday() and event_time > now() - 120
WHERE type<>1 and event_date >= yesterday()
) as ql
INNER JOIN expected_times USING (QUERY_GROUP_ID)
WHERE query_duration_ms > max_query_duration_ms
@ -38,7 +38,7 @@ SELECT
extract(query,'/\\*\\s*QUERY_GROUP_ID:(.*?)\\s*\\*/') as QUERY_GROUP_ID,
count()
FROM system.query_log
WHERE current_database = currentDatabase() AND type<>1 and event_date >= yesterday() and event_time > now() - 20 and QUERY_GROUP_ID<>''
WHERE current_database = currentDatabase() AND type<>1 and event_date >= yesterday() and QUERY_GROUP_ID<>''
GROUP BY QUERY_GROUP_ID
ORDER BY QUERY_GROUP_ID;

View File

@ -10,7 +10,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT a.size0 FROM %t_arr%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND current_database = currentDatabase();
SELECT '====tuple====';
DROP TABLE IF EXISTS t_tup;
@ -27,7 +27,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT t._ FROM %t_tup%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND current_database = currentDatabase();
SELECT '====nullable====';
DROP TABLE IF EXISTS t_nul;
@ -41,7 +41,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT n.null FROM %t_nul%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND current_database = currentDatabase();
SELECT '====map====';
SET allow_experimental_map_type = 1;
@ -60,7 +60,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT m.% FROM %t_map%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND current_database = currentDatabase();
DROP TABLE t_arr;
DROP TABLE t_nul;

View File

@ -1,20 +1,20 @@
set log_queries=1;
set log_queries_min_type='QUERY_FINISH';
set enable_global_with_statement=1;
set enable_global_with_statement=0;
select /* test=01531, enable_global_with_statement=0 */ 2;
system flush logs;
select count() from system.query_log
where event_time >= now() - interval 5 minute
and query like '%select /* test=01531, enable_global_with_statement=0 */ 2%'
where event_date >= yesterday()
and query like 'select /* test=01531, enable_global_with_statement=0 */ 2%'
and current_database = currentDatabase()
;
set enable_global_with_statement=1;
select /* test=01531 enable_global_with_statement=1 */ 2;
select /* test=01531, enable_global_with_statement=1 */ 2;
system flush logs;
select count() from system.query_log
where event_time >= now() - interval 5 minute
and query like '%select /* test=01531 enable_global_with_statement=1 */ 2%'
where event_date >= yesterday()
and query like 'select /* test=01531, enable_global_with_statement=1 */ 2%'
and current_database = currentDatabase()
;

View File

@ -13,16 +13,16 @@ col3
read files
4
6
0 899984 7199412
1 899987 7199877
2 899990 7200255
3 899993 7199883
4 899996 7199798
5 899999 7200306
6 900002 7200064
7 900005 7199429
8 900008 7200067
9 899992 7199993
0 89982 719752
1 89988 720017
2 89994 720152
3 90000 720157
4 90006 720100
5 90012 720168
6 90018 720106
7 90005 719891
8 89992 719854
9 89979 719706
0 []
0 [0]
1 [0,2]

View File

@ -36,7 +36,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT col1.a FROM %nested%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND event_date >= yesterday() AND current_database = currentDatabase();
SYSTEM DROP MARK CACHE;
SELECT col3.n2.s FROM nested FORMAT Null;
@ -46,7 +46,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT col3.n2.s FROM %nested%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND event_date >= yesterday() AND current_database = currentDatabase();
DROP TABLE nested;
@ -59,7 +59,7 @@ ENGINE = MergeTree
ORDER BY id
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO nested SELECT number, arrayMap(x -> (x, arrayMap(y -> (toString(y * x), y + x), range(number % 17))), range(number % 19)) FROM numbers(1000000);
INSERT INTO nested SELECT number, arrayMap(x -> (x, arrayMap(y -> (toString(y * x), y + x), range(number % 17))), range(number % 19)) FROM numbers(100000);
SELECT id % 10, sum(length(col1)), sumArray(arrayMap(x -> length(x), col1.n.b)) FROM nested GROUP BY id % 10;
SELECT arraySum(col1.a), arrayMap(x -> x * x * 2, col1.a) FROM nested ORDER BY id LIMIT 5;

View File

@ -12,19 +12,15 @@ system flush logs;
select count()
from system.query_log
where
query like '%01546_log_queries_min_query_duration_ms-fast%'
and query not like '%system.query_log%'
query like 'select \'01546_log_queries_min_query_duration_ms-fast%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
select count()
from system.query_thread_log
where
query like '%01546_log_queries_min_query_duration_ms-fast%'
and query not like '%system.query_thread_log%'
query like 'select \'01546_log_queries_min_query_duration_ms-fast%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
--
-- slow -- query logged
@ -37,18 +33,14 @@ system flush logs;
select count()
from system.query_log
where
query like '%01546_log_queries_min_query_duration_ms-slow%'
and query not like '%system.query_log%'
query like 'select \'01546_log_queries_min_query_duration_ms-slow%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
-- There are at least two threads involved in a simple query
-- (one thread just waits for another, sigh)
select count() == 2
select if(count() == 2, 'OK', 'Fail: ' || toString(count()))
from system.query_thread_log
where
query like '%01546_log_queries_min_query_duration_ms-slow%'
and query not like '%system.query_thread_log%'
query like 'select \'01546_log_queries_min_query_duration_ms-slow%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();

View File

@ -21,17 +21,15 @@ system flush logs;
select count()
from system.query_log
where
query like '%01547_query_log_current_database%'
query like 'select \'01547_query_log_current_database%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
-- at least two threads for processing
-- (but one just waits for another, sigh)
select count() == 2
from system.query_thread_log
where
query like '%01547_query_log_current_database%'
query like 'select \'01547_query_log_current_database%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday()

View File

@ -13,8 +13,7 @@ where
query like '%01548_query_log_query_execution_ms%'
and current_database = currentDatabase()
and query_duration_ms between 100 and 800
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
-- at least two threads for processing
-- (but one just waits for another, sigh)

View File

@ -0,0 +1 @@
select count(*) from system.processes where current_database = currentDatabase();

View File

@ -4,9 +4,10 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_BENCHMARK --iterations 10 --query "SELECT 1" 1>/dev/null 2>"$CLICKHOUSE_TMP"/err
LOG="$CLICKHOUSE_TMP/err-$CLICKHOUSE_DATABASE"
$CLICKHOUSE_BENCHMARK --iterations 10 --query "SELECT 1" 1>/dev/null 2>"$LOG"
cat "$CLICKHOUSE_TMP"/err | grep Exception
cat "$CLICKHOUSE_TMP"/err | grep Loaded
cat "$LOG" | grep Exception
cat "$LOG" | grep Loaded
rm "$CLICKHOUSE_TMP"/err
rm "$LOG"

View File

@ -8,7 +8,7 @@ INSERT INTO xp SELECT '2020-01-01', number, '' FROM numbers(100000);
CREATE TABLE xp_d AS xp ENGINE = Distributed(test_shard_localhost, currentDatabase(), xp);
SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- { serverError 20 }
SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- B > NULL is evaluated to 0 and this works
SELECT count() FROM xp_d WHERE A GLOBAL IN (SELECT NULL); -- { serverError 53 }

View File

@ -8,13 +8,15 @@ CREATE TABLE table_with_single_pk
ENGINE = MergeTree
ORDER BY key;
INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(10000000);
INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(1000000);
SYSTEM FLUSH LOGS;
WITH (
SELECT (event_time, event_time_microseconds)
FROM system.part_log
WHERE "table" = 'table_with_single_pk'
AND "database" = currentDatabase()
ORDER BY event_time DESC
LIMIT 1
) AS time

View File

@ -136,7 +136,7 @@ SELECT 'ACTUAL LOG CONTENT:';
-- Try to filter out all possible previous junk events by excluding old log entries,
SELECT query_kind, query FROM system.query_log
WHERE
log_comment LIKE '%system.query_log%' AND type == 'QueryStart' AND event_time >= now() - 10
log_comment LIKE '%system.query_log%' AND type == 'QueryStart' AND event_date >= yesterday()
AND current_database == currentDatabase()
ORDER BY event_time_microseconds;

View File

@ -0,0 +1,7 @@
1 1
1 2
1 3
1 1
1 2
1 3
3

View File

@ -0,0 +1,19 @@
drop table if exists x;
create table x (i int, j int) engine MergeTree partition by i order by j settings index_granularity = 1;
insert into x values (1, 1), (1, 2), (1, 3), (2, 4), (2, 5), (2, 6);
set max_rows_to_read = 3;
select * from x where _partition_id = partitionId(1);
set max_rows_to_read = 4; -- one row for subquery
select * from x where _partition_id in (select partitionId(number + 1) from numbers(1));
-- trivial count optimization test
set max_rows_to_read = 1; -- one row for subquery
select count() from x where _partition_id in (select partitionId(number + 1) from numbers(1));
drop table x;
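
The test above uses max_rows_to_read as a pruning probe: if filtering by _partition_id did not prune parts, the full 6-row scan would trip the limit and the query would fail. A rough sketch of the same probe, assuming a hypothetical ch() helper that forwards settings as clickhouse-client command-line flags (the table name is illustrative):

import subprocess

def ch(query, **settings):
    # Hypothetical helper: forward settings as clickhouse-client flags.
    args = ["clickhouse-client", "--query", query]
    args += [f"--{name}={value}" for name, value in settings.items()]
    return subprocess.run(args, capture_output=True, text=True, check=True).stdout

ch("create table pruning_probe (i int, j int) engine MergeTree "
   "partition by i order by j settings index_granularity = 1")
ch("insert into pruning_probe values (1, 1), (1, 2), (1, 3), (2, 4), (2, 5), (2, 6)")
# Only partition 1 (3 rows) should be read; without partition pruning the
# full 6-row scan would exceed the limit and clickhouse-client would fail.
print(ch("select * from pruning_probe where _partition_id = partitionId(1)",
         max_rows_to_read=3))
ch("drop table pruning_probe")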

View File

@ -0,0 +1,3 @@
4.5 9
3 2
3 3

View File

@ -0,0 +1,8 @@
SELECT avg(number) AS number, max(number) FROM numbers(10); -- { serverError 184 }
SELECT sum(x) AS x, max(x) FROM (SELECT 1 AS x UNION ALL SELECT 2 AS x) t; -- { serverError 184 }
select sum(C1) as C1, count(C1) as C2 from (select number as C1 from numbers(3)) as ITBL; -- { serverError 184 }
set prefer_column_name_to_alias = 1;
SELECT avg(number) AS number, max(number) FROM numbers(10);
SELECT sum(x) AS x, max(x) FROM (SELECT 1 AS x UNION ALL SELECT 2 AS x) t settings prefer_column_name_to_alias = 1;
select sum(C1) as C1, count(C1) as C2 from (select number as C1 from numbers(3)) as ITBL settings prefer_column_name_to_alias = 1;

View File

@ -0,0 +1,46 @@
# Invocation with constants
1
0
1
0
# Invocation with non-constant addresses
192.168.99.255 192.168.100.0/22 0
192.168.100.1 192.168.100.0/22 1
192.168.103.255 192.168.100.0/22 1
192.168.104.0 192.168.100.0/22 0
::192.168.99.255 ::192.168.100.0/118 0
::192.168.100.1 ::192.168.100.0/118 1
::192.168.103.255 ::192.168.100.0/118 1
::192.168.104.0 ::192.168.100.0/118 0
# Invocation with non-constant prefixes
192.168.100.1 192.168.100.0/22 1
192.168.100.1 192.168.100.0/24 1
192.168.100.1 192.168.100.0/32 0
::192.168.100.1 ::192.168.100.0/118 1
::192.168.100.1 ::192.168.100.0/120 1
::192.168.100.1 ::192.168.100.0/128 0
# Invocation with non-constants
192.168.100.1 192.168.100.0/22 1
192.168.100.1 192.168.100.0/24 1
192.168.103.255 192.168.100.0/22 1
192.168.103.255 192.168.100.0/24 0
::192.168.100.1 ::192.168.100.0/118 1
::192.168.100.1 ::192.168.100.0/120 1
::192.168.103.255 ::192.168.100.0/118 1
::192.168.103.255 ::192.168.100.0/120 0
# Check with dense table
1
1
1
1
1
1
1
1
# Mismatching IP versions is not an error.
0
0
0
0
# Unparsable arguments
# Wrong argument types

View File

@ -0,0 +1,64 @@
SELECT '# Invocation with constants';
SELECT isIPAddressInRange('127.0.0.1', '127.0.0.0/8');
SELECT isIPAddressInRange('128.0.0.1', '127.0.0.0/8');
SELECT isIPAddressInRange('ffff::1', 'ffff::/16');
SELECT isIPAddressInRange('fffe::1', 'ffff::/16');
SELECT '# Invocation with non-constant addresses';
WITH arrayJoin(['192.168.99.255', '192.168.100.1', '192.168.103.255', '192.168.104.0']) as addr, '192.168.100.0/22' as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
WITH arrayJoin(['::192.168.99.255', '::192.168.100.1', '::192.168.103.255', '::192.168.104.0']) as addr, '::192.168.100.0/118' as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
SELECT '# Invocation with non-constant prefixes';
WITH '192.168.100.1' as addr, arrayJoin(['192.168.100.0/22', '192.168.100.0/24', '192.168.100.0/32']) as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
WITH '::192.168.100.1' as addr, arrayJoin(['::192.168.100.0/118', '::192.168.100.0/120', '::192.168.100.0/128']) as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
SELECT '# Invocation with non-constants';
WITH arrayJoin(['192.168.100.1', '192.168.103.255']) as addr, arrayJoin(['192.168.100.0/22', '192.168.100.0/24']) as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
WITH arrayJoin(['::192.168.100.1', '::192.168.103.255']) as addr, arrayJoin(['::192.168.100.0/118', '::192.168.100.0/120']) as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
SELECT '# Check with dense table';
DROP TABLE IF EXISTS test_data;
CREATE TABLE test_data (cidr String) ENGINE = Memory;
INSERT INTO test_data
SELECT
IPv4NumToString(IPv4CIDRToRange(IPv4StringToNum('255.255.255.255'), toUInt8(number)).1) || '/' || toString(number) AS cidr
FROM system.numbers LIMIT 33;
SELECT sum(isIPAddressInRange('0.0.0.0', cidr)) == 1 FROM test_data;
SELECT sum(isIPAddressInRange('127.0.0.0', cidr)) == 1 FROM test_data;
SELECT sum(isIPAddressInRange('128.0.0.0', cidr)) == 2 FROM test_data;
SELECT sum(isIPAddressInRange('255.0.0.0', cidr)) == 9 FROM test_data;
SELECT sum(isIPAddressInRange('255.0.0.1', cidr)) == 9 FROM test_data;
SELECT sum(isIPAddressInRange('255.0.0.255', cidr)) == 9 FROM test_data;
SELECT sum(isIPAddressInRange('255.255.255.255', cidr)) == 33 FROM test_data;
SELECT sum(isIPAddressInRange('255.255.255.254', cidr)) == 32 FROM test_data;
DROP TABLE IF EXISTS test_data;
SELECT '# Mismatching IP versions is not an error.';
SELECT isIPAddressInRange('127.0.0.1', 'ffff::/16');
SELECT isIPAddressInRange('127.0.0.1', '::127.0.0.1/128');
SELECT isIPAddressInRange('::1', '127.0.0.0/8');
SELECT isIPAddressInRange('::127.0.0.1', '127.0.0.1/32');
SELECT '# Unparsable arguments';
SELECT isIPAddressInRange('unparsable', '127.0.0.0/8'); -- { serverError 6 }
SELECT isIPAddressInRange('127.0.0.1', 'unparsable'); -- { serverError 6 }
SELECT '# Wrong argument types';
SELECT isIPAddressInRange(100, '127.0.0.0/8'); -- { serverError 43 }
SELECT isIPAddressInRange(NULL, '127.0.0.0/8'); -- { serverError 43 }
SELECT isIPAddressInRange(CAST(NULL, 'Nullable(String)'), '127.0.0.0/8'); -- { serverError 43 }
SELECT isIPAddressInRange('127.0.0.1', 100); -- { serverError 43 }
SELECT isIPAddressInRange(100, NULL); -- { serverError 43 }
WITH arrayJoin([NULL, NULL, NULL, NULL]) AS prefix SELECT isIPAddressInRange([NULL, NULL, 0, 255, 0], prefix); -- { serverError 43 }
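
For readers unfamiliar with the new function, its behaviour for well-formed inputs roughly mirrors Python's ipaddress module; a small reference sketch (a model of the semantics exercised above, not the server implementation):

import ipaddress

def is_ip_address_in_range(address: str, prefix: str) -> bool:
    # Membership test of an address in a CIDR network. Mismatched IP versions
    # simply yield False, mirroring the "Mismatching IP versions is not an
    # error" cases in the expected output.
    addr = ipaddress.ip_address(address)
    net = ipaddress.ip_network(prefix, strict=False)
    if addr.version != net.version:
        return False
    return addr in net

assert is_ip_address_in_range("127.0.0.1", "127.0.0.0/8")
assert not is_ip_address_in_range("128.0.0.1", "127.0.0.0/8")
assert not is_ip_address_in_range("127.0.0.1", "ffff::/16")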

View File

@ -0,0 +1 @@
SELECT range(toUInt256(1), 1); -- { serverError 44 }

View File

@ -410,8 +410,8 @@
"00571_non_exist_database_when_create_materializ_view",
"00575_illegal_column_exception_when_drop_depen_column",
"00599_create_view_with_subquery",
"00604_show_create_database",
"00600_replace_running_query",
"00604_show_create_database",
"00612_http_max_query_size",
"00619_union_highlite",
"00620_optimize_on_nonleader_replica_zookeeper",
@ -462,6 +462,7 @@
"00933_test_fix_extra_seek_on_compressed_cache",
"00933_ttl_replicated_zookeeper",
"00933_ttl_with_default",
"00950_dict_get",
"00955_test_final_mark",
"00976_ttl_with_old_parts",
"00980_merge_alter_settings",
@ -625,8 +626,8 @@
"01530_drop_database_atomic_sync",
"01541_max_memory_usage_for_user_long",
"01542_dictionary_load_exception_race",
"01560_optimize_on_insert_zookeeper",
"01545_system_errors", // looks at the difference of values in system.errors
"01560_optimize_on_insert_zookeeper",
"01575_disable_detach_table_of_dictionary",
"01593_concurrent_alter_mutations_kill",
"01593_concurrent_alter_mutations_kill_many_replicas",
@ -640,11 +641,23 @@
"01666_blns",
"01646_system_restart_replicas_smoke", // system restart replicas is a global query
"01656_test_query_log_factories_info",
"01658_read_file_to_stringcolumn",
"01669_columns_declaration_serde",
"01676_dictget_in_default_expression",
"01681_cache_dictionary_simple_key",
"01682_cache_dictionary_complex_key",
"01683_flat_dictionary",
"01684_ssd_cache_dictionary_simple_key",
"01685_ssd_cache_dictionary_complex_key",
"01700_system_zookeeper_path_in",
"01702_system_query_log", // It's ok to execute in parallel with oter tests but not several instances of the same test.
"01702_system_query_log", // Runs many global system queries
"01715_background_checker_blather_zookeeper",
"01721_engine_file_truncate_on_insert", // It's ok to execute in parallel but not several instances of the same test.
"01747_alter_partition_key_enum_zookeeper",
"01748_dictionary_table_dot", // creates database
"01760_polygon_dictionaries",
"01760_system_dictionaries",
"01761_alter_decimal_zookeeper",
"attach",
"ddl_dictionaries",
@ -653,18 +666,6 @@
"live_view",
"memory_leak",
"memory_limit",
"polygon_dicts", // they use an explicitly specified database
"01658_read_file_to_stringcolumn",
"01721_engine_file_truncate_on_insert", // It's ok to execute in parallel but not several instances of the same test.
"01702_system_query_log", // It's ok to execute in parallel with oter tests but not several instances of the same test.
"01748_dictionary_table_dot", // creates database
"00950_dict_get",
"01683_flat_dictionary",
"01681_cache_dictionary_simple_key",
"01682_cache_dictionary_complex_key",
"01684_ssd_cache_dictionary_simple_key",
"01685_ssd_cache_dictionary_complex_key",
"01760_system_dictionaries",
"01760_polygon_dictionaries"
"polygon_dicts" // they use an explicitly specified database
]
}

View File

@ -21,7 +21,6 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
add_subdirectory (corrector_utf8)
add_subdirectory (zookeeper-cli)
add_subdirectory (zookeeper-test)
add_subdirectory (keeper-data-dumper)
add_subdirectory (zookeeper-dump-tree)
add_subdirectory (zookeeper-remove-by-list)
add_subdirectory (zookeeper-create-entry-to-download-part)
@ -34,6 +33,10 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
add_subdirectory (wal-dump)
add_subdirectory (check-mysql-binlog)
if (USE_NURAFT)
add_subdirectory (keeper-data-dumper)
endif ()
if (NOT OS_DARWIN)
add_subdirectory (memcpy-bench)
endif ()