Merge remote-tracking branch 'rschu1ze/master' into bump-grpc-and-protobuf

This commit is contained in:
Robert Schulze 2023-11-17 11:48:16 +00:00
commit 882d0d17f1
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
107 changed files with 2086 additions and 480 deletions

View File

@ -187,9 +187,10 @@ if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
endif ()
endif()
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE"
OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL")
if (NOT (SANITIZE_COVERAGE OR WITH_COVERAGE)
AND (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE"
OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL"))
set (OMIT_HEAVY_DEBUG_SYMBOLS_DEFAULT ON)
else()
set (OMIT_HEAVY_DEBUG_SYMBOLS_DEFAULT OFF)
@ -291,9 +292,6 @@ set (CMAKE_C_STANDARD 11)
set (CMAKE_C_EXTENSIONS ON) # required by most contribs written in C
set (CMAKE_C_STANDARD_REQUIRED ON)
# Compiler-specific coverage flags e.g. -fcoverage-mapping
option(WITH_COVERAGE "Profile the resulting binary/binaries" OFF)
if (COMPILER_CLANG)
# Enable C++14 sized global deallocation functions. It should be enabled by setting -std=c++14 but I'm not sure.
# See https://reviews.llvm.org/D112921
@ -309,18 +307,12 @@ if (COMPILER_CLANG)
set(BRANCHES_WITHIN_32B_BOUNDARIES "-mbranches-within-32B-boundaries")
set(COMPILER_FLAGS "${COMPILER_FLAGS} ${BRANCHES_WITHIN_32B_BOUNDARIES}")
endif()
if (WITH_COVERAGE)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -fprofile-instr-generate -fcoverage-mapping")
# If we want to disable coverage for specific translation units
set(WITHOUT_COVERAGE "-fno-profile-instr-generate -fno-coverage-mapping")
endif()
endif ()
set (COMPILER_FLAGS "${COMPILER_FLAGS}")
# Our built-in unwinder only supports DWARF version up to 4.
set (DEBUG_INFO_FLAGS "-g -gdwarf-4")
set (DEBUG_INFO_FLAGS "-g")
# Disable omit frame pointer compiler optimization using -fno-omit-frame-pointer
option(DISABLE_OMIT_FRAME_POINTER "Disable omit frame pointer compiler optimization" OFF)

View File

@ -1,3 +1,5 @@
add_compile_options($<$<OR:$<COMPILE_LANGUAGE:C>,$<COMPILE_LANGUAGE:CXX>>:${COVERAGE_FLAGS}>)
if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif ()

View File

@ -1,11 +1,15 @@
#include "coverage.h"
#if WITH_COVERAGE
#pragma GCC diagnostic ignored "-Wreserved-identifier"
# include <mutex>
# include <unistd.h>
/// WITH_COVERAGE enables the default implementation of code coverage,
/// that dumps a map to the filesystem.
#if WITH_COVERAGE
#include <mutex>
#include <unistd.h>
# if defined(__clang__)
@ -31,3 +35,131 @@ void dumpCoverageReportIfPossible()
#endif
}
/// SANITIZE_COVERAGE enables code instrumentation,
/// but leaves the callbacks implementation to us,
/// which we use to calculate coverage on a per-test basis
/// and to write it to system tables.
#if defined(SANITIZE_COVERAGE)
namespace
{
bool pc_guards_initialized = false;
bool pc_table_initialized = false;
uint32_t * guards_start = nullptr;
uint32_t * guards_end = nullptr;
uintptr_t * coverage_array = nullptr;
size_t coverage_array_size = 0;
uintptr_t * all_addresses_array = nullptr;
size_t all_addresses_array_size = 0;
}
extern "C"
{
/// This is called at least once for every DSO for initialization.
/// But we will use it only for the main DSO.
void __sanitizer_cov_trace_pc_guard_init(uint32_t * start, uint32_t * stop)
{
if (pc_guards_initialized)
return;
pc_guards_initialized = true;
/// The function can be called multiple times, but we need to initialize only once.
if (start == stop || *start)
return;
guards_start = start;
guards_end = stop;
coverage_array_size = stop - start;
/// Note: we will leak this.
coverage_array = static_cast<uintptr_t*>(malloc(sizeof(uintptr_t) * coverage_array_size));
resetCoverage();
}
/// This is called at least once for every DSO for initialization
/// and provides information about all instrumented addresses.
void __sanitizer_cov_pcs_init(const uintptr_t * pcs_begin, const uintptr_t * pcs_end)
{
if (pc_table_initialized)
return;
pc_table_initialized = true;
all_addresses_array = static_cast<uintptr_t*>(malloc(sizeof(uintptr_t) * coverage_array_size));
all_addresses_array_size = pcs_end - pcs_begin;
/// They are not a real pointers, but also contain a flag in the most significant bit,
/// in which we are not interested for now. Reset it.
for (size_t i = 0; i < all_addresses_array_size; ++i)
all_addresses_array[i] = pcs_begin[i] & 0x7FFFFFFFFFFFFFFFULL;
}
/// This is called at every basic block / edge, etc.
void __sanitizer_cov_trace_pc_guard(uint32_t * guard)
{
/// Duplicate the guard check.
if (!*guard)
return;
*guard = 0;
/// If you set *guard to 0 this code will not be called again for this edge.
/// Now we can get the PC and do whatever you want:
/// - store it somewhere or symbolize it and print right away.
/// The values of `*guard` are as you set them in
/// __sanitizer_cov_trace_pc_guard_init and so you can make them consecutive
/// and use them to dereference an array or a bit vector.
void * pc = __builtin_return_address(0);
coverage_array[guard - guards_start] = reinterpret_cast<uintptr_t>(pc);
}
}
__attribute__((no_sanitize("coverage")))
std::span<const uintptr_t> getCoverage()
{
return {coverage_array, coverage_array_size};
}
__attribute__((no_sanitize("coverage")))
std::span<const uintptr_t> getAllInstrumentedAddresses()
{
return {all_addresses_array, all_addresses_array_size};
}
__attribute__((no_sanitize("coverage")))
void resetCoverage()
{
memset(coverage_array, 0, coverage_array_size * sizeof(*coverage_array));
/// The guard defines whether the __sanitizer_cov_trace_pc_guard should be called.
/// For example, you can unset it after first invocation to prevent excessive work.
/// Initially set all the guards to 1 to enable callbacks.
for (uint32_t * x = guards_start; x < guards_end; ++x)
*x = 1;
}
#else
std::span<const uintptr_t> getCoverage()
{
return {};
}
std::span<const uintptr_t> getAllInstrumentedAddresses()
{
return {};
}
void resetCoverage()
{
}
#endif

View File

@ -1,5 +1,8 @@
#pragma once
#include <span>
#include <cstdint>
/// Flush coverage report to file, depending on coverage system
/// proposed by compiler (llvm for clang and gcov for gcc).
///
@ -7,3 +10,16 @@
/// Thread safe (use exclusive lock).
/// Idempotent, may be called multiple times.
void dumpCoverageReportIfPossible();
/// This is effective if SANITIZE_COVERAGE is enabled at build time.
/// Get accumulated unique program addresses of the instrumented parts of the code,
/// seen so far after program startup or after previous reset.
/// The returned span will be represented as a sparse map, containing mostly zeros, which you should filter away.
std::span<const uintptr_t> getCoverage();
/// Get all instrumented addresses that could be in the coverage.
std::span<const uintptr_t> getAllInstrumentedAddresses();
/// Reset the accumulated coverage.
/// This is useful to compare coverage of different tests, including differential coverage.
void resetCoverage();

View File

@ -1,5 +1,6 @@
#include "memcpy.h"
__attribute__((no_sanitize("coverage")))
extern "C" void * memcpy(void * __restrict dst, const void * __restrict src, size_t size)
{
return inline_memcpy(dst, src, size);

View File

@ -93,7 +93,7 @@
* See https://habr.com/en/company/yandex/blog/457612/
*/
__attribute__((no_sanitize("coverage")))
static inline void * inline_memcpy(void * __restrict dst_, const void * __restrict src_, size_t size)
{
/// We will use pointer arithmetic, so char pointer will be used.

View File

@ -58,3 +58,27 @@ if (SANITIZE)
message (FATAL_ERROR "Unknown sanitizer type: ${SANITIZE}")
endif ()
endif()
# Default coverage instrumentation (dumping the coverage map on exit)
option(WITH_COVERAGE "Instrumentation for code coverage with default implementation" OFF)
if (WITH_COVERAGE)
message (INFORMATION "Enabled instrumentation for code coverage")
set(COVERAGE_FLAGS "-fprofile-instr-generate -fcoverage-mapping")
endif()
option (SANITIZE_COVERAGE "Instrumentation for code coverage with custom callbacks" OFF)
if (SANITIZE_COVERAGE)
message (INFORMATION "Enabled instrumentation for code coverage")
# We set this define for whole build to indicate that at least some parts are compiled with coverage.
# And to expose it in system.build_options.
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DSANITIZE_COVERAGE=1")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DSANITIZE_COVERAGE=1")
# But the actual coverage will be enabled on per-library basis: for ClickHouse code, but not for 3rd-party.
set (COVERAGE_FLAGS "-fsanitize-coverage=trace-pc-guard,pc-table")
endif()
set (WITHOUT_COVERAGE_FLAGS "-fno-profile-instr-generate -fno-coverage-mapping -fno-sanitize-coverage=trace-pc-guard,pc-table")

View File

@ -3,15 +3,6 @@
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -w -ffunction-sections -fdata-sections")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w -ffunction-sections -fdata-sections")
if (WITH_COVERAGE)
set (WITHOUT_COVERAGE_LIST ${WITHOUT_COVERAGE})
separate_arguments(WITHOUT_COVERAGE_LIST)
# disable coverage for contib files and build with optimisations
if (COMPILER_CLANG)
add_compile_options(-O3 -DNDEBUG -finline-functions -finline-hint-functions ${WITHOUT_COVERAGE_LIST})
endif()
endif()
if (SANITIZE STREQUAL "undefined")
# 3rd-party libraries usually not intended to work with UBSan.
add_compile_options(-fno-sanitize=undefined)

View File

@ -378,38 +378,6 @@ else ()
COMMAND_ECHO STDOUT)
endif ()
# add_custom_command (
# OUTPUT ${PROTOC_BUILD_DIR}
# COMMAND mkdir -p ${PROTOC_BUILD_DIR})
#
# add_custom_command (
# OUTPUT "${PROTOC_BUILD_DIR}/CMakeCache.txt"
#
# COMMAND ${CMAKE_COMMAND}
# -G"${CMAKE_GENERATOR}"
# -DCMAKE_MAKE_PROGRAM="${CMAKE_MAKE_PROGRAM}"
# -DCMAKE_C_COMPILER="${CMAKE_C_COMPILER}"
# -DCMAKE_CXX_COMPILER="${CMAKE_CXX_COMPILER}"
# -Dprotobuf_BUILD_TESTS=0
# -Dprotobuf_BUILD_CONFORMANCE=0
# -Dprotobuf_BUILD_EXAMPLES=0
# -Dprotobuf_BUILD_PROTOC_BINARIES=1
# "${protobuf_source_dir}/cmake"
#
# DEPENDS "${PROTOC_BUILD_DIR}"
# WORKING_DIRECTORY "${PROTOC_BUILD_DIR}"
# COMMENT "Configuring 'protoc' for host architecture."
# USES_TERMINAL)
#
# add_custom_command (
# OUTPUT "${PROTOC_BUILD_DIR}/protoc"
# COMMAND ${CMAKE_COMMAND} --build "${PROTOC_BUILD_DIR}"
# DEPENDS "${PROTOC_BUILD_DIR}/CMakeCache.txt"
# COMMENT "Building 'protoc' for host architecture."
# USES_TERMINAL)
#
# add_custom_target (protoc-host DEPENDS "${PROTOC_BUILD_DIR}/protoc")
add_executable(protoc IMPORTED GLOBAL)
set_target_properties (protoc PROPERTIES IMPORTED_LOCATION "${PROTOC_BUILD_DIR}/protoc")
add_dependencies(protoc "${PROTOC_BUILD_DIR}/protoc")

View File

@ -238,19 +238,19 @@ Example:
## Virtual Columns {#virtual-columns}
- `_topic` — Kafka topic.
- `_key` — Key of the message.
- `_offset` — Offset of the message.
- `_timestamp` — Timestamp of the message.
- `_timestamp_ms` — Timestamp in milliseconds of the message.
- `_partition` — Partition of Kafka topic.
- `_headers.name` — Array of message's headers keys.
- `_headers.value` — Array of message's headers values.
- `_topic` — Kafka topic. Data type: `LowCardinality(String)`.
- `_key` — Key of the message. Data type: `String`.
- `_offset` — Offset of the message. Data type: `UInt64`.
- `_timestamp` — Timestamp of the message Data type: `Nullable(DateTime)`.
- `_timestamp_ms` — Timestamp in milliseconds of the message. Data type: `Nullable(DateTime64(3))`.
- `_partition` — Partition of Kafka topic. Data type: `UInt64`.
- `_headers.name` — Array of message's headers keys. Data type: `Array(String)`.
- `_headers.value` — Array of message's headers values. Data type: `Array(String)`.
Additional virtual columns when `kafka_handle_error_mode='stream'`:
- `_raw_message` - Raw message that couldn't be parsed successfully.
- `_error` - Exception message happened during failed parsing.
- `_raw_message` - Raw message that couldn't be parsed successfully. Data type: `String`.
- `_error` - Exception message happened during failed parsing. Data type: `String`.
Note: `_raw_message` and `_error` virtual columns are filled only in case of exception during parsing, they are always empty when message was parsed successfully.

View File

@ -163,14 +163,14 @@ If you want to change the target table by using `ALTER`, we recommend disabling
## Virtual Columns {#virtual-columns}
- `_subject` - NATS message subject.
- `_subject` - NATS message subject. Data type: `String`.
Additional virtual columns when `kafka_handle_error_mode='stream'`:
- `_raw_message` - Raw message that couldn't be parsed successfully.
- `_error` - Exception message happened during failed parsing.
- `_raw_message` - Raw message that couldn't be parsed successfully. Data type: `Nullable(String)`.
- `_error` - Exception message happened during failed parsing. Data type: `Nullable(String)`.
Note: `_raw_message` and `_error` virtual columns are filled only in case of exception during parsing, they are always empty when message was parsed successfully.
Note: `_raw_message` and `_error` virtual columns are filled only in case of exception during parsing, they are always `NULL` when message was parsed successfully.
## Data formats support {#data-formats-support}

View File

@ -184,19 +184,19 @@ Example:
## Virtual Columns {#virtual-columns}
- `_exchange_name` - RabbitMQ exchange name.
- `_channel_id` - ChannelID, on which consumer, who received the message, was declared.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
- `_redelivered` - `redelivered` flag of the message.
- `_message_id` - messageID of the received message; non-empty if was set, when message was published.
- `_timestamp` - timestamp of the received message; non-empty if was set, when message was published.
- `_exchange_name` - RabbitMQ exchange name. Data type: `String`.
- `_channel_id` - ChannelID, on which consumer, who received the message, was declared. Data type: `String`.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel. Data type: `UInt64`.
- `_redelivered` - `redelivered` flag of the message. Data type: `UInt8`.
- `_message_id` - messageID of the received message; non-empty if was set, when message was published. Data type: `String`.
- `_timestamp` - timestamp of the received message; non-empty if was set, when message was published. Data type: `UInt64`.
Additional virtual columns when `kafka_handle_error_mode='stream'`:
- `_raw_message` - Raw message that couldn't be parsed successfully.
- `_error` - Exception message happened during failed parsing.
- `_raw_message` - Raw message that couldn't be parsed successfully. Data type: `Nullable(String)`.
- `_error` - Exception message happened during failed parsing. Data type: `Nullable(String)`.
Note: `_raw_message` and `_error` virtual columns are filled only in case of exception during parsing, they are always empty when message was parsed successfully.
Note: `_raw_message` and `_error` virtual columns are filled only in case of exception during parsing, they are always `NULL` when message was parsed successfully.
## Data formats support {#data-formats-support}

View File

@ -94,12 +94,12 @@ If you want to change the target table by using `ALTER`, we recommend disabling
## Virtual Columns {#virtual-columns}
- `_filename` - Name of the log file.
- `_offset` - Offset in the log file.
- `_filename` - Name of the log file. Data type: `LowCardinality(String)`.
- `_offset` - Offset in the log file. Data type: `UInt64`.
Additional virtual columns when `kafka_handle_error_mode='stream'`:
- `_raw_record` - Raw record that couldn't be parsed successfully.
- `_error` - Exception message happened during failed parsing.
- `_raw_record` - Raw record that couldn't be parsed successfully. Data type: `Nullable(String)`.
- `_error` - Exception message happened during failed parsing. Data type: `Nullable(String)`.
Note: `_raw_record` and `_error` virtual columns are filled only in case of exception during parsing, they are always empty when message was parsed successfully.
Note: `_raw_record` and `_error` virtual columns are filled only in case of exception during parsing, they are always `NULL` when message was parsed successfully.

View File

@ -2469,6 +2469,7 @@ This function is designed to load a NumPy array from a .npy file into ClickHouse
| u2 | UInt16 |
| u4 | UInt32 |
| u8 | UInt64 |
| f2 | Float32 |
| f4 | Float32 |
| f8 | Float64 |
| S | String |

View File

@ -172,7 +172,27 @@ If you set `timeout_before_checking_execution_speed `to 0, ClickHouse will use c
## timeout_overflow_mode {#timeout-overflow-mode}
What to do if the query is run longer than max_execution_time: throw or break. By default, throw.
What to do if the query is run longer than `max_execution_time`: `throw` or `break`. By default, `throw`.
# max_execution_time_leaf
Similar semantic to `max_execution_time` but only apply on leaf node for distributed or remote queries.
For example, if we want to limit execution time on leaf node to `10s` but no limit on the initial node, instead of having `max_execution_time` in the nested subquery settings:
``` sql
SELECT count() FROM cluster(cluster, view(SELECT * FROM t SETTINGS max_execution_time = 10));
```
We can use `max_execution_time_leaf` as the query settings:
``` sql
SELECT count() FROM cluster(cluster, view(SELECT * FROM t)) SETTINGS max_execution_time_leaf = 10;
```
# timeout_overflow_mode_leaf
What to do when the query in leaf node run longer than `max_execution_time_leaf`: `throw` or `break`. By default, `throw`.
## min_execution_speed {#min-execution-speed}

View File

@ -6,9 +6,9 @@ sidebar_label: Random Numbers
# Functions for Generating Random Numbers
All functions in this section accept zero or one arguments. The only use of the argument (if provided) is to prevent prevent [common subexpression
elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) such that two different execution of the same random
function in a query return different random values.
All functions in this section accept zero or one arguments. The only use of the argument (if provided) is to prevent [common subexpression
elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) such that two different executions within a row of the same random
function return different random values.
Related content
- Blog: [Generating random data in ClickHouse](https://clickhouse.com/blog/generating-random-test-distribution-data-for-clickhouse)

View File

@ -1,4 +1,4 @@
---
--
slug: /en/sql-reference/table-functions/file
sidebar_position: 60
sidebar_label: file
@ -6,7 +6,7 @@ sidebar_label: file
# file
Provides a table-like interface to SELECT from and INSERT to files. This table function is similar to the [s3](/docs/en/sql-reference/table-functions/url.md) table function. Use file() when working with local files, and s3() when working with buckets in S3, GCS, or MinIO.
A table engine which provides a table-like interface to SELECT from and INSERT into files, similar to the [s3](/docs/en/sql-reference/table-functions/url.md) table function. Use `file()` when working with local files, and `s3()` when working with buckets in object storage such as S3, GCS, or MinIO.
The `file` function can be used in `SELECT` and `INSERT` queries to read from or write to files.
@ -18,18 +18,18 @@ file([path_to_archive ::] path [,format] [,structure] [,compression])
**Parameters**
- `path` — The relative path to the file from [user_files_path](/docs/en/operations/server-configuration-parameters/settings.md#server_configuration_parameters-user_files_path). Path to file support following globs in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc', 'def'` — strings.
- `path_to_archive` - The relative path to zip/tar/7z archive. Path to archive support the same globs as `path`.
- `path` — The relative path to the file from [user_files_path](/docs/en/operations/server-configuration-parameters/settings.md#server_configuration_parameters-user_files_path). Supports in read-only mode the following [globs](#globs_in_path): `*`, `?`, `{abc,def}` (with `'abc'` and `'def'` being strings) and `{N..M}` (with `N` and `M` being numbers).
- `path_to_archive` - The relative path to a zip/tar/7z archive. Supports the same globs as `path`.
- `format` — The [format](/docs/en/interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format: `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression` — The existing compression type when used in a `SELECT` query, or the desired compression type when used in an `INSERT` query. The supported compression types are `gz`, `br`, `xz`, `zst`, `lz4`, and `bz2`.
- `compression` — The existing compression type when used in a `SELECT` query, or the desired compression type when used in an `INSERT` query. Supported compression types are `gz`, `br`, `xz`, `zst`, `lz4`, and `bz2`.
**Returned value**
A table with the specified structure for reading or writing data in the specified file.
A table for reading or writing data in a file.
## File Write Examples
## Examples for Writing to a File
### Write to a TSV file
@ -48,9 +48,9 @@ As a result, the data is written into the file `test.tsv`:
1 3 2
```
### Partitioned Write to multiple TSV files
### Partitioned write to multiple TSV files
If you specify `PARTITION BY` expression when inserting data into a file() function, a separate file is created for each partition value. Splitting the data into separate files helps to improve reading operations efficiency.
If you specify a `PARTITION BY` expression when inserting data into a table function of type `file()`, then a separate file is created for each partition. Splitting the data into separate files helps to improve performance of read operations.
```sql
INSERT INTO TABLE FUNCTION
@ -72,11 +72,11 @@ As a result, the data is written into three files: `test_1.tsv`, `test_2.tsv`, a
1 2 3
```
## File Read Examples
## Examples for Reading from a File
### SELECT from a CSV file
Setting `user_files_path` and the contents of the file `test.csv`:
First, set `user_files_path` in the server configuration and prepare a file `test.csv`:
``` bash
$ grep user_files_path /etc/clickhouse-server/config.xml
@ -88,7 +88,7 @@ $ cat /var/lib/clickhouse/user_files/test.csv
78,43,45
```
Getting data from a table in `test.csv` and selecting the first two rows from it:
Then, read data from `test.csv` into a table and select its first two rows:
``` sql
SELECT * FROM
@ -103,14 +103,6 @@ LIMIT 2;
└─────────┴─────────┴─────────┘
```
Getting the first 10 lines of a table that contains 3 columns of [UInt32](/docs/en/sql-reference/data-types/int-uint.md) type from a CSV file:
``` sql
SELECT * FROM
file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
LIMIT 10;
```
### Inserting data from a file into a table:
``` sql
@ -130,41 +122,42 @@ file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32');
└─────────┴─────────┴─────────┘
```
Getting data from table in table.csv, located in archive1.zip or/and archive2.zip
Reading data from `table.csv`, located in `archive1.zip` or/and `archive2.zip`:
``` sql
SELECT * FROM file('user_files/archives/archive{1..2}.zip :: table.csv');
```
## Globs in Path {#globs_in_path}
## Globbing {#globs_in_path}
Multiple path components can have globs. For being processed file must exist and match to the whole path pattern (not only suffix or prefix).
Paths may use globbing. Files must match the whole path pattern, not only the suffix or prefix.
- `*`Substitutes any number of any characters except `/` including empty string.
- `?`Substitutes any single character.
- `{some_string,another_string,yet_another_one}`Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`. The strings can contain the `/` symbol.
- `{N..M}`Substitutes any number in range from N to M including both borders.
- `**` - Fetches all files inside the folder recursively.
- `*`Represents arbitrarily many characters except `/` but including the empty string.
- `?`Represents an arbitrary single character.
- `{some_string,another_string,yet_another_one}`Represents any of alternative strings `'some_string', 'another_string', 'yet_another_one'`. The strings may contain `/`.
- `{N..M}`Represents any number `>= N` and `<= M`.
- `**` - Represents all files inside a folder recursively.
Constructions with `{}` are similar to the [remote](remote.md) table function.
**Example**
Suppose we have several files with the following relative paths:
Suppose there are these files with the following relative paths:
- 'some_dir/some_file_1'
- 'some_dir/some_file_2'
- 'some_dir/some_file_3'
- 'another_dir/some_file_1'
- 'another_dir/some_file_2'
- 'another_dir/some_file_3'
- `some_dir/some_file_1`
- `some_dir/some_file_2`
- `some_dir/some_file_3`
- `another_dir/some_file_1`
- `another_dir/some_file_2`
- `another_dir/some_file_3`
Query the number of rows in these files:
Query the total number of rows in all files:
``` sql
SELECT count(*) FROM file('{some,another}_dir/some_file_{1..3}', 'TSV', 'name String, value UInt32');
```
Query the number of rows in all files of these two directories:
An alternative path expression which achieves the same:
``` sql
SELECT count(*) FROM file('{some,another}_dir/*', 'TSV', 'name String, value UInt32');
@ -176,7 +169,7 @@ If your listing of files contains number ranges with leading zeros, use the cons
**Example**
Query the data from files named `file000`, `file001`, … , `file999`:
Query the total number of rows in files named `file000`, `file001`, … , `file999`:
``` sql
SELECT count(*) FROM file('big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32');
@ -184,7 +177,7 @@ SELECT count(*) FROM file('big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String,
**Example**
Query the data from all files inside `big_dir` directory recursively:
Query the total number of rows from all files inside directory `big_dir/` recursively:
``` sql
SELECT count(*) FROM file('big_dir/**', 'CSV', 'name String, value UInt32');
@ -192,7 +185,7 @@ SELECT count(*) FROM file('big_dir/**', 'CSV', 'name String, value UInt32');
**Example**
Query the data from all `file002` files from any folder inside `big_dir` directory recursively:
Query the total number of rows from all files `file002` inside any folder in directory `big_dir/` recursively:
``` sql
SELECT count(*) FROM file('big_dir/**/file002', 'CSV', 'name String, value UInt32');

View File

@ -6,7 +6,7 @@ sidebar_label: remote
# remote, remoteSecure
Allows accessing remote servers, including migration of data, without creating a [Distributed](../../engines/table-engines/special/distributed.md) table. `remoteSecure` - same as `remote` but with a secured connection.
Table function `remote` allows to access remote servers on-the-fly, i.e. without creating a [Distributed](../../engines/table-engines/special/distributed.md) table. Table function `remoteSecure` is same as `remote` but over a secure connection.
Both functions can be used in `SELECT` and `INSERT` queries.
@ -21,36 +21,36 @@ remoteSecure('addresses_expr', [db.table, 'user'[, 'password'], sharding_key])
## Parameters
- `addresses_expr` — An expression that generates addresses of remote servers. This may be just one server address. The server address is `host:port`, or just `host`.
- `addresses_expr` — A remote server address or an expression that generates multiple addresses of remote servers. Format: `host` or `host:port`.
The host can be specified as the server name, or as the IPv4 or IPv6 address. An IPv6 address is specified in square brackets.
The `host` can be specified as a server name, or as a IPv4 or IPv6 address. An IPv6 address must be specified in square brackets.
The port is the TCP port on the remote server. If the port is omitted, it uses [tcp_port](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-tcp_port) from the servers config file in `remote` (by default, 9000) and [tcp_port_secure](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-tcp_port_secure) in `remoteSecure` (by default, 9440).
The `port` is the TCP port on the remote server. If the port is omitted, it uses [tcp_port](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-tcp_port) from the server config file for table function `remote` (by default, 9000) and [tcp_port_secure](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-tcp_port_secure) for table function `remoteSecure` (by default, 9440).
The port is required for an IPv6 address.
For IPv6 addresses, a port is required.
If only specify this parameter, `db` and `table` will use `system.one` by default.
If only parameter `addresses_expr` is specified, `db` and `table` will use `system.one` by default.
Type: [String](../../sql-reference/data-types/string.md).
- `db` — Database name. Type: [String](../../sql-reference/data-types/string.md).
- `table` — Table name. Type: [String](../../sql-reference/data-types/string.md).
- `user` — User name. If the user is not specified, `default` is used. Type: [String](../../sql-reference/data-types/string.md).
- `password` — User password. If the password is not specified, an empty password is used. Type: [String](../../sql-reference/data-types/string.md).
- `user` — User name. If not specified, `default` is used. Type: [String](../../sql-reference/data-types/string.md).
- `password` — User password. If not specified, an empty password is used. Type: [String](../../sql-reference/data-types/string.md).
- `sharding_key` — Sharding key to support distributing data across nodes. For example: `insert into remote('127.0.0.1:9000,127.0.0.2', db, table, 'default', rand())`. Type: [UInt32](../../sql-reference/data-types/int-uint.md).
## Returned value
The dataset from remote servers.
A table located on a remote server.
## Usage
Unless you are migrating data from one system to another, using the `remote` table function is less optimal than creating a `Distributed` table because in this case the server connection is re-established for every request. Also, if hostnames are set, the names are resolved, and errors are not counted when working with various replicas. When processing a large number of queries, always create the `Distributed` table ahead of time, and do not use the `remote` table function.
As table functions `remote` and `remoteSecure` re-establish the connection for each request, it is recommended to use a `Distributed` table instead. Also, if hostnames are set, the names are resolved, and errors are not counted when working with various replicas. When processing a large number of queries, always create the `Distributed` table ahead of time, and do not use the `remote` table function.
The `remote` table function can be useful in the following cases:
- Migrating data from one system to another
- Accessing a specific server for data comparison, debugging, and testing.
- One-time data migration from one system to another
- Accessing a specific server for data comparison, debugging, and testing, i.e. ad-hoc connections.
- Queries between various ClickHouse clusters for research purposes.
- Infrequent distributed requests that are made manually.
- Distributed requests where the set of servers is re-defined each time.
@ -68,7 +68,7 @@ localhost
[2a02:6b8:0:1111::11]:9000
```
Multiple addresses can be comma-separated. In this case, ClickHouse will use distributed processing, so it will send the query to all specified addresses (like shards with different data). Example:
Multiple addresses can be comma-separated. In this case, ClickHouse will use distributed processing and send the query to all specified addresses (like shards with different data). Example:
``` text
example01-01-1,example01-02-1
@ -91,10 +91,13 @@ SELECT * FROM remote_table;
```
### Migration of tables from one system to another:
This example uses one table from a sample dataset. The database is `imdb`, and the table is `actors`.
#### On the source ClickHouse system (the system that currently hosts the data)
- Verify the source database and table name (`imdb.actors`)
```sql
show databases
```
@ -104,6 +107,7 @@ This example uses one table from a sample dataset. The database is `imdb`, and
```
- Get the CREATE TABLE statement from the source:
```
select create_table_query
from system.tables
@ -111,6 +115,7 @@ This example uses one table from a sample dataset. The database is `imdb`, and
```
Response
```sql
CREATE TABLE imdb.actors (`id` UInt32,
`first_name` String,
@ -123,11 +128,13 @@ This example uses one table from a sample dataset. The database is `imdb`, and
#### On the destination ClickHouse system:
- Create the destination database:
```sql
CREATE DATABASE imdb
```
- Using the CREATE TABLE statement from the source, create the destination:
```sql
CREATE TABLE imdb.actors (`id` UInt32,
`first_name` String,
@ -140,21 +147,23 @@ This example uses one table from a sample dataset. The database is `imdb`, and
#### Back on the source deployment:
Insert into the new database and table created on the remote system. You will need the host, port, username, password, destination database, and destination table.
```sql
INSERT INTO FUNCTION
remoteSecure('remote.clickhouse.cloud:9440', 'imdb.actors', 'USER', 'PASSWORD')
SELECT * from imdb.actors
```
## Globs in Addresses {#globs-in-addresses}
## Globbing {#globs-in-addresses}
Patterns in curly brackets `{ }` are used to generate a set of shards and to specify replicas. If there are multiple pairs of curly brackets, then the direct product of the corresponding sets is generated.
The following pattern types are supported.
- {*a*,*b*} - Any number of variants separated by a comma. The pattern is replaced with *a* in the first shard address and it is replaced with *b* in the second shard address and so on. For instance, `example0{1,2}-1` generates addresses `example01-1` and `example02-1`.
- {*n*..*m*} - A range of numbers. This pattern generates shard addresses with incrementing indices from *n* to *m*. `example0{1..2}-1` generates `example01-1` and `example02-1`.
- {*0n*..*0m*} - A range of numbers with leading zeroes. This modification preserves leading zeroes in indices. The pattern `example{01..03}-1` generates `example01-1`, `example02-1` and `example03-1`.
- {*a*|*b*} - Any number of variants separated by a `|`. The pattern specifies replicas. For instance, `example01-{1|2}` generates replicas `example01-1` and `example01-2`.
- `{a,b,c}` - Represents any of alternative strings `a`, `b` or `c`. The pattern is replaced with `a` in the first shard address and replaced with `b` in the second shard address and so on. For instance, `example0{1,2}-1` generates addresses `example01-1` and `example02-1`.
- `{N..M}` - A range of numbers. This pattern generates shard addresses with incrementing indices from `N` to (and including) `M`. For instance, `example0{1..2}-1` generates `example01-1` and `example02-1`.
- `{0n..0m}` - A range of numbers with leading zeroes. This pattern preserves leading zeroes in indices. For instance, `example{01..03}-1` generates `example01-1`, `example02-1` and `example03-1`.
- `{a|b}` - Any number of variants separated by a `|`. The pattern specifies replicas. For instance, `example01-{1|2}` generates replicas `example01-1` and `example01-2`.
The query will be sent to the first healthy replica. However, for `remote` the replicas are iterated in the order currently set in the [load_balancing](../../operations/settings/settings.md#settings-load_balancing) setting.
The number of generated addresses is limited by [table_function_remote_max_addresses](../../operations/settings/settings.md#table_function_remote_max_addresses) setting.

View File

@ -1,3 +1,5 @@
add_compile_options($<$<OR:$<COMPILE_LANGUAGE:C>,$<COMPILE_LANGUAGE:CXX>>:${COVERAGE_FLAGS}>)
if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif ()

View File

@ -676,6 +676,10 @@ try
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif
#if defined(SANITIZE_COVERAGE) || WITH_COVERAGE
global_context->addWarningMessage("Server was built with code coverage. It will work slowly.");
#endif
const size_t physical_server_memory = getMemoryAmount();
LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.",

View File

@ -182,7 +182,7 @@ public:
struct ConvertToASTOptions
{
/// Add _CAST if constant litral type is different from column type
/// Add _CAST if constant literal type is different from column type
bool add_cast_for_constants = true;
/// Identifiers are fully qualified (`database.table.column`), otherwise names are just column names (`column`)

View File

@ -188,7 +188,7 @@ private:
if (auto * table_function_node = parent->as<TableFunctionNode>())
{
if (child != table_function_node->getArgumentsNode())
throw Exception(ErrorCodes::LOGICAL_ERROR, "TableFunctioNode is expected to have only one child node");
throw Exception(ErrorCodes::LOGICAL_ERROR, "TableFunctionNode is expected to have only one child node");
const auto & unresolved_indexes = table_function_node->getUnresolvedArgumentIndexes();

View File

@ -0,0 +1,168 @@
#include <Analyzer/Passes/RemoveUnusedProjectionColumnsPass.h>
#include <Functions/FunctionFactory.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/SortNode.h>
#include <Analyzer/AggregationUtils.h>
#include <Analyzer/Utils.h>
namespace DB
{
namespace
{
class CollectUsedColumnsVisitor : public InDepthQueryTreeVisitorWithContext<CollectUsedColumnsVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<CollectUsedColumnsVisitor>;
using Base::Base;
bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & child)
{
if (isQueryOrUnionNode(child))
{
subqueries_nodes_to_visit.insert(child);
return false;
}
return true;
}
void enterImpl(QueryTreeNodePtr & node)
{
auto node_type = node->getNodeType();
if (node_type == QueryTreeNodeType::QUERY)
{
auto & query_node = node->as<QueryNode &>();
auto table_expressions = extractTableExpressions(query_node.getJoinTree());
for (const auto & table_expression : table_expressions)
if (isQueryOrUnionNode(table_expression))
query_or_union_node_to_used_columns.emplace(table_expression, std::unordered_set<std::string>());
return;
}
if (node_type != QueryTreeNodeType::COLUMN)
return;
auto & column_node = node->as<ColumnNode &>();
auto column_source_node = column_node.getColumnSource();
auto column_source_node_type = column_source_node->getNodeType();
if (column_source_node_type == QueryTreeNodeType::QUERY || column_source_node_type == QueryTreeNodeType::UNION)
query_or_union_node_to_used_columns[column_source_node].insert(column_node.getColumnName());
}
void reset()
{
subqueries_nodes_to_visit.clear();
query_or_union_node_to_used_columns.clear();
}
std::unordered_set<QueryTreeNodePtr> subqueries_nodes_to_visit;
std::unordered_map<QueryTreeNodePtr, std::unordered_set<std::string>> query_or_union_node_to_used_columns;
};
std::unordered_set<size_t> convertUsedColumnNamesToUsedProjectionIndexes(const QueryTreeNodePtr & query_or_union_node, const std::unordered_set<std::string> & used_column_names)
{
std::unordered_set<size_t> result;
auto * union_node = query_or_union_node->as<UnionNode>();
auto * query_node = query_or_union_node->as<QueryNode>();
const auto & projection_columns = query_node ? query_node->getProjectionColumns() : union_node->computeProjectionColumns();
size_t projection_columns_size = projection_columns.size();
for (size_t i = 0; i < projection_columns_size; ++i)
{
const auto & projection_column = projection_columns[i];
if (used_column_names.contains(projection_column.name))
result.insert(i);
}
return result;
}
/// We cannot remove aggregate functions, if query does not contain GROUP BY or arrayJoin from subquery projection
void updateUsedProjectionIndexes(const QueryTreeNodePtr & query_or_union_node, std::unordered_set<size_t> & used_projection_columns_indexes)
{
if (auto * union_node = query_or_union_node->as<UnionNode>())
{
auto union_node_mode = union_node->getUnionMode();
bool is_distinct = union_node_mode == SelectUnionMode::UNION_DISTINCT ||
union_node_mode == SelectUnionMode::INTERSECT_DISTINCT ||
union_node_mode == SelectUnionMode::EXCEPT_DISTINCT;
if (is_distinct)
{
auto union_projection_columns = union_node->computeProjectionColumns();
size_t union_projection_columns_size = union_projection_columns.size();
for (size_t i = 0; i < union_projection_columns_size; ++i)
used_projection_columns_indexes.insert(i);
return;
}
for (auto & query_node : union_node->getQueries().getNodes())
updateUsedProjectionIndexes(query_node, used_projection_columns_indexes);
return;
}
const auto & query_node = query_or_union_node->as<const QueryNode &>();
const auto & projection_nodes = query_node.getProjection().getNodes();
size_t projection_nodes_size = projection_nodes.size();
for (size_t i = 0; i < projection_nodes_size; ++i)
{
const auto & projection_node = projection_nodes[i];
if ((!query_node.hasGroupBy() && hasAggregateFunctionNodes(projection_node)) || hasFunctionNode(projection_node, "arrayJoin"))
used_projection_columns_indexes.insert(i);
}
}
}
void RemoveUnusedProjectionColumnsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
std::vector<QueryTreeNodePtr> nodes_to_visit;
nodes_to_visit.push_back(query_tree_node);
CollectUsedColumnsVisitor visitor(std::move(context));
while (!nodes_to_visit.empty())
{
auto node_to_visit = std::move(nodes_to_visit.back());
nodes_to_visit.pop_back();
visitor.visit(node_to_visit);
for (auto & [query_or_union_node, used_columns] : visitor.query_or_union_node_to_used_columns)
{
auto used_projection_indexes = convertUsedColumnNamesToUsedProjectionIndexes(query_or_union_node, used_columns);
updateUsedProjectionIndexes(query_or_union_node, used_projection_indexes);
/// Keep at least 1 column if used projection columns are empty
if (used_projection_indexes.empty())
used_projection_indexes.insert(0);
if (auto * union_node = query_or_union_node->as<UnionNode>())
union_node->removeUnusedProjectionColumns(used_projection_indexes);
else if (auto * query_node = query_or_union_node->as<QueryNode>())
query_node->removeUnusedProjectionColumns(used_projection_indexes);
}
for (const auto & subquery_node_to_visit : visitor.subqueries_nodes_to_visit)
nodes_to_visit.push_back(subquery_node_to_visit);
visitor.reset();
}
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Analyzer/IQueryTreePass.h>
namespace DB
{
/** Remove unused projection columns in subqueries.
*
* Example: SELECT a FROM (SELECT a, b FROM test_table);
* Result: SELECT a FROM (SELECT a FROM test_table);
*/
class RemoveUnusedProjectionColumnsPass final : public IQueryTreePass
{
public:
String getName() override { return "RemoveUnusedProjectionColumnsPass"; }
String getDescription() override { return "Remove unused projection columns in subqueries."; }
void run(QueryTreeNodePtr query_tree_node, ContextPtr context) override;
};
}

View File

@ -46,6 +46,54 @@ QueryNode::QueryNode(ContextMutablePtr context_)
: QueryNode(std::move(context_), {} /*settings_changes*/)
{}
void QueryNode::resolveProjectionColumns(NamesAndTypes projection_columns_value)
{
if (projection_columns_value.size() != getProjection().getNodes().size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected projection columns size to match projection nodes size");
projection_columns = std::move(projection_columns_value);
}
void QueryNode::removeUnusedProjectionColumns(const std::unordered_set<std::string> & used_projection_columns)
{
auto & projection_nodes = getProjection().getNodes();
size_t projection_columns_size = projection_columns.size();
size_t write_index = 0;
for (size_t i = 0; i < projection_columns_size; ++i)
{
if (!used_projection_columns.contains(projection_columns[i].name))
continue;
projection_nodes[write_index] = projection_nodes[i];
projection_columns[write_index] = projection_columns[i];
++write_index;
}
projection_nodes.erase(projection_nodes.begin() + write_index, projection_nodes.end());
projection_columns.erase(projection_columns.begin() + write_index, projection_columns.end());
}
void QueryNode::removeUnusedProjectionColumns(const std::unordered_set<size_t> & used_projection_columns_indexes)
{
auto & projection_nodes = getProjection().getNodes();
size_t projection_columns_size = projection_columns.size();
size_t write_index = 0;
for (size_t i = 0; i < projection_columns_size; ++i)
{
if (!used_projection_columns_indexes.contains(i))
continue;
projection_nodes[write_index] = projection_nodes[i];
projection_columns[write_index] = projection_columns[i];
++write_index;
}
projection_nodes.erase(projection_nodes.begin() + write_index, projection_nodes.end());
projection_columns.erase(projection_columns.begin() + write_index, projection_columns.end());
}
void QueryNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const
{
buffer << std::string(indent, ' ') << "QUERY id: " << format_state.getNodeId(this);

View File

@ -556,10 +556,13 @@ public:
}
/// Resolve query node projection columns
void resolveProjectionColumns(NamesAndTypes projection_columns_value)
{
projection_columns = std::move(projection_columns_value);
}
void resolveProjectionColumns(NamesAndTypes projection_columns_value);
/// Remove unused projection columns
void removeUnusedProjectionColumns(const std::unordered_set<std::string> & used_projection_columns);
/// Remove unused projection columns
void removeUnusedProjectionColumns(const std::unordered_set<size_t> & used_projection_columns_indexes);
QueryTreeNodeType getNodeType() const override
{

View File

@ -17,6 +17,7 @@
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Passes/RemoveUnusedProjectionColumnsPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
@ -243,6 +244,7 @@ void QueryTreePassManager::dump(WriteBuffer & buffer, size_t up_to_pass_index)
void addQueryTreePasses(QueryTreePassManager & manager)
{
manager.addPass(std::make_unique<QueryAnalysisPass>());
manager.addPass(std::make_unique<RemoveUnusedProjectionColumnsPass>());
manager.addPass(std::make_unique<FunctionToSubcolumnsPass>());
manager.addPass(std::make_unique<ConvertLogicalExpressionToCNFPass>());

View File

@ -88,6 +88,41 @@ NamesAndTypes UnionNode::computeProjectionColumns() const
return result_columns;
}
void UnionNode::removeUnusedProjectionColumns(const std::unordered_set<std::string> & used_projection_columns)
{
auto projection_columns = computeProjectionColumns();
size_t projection_columns_size = projection_columns.size();
std::unordered_set<size_t> used_projection_column_indexes;
for (size_t i = 0; i < projection_columns_size; ++i)
{
const auto & projection_column = projection_columns[i];
if (used_projection_columns.contains(projection_column.name))
used_projection_column_indexes.insert(i);
}
auto & query_nodes = getQueries().getNodes();
for (auto & query_node : query_nodes)
{
if (auto * query_node_typed = query_node->as<QueryNode>())
query_node_typed->removeUnusedProjectionColumns(used_projection_column_indexes);
else if (auto * union_node_typed = query_node->as<UnionNode>())
union_node_typed->removeUnusedProjectionColumns(used_projection_column_indexes);
}
}
void UnionNode::removeUnusedProjectionColumns(const std::unordered_set<size_t> & used_projection_columns_indexes)
{
auto & query_nodes = getQueries().getNodes();
for (auto & query_node : query_nodes)
{
if (auto * query_node_typed = query_node->as<QueryNode>())
query_node_typed->removeUnusedProjectionColumns(used_projection_columns_indexes);
else if (auto * union_node_typed = query_node->as<UnionNode>())
union_node_typed->removeUnusedProjectionColumns(used_projection_columns_indexes);
}
}
void UnionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const
{
buffer << std::string(indent, ' ') << "UNION id: " << format_state.getNodeId(this);

View File

@ -129,6 +129,12 @@ public:
/// Compute union node projection columns
NamesAndTypes computeProjectionColumns() const;
/// Remove unused projection columns
void removeUnusedProjectionColumns(const std::unordered_set<std::string> & used_projection_columns);
/// Remove unused projection columns
void removeUnusedProjectionColumns(const std::unordered_set<size_t> & used_projection_columns_indexes);
QueryTreeNodeType getNodeType() const override
{
return QueryTreeNodeType::UNION;

View File

@ -152,6 +152,17 @@ void makeUniqueColumnNamesInBlock(Block & block)
}
}
bool isQueryOrUnionNode(const IQueryTreeNode * node)
{
auto node_type = node->getNodeType();
return node_type == QueryTreeNodeType::QUERY || node_type == QueryTreeNodeType::UNION;
}
bool isQueryOrUnionNode(const QueryTreeNodePtr & node)
{
return isQueryOrUnionNode(node.get());
}
QueryTreeNodePtr buildCastFunction(const QueryTreeNodePtr & expression,
const DataTypePtr & type,
const ContextPtr & context,

View File

@ -27,6 +27,12 @@ std::string getGlobalInFunctionNameForLocalInFunctionName(const std::string & fu
/// Add unique suffix to names of duplicate columns in block
void makeUniqueColumnNamesInBlock(Block & block);
/// Returns true, if node has type QUERY or UNION
bool isQueryOrUnionNode(const IQueryTreeNode * node);
/// Returns true, if node has type QUERY or UNION
bool isQueryOrUnionNode(const QueryTreeNodePtr & node);
/** Build cast function that cast expression into type.
* If resolve = true, then result cast function is resolved during build, otherwise
* result cast function is not resolved during build.

View File

@ -1,3 +1,5 @@
add_compile_options($<$<OR:$<COMPILE_LANGUAGE:C>,$<COMPILE_LANGUAGE:CXX>>:${COVERAGE_FLAGS}>)
if (USE_INCLUDE_WHAT_YOU_USE)
set (CMAKE_CXX_INCLUDE_WHAT_YOU_USE ${IWYU_PATH})
endif ()
@ -293,7 +295,8 @@ set_source_files_properties(
Common/Elf.cpp
Common/Dwarf.cpp
Common/SymbolIndex.cpp
PROPERTIES COMPILE_FLAGS "-O2 ${WITHOUT_COVERAGE}")
Common/ThreadFuzzer.cpp
PROPERTIES COMPILE_FLAGS "-O2 ${WITHOUT_COVERAGE_FLAGS}")
target_link_libraries (clickhouse_common_io
PRIVATE

View File

@ -1797,7 +1797,12 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
{
const auto * logs_level_field = set_query->changes.tryGet(std::string_view{"send_logs_level"});
if (logs_level_field)
updateLoggerLevel(logs_level_field->safeGet<String>());
{
auto logs_level = logs_level_field->safeGet<String>();
/// Check that setting value is correct before updating logger level.
SettingFieldLogsLevelTraits::fromString(logs_level);
updateLoggerLevel(logs_level);
}
}
if (const auto * create_user_query = parsed_query->as<ASTCreateUserQuery>())

View File

@ -251,10 +251,12 @@ void LocalConnection::finishQuery()
else if (state->pushing_async_executor)
{
state->pushing_async_executor->finish();
state->pushing_async_executor.reset();
}
else if (state->pushing_executor)
{
state->pushing_executor->finish();
state->pushing_executor.reset();
}
state->io.onFinish();

View File

@ -330,6 +330,12 @@ void ConfigProcessor::mergeRecursive(XMLDocumentPtr config, Node * config_root,
{
Element & config_element = dynamic_cast<Element &>(*config_node);
/// Remove substitution attributes from the merge target node if source node already has a value
bool source_has_value = with_element.hasChildNodes();
if (source_has_value)
for (const auto & attr_name: SUBSTITUTION_ATTRS)
config_element.removeAttribute(attr_name);
mergeAttributes(config_element, with_element);
mergeRecursive(config, config_node, with_node);
}
@ -513,6 +519,9 @@ void ConfigProcessor::doIncludesRecursive(
if (attr_nodes["from_zk"]) /// we have zookeeper subst
{
if (node->hasChildNodes()) /// only allow substitution for nodes with no value
throw Poco::Exception("Element <" + node->nodeName() + "> has value, can't process from_zk substitution");
contributing_zk_paths.insert(attr_nodes["from_zk"]->getNodeValue());
if (zk_node_cache)
@ -535,6 +544,9 @@ void ConfigProcessor::doIncludesRecursive(
if (attr_nodes["from_env"]) /// we have env subst
{
if (node->hasChildNodes()) /// only allow substitution for nodes with no value
throw Poco::Exception("Element <" + node->nodeName() + "> has value, can't process from_env substitution");
XMLDocumentPtr env_document;
auto get_env_node = [&](const std::string & name) -> const Node *
{

View File

@ -46,8 +46,8 @@ class Elf;
* can parse Debug Information Entries (DIEs), abbreviations, attributes (of
* all forms), and we can interpret bytecode for the line number VM.
*
* We can interpret DWARF records of version 2, 3, or 4, although we don't
* actually support many of the version 4 features (such as VLIW, multiple
* We can interpret DWARF records of version 2, 3, 4, or 5, although we don't
* actually support many of the features of versions 4 and 5 (such as VLIW, multiple
* operations per instruction)
*
* Note that the DWARF record parser does not allocate heap memory at all.

View File

@ -17,6 +17,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/* Transforms string from grep-wildcard-syntax ("{N..M}", "{a,b,c}" as in remote table function and "*", "?") to perl-regexp for using re2 library for matching
* with such steps:
* 1) search intervals like {0..9} and enums like {abc,xyz,qwe} in {}, replace them by regexp with pipe (expr1|expr2|expr3),
@ -116,4 +121,79 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob
}
return buf_final_processing.str();
}
namespace
{
void expandSelectorGlobImpl(const std::string & path, std::vector<std::string> & for_match_paths_expanded)
{
/// regexp for {expr1,expr2,....} (a selector glob);
/// expr1, expr2,... cannot contain any of these: '{', '}', ','
static const re2::RE2 selector_regex(R"({([^{}*,]+,[^{}*]*[^{}*,])})");
std::string_view path_view(path);
std::string_view matched;
// No (more) selector globs found, quit
if (!RE2::FindAndConsume(&path_view, selector_regex, &matched))
{
for_match_paths_expanded.push_back(path);
return;
}
std::vector<size_t> anchor_positions;
bool opened = false;
bool closed = false;
// Looking for first occurrence of {} selector: write down positions of {, } and all intermediate commas
for (auto it = path.begin(); it != path.end(); ++it)
{
if (*it == '{')
{
if (opened)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected '{{' found in path '{}' at position {}.", path, it - path.begin());
anchor_positions.push_back(std::distance(path.begin(), it));
opened = true;
}
else if (*it == '}')
{
if (!opened)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected '}}' found in path '{}' at position {}.", path, it - path.begin());
anchor_positions.push_back(std::distance(path.begin(), it));
closed = true;
break;
}
else if (*it == ',')
{
if (!opened)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected ',' found in path '{}' at position {}.", path, std::distance(path.begin(), it));
anchor_positions.push_back(std::distance(path.begin(), it));
}
}
if (!opened || !closed)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Invalid {{}} glob in path {}.", path);
// generate result: prefix/{a,b,c}/suffix -> [prefix/a/suffix, prefix/b/suffix, prefix/c/suffix]
std::string common_prefix = path.substr(0, anchor_positions.front());
std::string common_suffix = path.substr(anchor_positions.back() + 1);
for (size_t i = 1; i < anchor_positions.size(); ++i)
{
std::string current_selection =
path.substr(anchor_positions[i-1] + 1, (anchor_positions[i] - anchor_positions[i-1] - 1));
std::string expanded_matcher = common_prefix + current_selection + common_suffix;
expandSelectorGlobImpl(expanded_matcher, for_match_paths_expanded);
}
}
}
std::vector<std::string> expandSelectionGlob(const std::string & path)
{
std::vector<std::string> result;
expandSelectorGlobImpl(path, result);
return result;
}
}

View File

@ -4,7 +4,11 @@
namespace DB
{
/* Parse globs in string and make a regexp for it.
*/
std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_globs);
/// Parse globs in string and make a regexp for it.
std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_globs);
/// Process {a,b,c...} globs:
/// Don't match it against regex, but generate a,b,c strings instead and process each of them separately.
/// E.g. for a string like `file{1,2,3}.csv` return vector of strings: {`file1.csv`,`file2.csv`,`file3.csv`}
std::vector<std::string> expandSelectionGlob(const std::string & path);
}

View File

@ -416,7 +416,7 @@ private:
std::to_integer<UInt32>(bytes.front()) & MAX_ZERO_BYTE_COUNT);
if (zero_byte_count1 > VALUE_SIZE || zero_byte_count2 > VALUE_SIZE) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Invalid compressed data");
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Invalid zero byte count(s): {} and {}", zero_byte_count1, zero_byte_count2);
size_t tail_size1 = VALUE_SIZE - zero_byte_count1;
size_t tail_size2 = VALUE_SIZE - zero_byte_count2;
@ -424,7 +424,7 @@ private:
size_t expected_size = 0;
if (__builtin_add_overflow(tail_size1, tail_size2, &expected_size)
|| __builtin_add_overflow(expected_size, 1, &expected_size)) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Invalid compressed data");
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Overflow occurred while calculating expected size");
if (bytes.size() < expected_size) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence");

View File

@ -140,6 +140,7 @@ class IColumn;
\
M(UInt64, alter_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) ALIAS(replication_alter_partitions_sync) \
M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \
M(Bool, alter_move_to_space_execute_async, false, "Execute ALTER TABLE MOVE ... TO [DISK|VOLUME] asynchronously", 0) \
\
M(LoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.", 0) \
M(UInt64, load_balancing_first_offset, 0, "Which replica to preferably send a query when FIRST_OR_RANDOM load balancing strategy is used.", 0) \
@ -364,16 +365,16 @@ class IColumn;
M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
M(UInt64, max_rows_to_read_leaf, 0, "Limit on read rows on the leaf nodes for distributed queries. Limit is applied for local reads only excluding the final merge stage on the root node. Note, the setting is unstable with prefer_localhost_replica=1.", 0) \
M(UInt64, max_bytes_to_read_leaf, 0, "Limit on read bytes (after decompression) on the leaf nodes for distributed queries. Limit is applied for local reads only excluding the final merge stage on the root node. Note, the setting is unstable with prefer_localhost_replica=1.", 0) \
M(UInt64, max_rows_to_read_leaf, 0, "Limit on read rows on the leaf nodes for distributed queries. Limit is applied for local reads only, excluding the final merge stage on the root node. Note, the setting is unstable with prefer_localhost_replica=1.", 0) \
M(UInt64, max_bytes_to_read_leaf, 0, "Limit on read bytes (after decompression) on the leaf nodes for distributed queries. Limit is applied for local reads only, excluding the final merge stage on the root node. Note, the setting is unstable with prefer_localhost_replica=1.", 0) \
M(OverflowMode, read_overflow_mode_leaf, OverflowMode::THROW, "What to do when the leaf limit is exceeded.", 0) \
\
M(UInt64, max_rows_to_group_by, 0, "If aggregation during GROUP BY is generating more than specified number of rows (unique GROUP BY keys), the behavior will be determined by the 'group_by_overflow_mode' which by default is - throw an exception, but can be also switched to an approximate GROUP BY mode.", 0) \
M(UInt64, max_rows_to_group_by, 0, "If aggregation during GROUP BY is generating more than the specified number of rows (unique GROUP BY keys), the behavior will be determined by the 'group_by_overflow_mode' which by default is - throw an exception, but can be also switched to an approximate GROUP BY mode.", 0) \
M(OverflowModeGroupBy, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(UInt64, max_bytes_before_external_group_by, 0, "If memory usage during GROUP BY operation is exceeding this threshold in bytes, activate the 'external aggregation' mode (spill data to disk). Recommended value is half of available system memory.", 0) \
\
M(UInt64, max_rows_to_sort, 0, "If more than specified amount of records have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(UInt64, max_bytes_to_sort, 0, "If more than specified amount of (uncompressed) bytes have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(UInt64, max_rows_to_sort, 0, "If more than the specified amount of records have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(UInt64, max_bytes_to_sort, 0, "If more than the specified amount of (uncompressed) bytes have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(OverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(UInt64, max_bytes_before_external_sort, 0, "If memory usage during ORDER BY operation is exceeding this threshold in bytes, activate the 'external sorting' mode (spill data to disk). Recommended value is half of available system memory.", 0) \
M(UInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.", 0) \
@ -384,8 +385,10 @@ class IColumn;
M(OverflowMode, result_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
/* TODO: Check also when merging and finalizing aggregate functions. */ \
M(Seconds, max_execution_time, 0, "If query run time exceeded the specified number of seconds, the behavior will be determined by the 'timeout_overflow_mode' which by default is - throw an exception. Note that the timeout is checked and query can stop only in designated places during data processing. It currently cannot stop during merging of aggregation states or during query analysis, and the actual run time will be higher than the value of this setting.", 0) \
M(Seconds, max_execution_time, 0, "If query runtime exceeds the specified number of seconds, the behavior will be determined by the 'timeout_overflow_mode', which by default is - throw an exception. Note that the timeout is checked and query can stop only in designated places during data processing. It currently cannot stop during merging of aggregation states or during query analysis, and the actual run time will be higher than the value of this setting.", 0) \
M(OverflowMode, timeout_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(Seconds, max_execution_time_leaf, 0, "Similar semantic to max_execution_time but only apply on leaf node for distributed queries, the time out behavior will be determined by 'timeout_overflow_mode_leaf' which by default is - throw an exception", 0) \
M(OverflowMode, timeout_overflow_mode_leaf, OverflowMode::THROW, "What to do when the leaf limit is exceeded.", 0) \
\
M(UInt64, min_execution_speed, 0, "Minimum number of execution rows per second.", 0) \
M(UInt64, max_execution_speed, 0, "Maximum number of execution rows per second.", 0) \
@ -399,7 +402,7 @@ class IColumn;
\
M(UInt64, max_sessions_for_user, 0, "Maximum number of simultaneous sessions for a user.", 0) \
\
M(UInt64, max_subquery_depth, 100, "If a query has more than specified number of nested subqueries, throw an exception. This allows you to have a sanity check to protect the users of your cluster from going insane with their queries.", 0) \
M(UInt64, max_subquery_depth, 100, "If a query has more than the specified number of nested subqueries, throw an exception. This allows you to have a sanity check to protect the users of your cluster from going insane with their queries.", 0) \
M(UInt64, max_analyze_depth, 5000, "Maximum number of analyses performed by interpreter.", 0) \
M(UInt64, max_ast_depth, 1000, "Maximum depth of query syntax tree. Checked after parsing.", 0) \
M(UInt64, max_ast_elements, 50000, "Maximum size of query syntax tree in number of nodes. Checked after parsing.", 0) \
@ -614,6 +617,8 @@ class IColumn;
M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \
\
M(Float, create_replicated_merge_tree_fault_injection_probability, 0.0f, "The probability of a fault injection during table creation after creating metadata in ZooKeeper", 0) \
\
M(Bool, use_query_cache, false, "Enable the query cache", 0) \
M(Bool, enable_writes_to_query_cache, true, "Enable storing results of SELECT queries in the query cache", 0) \
M(Bool, enable_reads_from_query_cache, true, "Enable reading results of SELECT queries from the query cache", 0) \

View File

@ -1083,12 +1083,14 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
}
LOG_INFO(log, "All tables are created successfully");
if (max_log_ptr_at_creation != 0)
chassert(max_log_ptr_at_creation || our_log_ptr);
UInt32 first_entry_to_mark_finished = new_replica ? max_log_ptr_at_creation : our_log_ptr;
if (first_entry_to_mark_finished)
{
/// If the replica is new and some of the queries applied during recovery
/// where issued after the replica was created, then other nodes might be
/// waiting for this node to notify them that the query was applied.
for (UInt32 ptr = max_log_ptr_at_creation; ptr <= max_log_ptr; ++ptr)
for (UInt32 ptr = first_entry_to_mark_finished; ptr <= max_log_ptr; ++ptr)
{
auto entry_name = DDLTaskBase::getLogEntryName(ptr);
auto path = fs::path(zookeeper_path) / "log" / entry_name / "finished" / getFullReplicaName();

View File

@ -17,6 +17,7 @@ enum class NumpyDataTypeIndex
UInt16,
UInt32,
UInt64,
Float16,
Float32,
Float64,
String,
@ -79,6 +80,7 @@ public:
{
switch (size)
{
case 2: type_index = NumpyDataTypeIndex::Float16; break;
case 4: type_index = NumpyDataTypeIndex::Float32; break;
case 8: type_index = NumpyDataTypeIndex::Float64; break;
default:

View File

@ -0,0 +1,94 @@
#if defined(SANITIZE_COVERAGE)
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnConst.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <base/coverage.h>
namespace DB
{
namespace
{
enum class Kind
{
Current,
All
};
/** If ClickHouse is build with coverage instrumentation, returns an array
* of currently accumulated (`coverage`) / all possible (`coverageAll`) unique code addresses.
*/
class FunctionCoverage : public IFunction
{
private:
Kind kind;
public:
String getName() const override
{
return kind == Kind::Current ? "coverage" : "coverageAll";
}
explicit FunctionCoverage(Kind kind_) : kind(kind_)
{
}
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override
{
return false;
}
size_t getNumberOfArguments() const override
{
return 0;
}
bool isDeterministic() const override
{
return false;
}
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
auto coverage_table = kind == Kind::Current ? getCoverage() : getAllInstrumentedAddresses();
auto column_addresses = ColumnUInt64::create();
auto & data = column_addresses->getData();
for (auto ptr : coverage_table)
if (ptr)
data.push_back(ptr);
auto column_array = ColumnArray::create(
std::move(column_addresses),
ColumnArray::ColumnOffsets::create(1, data.size()));
return ColumnConst::create(std::move(column_array), input_rows_count);
}
};
}
REGISTER_FUNCTION(Coverage)
{
factory.registerFunction("coverage", [](ContextPtr){ return std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionCoverage>(Kind::Current)); });
factory.registerFunction("coverageAll", [](ContextPtr){ return std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionCoverage>(Kind::All)); });
}
}
#endif

View File

@ -664,11 +664,20 @@ ReturnType readDateTextFallback(LocalDate & date, ReadBuffer & buf);
template <typename ReturnType = void>
inline ReturnType readDateTextImpl(LocalDate & date, ReadBuffer & buf)
{
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
/// Optimistic path, when whole value is in buffer.
if (!buf.eof() && buf.position() + 10 <= buf.buffer().end())
{
char * pos = buf.position();
auto error = [&]
{
if constexpr (throw_exception)
throw Exception(ErrorCodes::CANNOT_PARSE_DATE, "Cannot parse date here: {}", String(buf.position(), 10));
return ReturnType(false);
};
/// YYYY-MM-DD
/// YYYY-MM-D
/// YYYY-M-DD
@ -677,6 +686,9 @@ inline ReturnType readDateTextImpl(LocalDate & date, ReadBuffer & buf)
/// The delimiters can be arbitrary characters, like YYYY/MM!DD, but obviously not digits.
if (!isNumericASCII(pos[0]) || !isNumericASCII(pos[1]) || !isNumericASCII(pos[2]) || !isNumericASCII(pos[3]))
return error();
UInt16 year = (pos[0] - '0') * 1000 + (pos[1] - '0') * 100 + (pos[2] - '0') * 10 + (pos[3] - '0');
UInt8 month;
UInt8 day;
@ -685,12 +697,18 @@ inline ReturnType readDateTextImpl(LocalDate & date, ReadBuffer & buf)
if (isNumericASCII(pos[-1]))
{
/// YYYYMMDD
if (!isNumericASCII(pos[0]) || !isNumericASCII(pos[1]) || !isNumericASCII(pos[2]))
return error();
month = (pos[-1] - '0') * 10 + (pos[0] - '0');
day = (pos[1] - '0') * 10 + (pos[2] - '0');
pos += 3;
}
else
{
if (!isNumericASCII(pos[0]))
return error();
month = pos[0] - '0';
if (isNumericASCII(pos[1]))
{
@ -700,8 +718,8 @@ inline ReturnType readDateTextImpl(LocalDate & date, ReadBuffer & buf)
else
pos += 2;
if (isNumericASCII(pos[-1]))
return ReturnType(false);
if (isNumericASCII(pos[-1]) || !isNumericASCII(pos[0]))
return error();
day = pos[0] - '0';
if (isNumericASCII(pos[1]))

View File

@ -1,20 +1,21 @@
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/Cluster.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/checkStackSize.h>
#include <Common/logger_useful.h>
#include <Common/FailPoint.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <DataTypes/ObjectUtils.h>
#include <Client/IConnections.h>
#include <Common/logger_useful.h>
#include <Common/FailPoint.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/ExpressionStep.h>
@ -22,6 +23,7 @@
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
namespace ProfileEvents
{
extern const Event DistributedConnectionMissingTable;
@ -121,6 +123,7 @@ void SelectStreamFactory::createForShard(
if (it != objects_by_shard.end())
replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast);
auto emplace_local_stream = [&]()
{
local_plans.emplace_back(createLocalPlan(

View File

@ -141,6 +141,14 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
new_settings.allow_experimental_parallel_reading_from_replicas = false;
}
if (settings.max_execution_time_leaf.value > 0)
{
/// Replace 'max_execution_time' of this sub-query with 'max_execution_time_leaf' and 'timeout_overflow_mode'
/// with 'timeout_overflow_mode_leaf'
new_settings.max_execution_time = settings.max_execution_time_leaf;
new_settings.timeout_overflow_mode = settings.timeout_overflow_mode_leaf;
}
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
return new_context;

View File

@ -1448,6 +1448,21 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
"ATTACH ... FROM ... query is not supported for {} table engine, "
"because such tables do not store any data on disk. Use CREATE instead.", res->getName());
auto * replicated_storage = typeid_cast<StorageReplicatedMergeTree *>(res.get());
if (replicated_storage)
{
const auto probability = getContext()->getSettingsRef().create_replicated_merge_tree_fault_injection_probability;
std::bernoulli_distribution fault(probability);
if (fault(thread_local_rng))
{
/// We emulate the case when the exception was thrown in StorageReplicatedMergeTree constructor
if (!create.attach)
replicated_storage->dropIfEmpty();
throw Coordination::Exception(Coordination::Error::ZCONNECTIONLOSS, "Fault injected (during table creation)");
}
}
database->createTable(getContext(), create.getTable(), res, query_ptr);
/// Move table data to the proper place. Wo do not move data earlier to avoid situations

View File

@ -57,6 +57,7 @@
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/ThreadFuzzer.h>
#include <base/coverage.h>
#include <csignal>
#include <algorithm>
#include <unistd.h>
@ -687,6 +688,12 @@ BlockIO InterpreterSystemQuery::execute()
FailPointInjection::disableFailPoint(query.fail_point_name);
break;
}
case Type::RESET_COVERAGE:
{
getContext()->checkAccess(AccessType::SYSTEM);
resetCoverage();
break;
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown type of SYSTEM query");
}
@ -1301,6 +1308,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::START_THREAD_FUZZER:
case Type::ENABLE_FAILPOINT:
case Type::DISABLE_FAILPOINT:
case Type::RESET_COVERAGE:
case Type::UNKNOWN:
case Type::END: break;
}

View File

@ -53,6 +53,18 @@ bool PredicateExpressionsOptimizer::optimize(ASTSelectQuery & select_query)
return false;
}
static bool hasInputTableFunction(const ASTPtr & expr)
{
if (const auto * func = typeid_cast<const ASTFunction *>(expr.get()); func && func->name == "input")
return true;
for (const auto & child : expr->children)
if (hasInputTableFunction(child))
return true;
return false;
}
std::vector<ASTs> PredicateExpressionsOptimizer::extractTablesPredicates(const ASTPtr & where, const ASTPtr & prewhere)
{
std::vector<ASTs> tables_predicates(tables_with_columns.size());
@ -72,6 +84,11 @@ std::vector<ASTs> PredicateExpressionsOptimizer::extractTablesPredicates(const A
return {}; /// Not optimized when predicate contains stateful function or indeterministic function or window functions
}
/// Skip predicate like `... IN (SELECT ... FROM input())` because
/// it can be duplicated but we can't execute `input()` twice.
if (hasInputTableFunction(predicate_expression))
return {};
if (!expression_info.is_array_join)
{
if (expression_info.unique_reference_tables_pos.size() == 1)

View File

@ -86,6 +86,7 @@ public:
START_PULLING_REPLICATION_LOG,
STOP_CLEANUP,
START_CLEANUP,
RESET_COVERAGE,
END
};

View File

@ -448,14 +448,14 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
case Type::DROP_FORMAT_SCHEMA_CACHE:
{
if (ParserKeyword{"FOR"}.ignore(pos, expected))
{
if (ParserKeyword{"Protobuf"}.ignore(pos, expected))
res->schema_cache_format = "Protobuf";
else
return false;
}
break;
if (ParserKeyword{"FOR"}.ignore(pos, expected))
{
if (ParserKeyword{"Protobuf"}.ignore(pos, expected))
res->schema_cache_format = "Protobuf";
else
return false;
}
break;
}
case Type::UNFREEZE:
{

View File

@ -436,8 +436,8 @@ QueryTreeNodePtr buildSubqueryToReadColumnsFromTableExpression(const NamesAndTyp
auto query_node = std::make_shared<QueryNode>(std::move(context_copy));
query_node->resolveProjectionColumns(projection_columns);
query_node->getProjection().getNodes() = std::move(subquery_projection_nodes);
query_node->resolveProjectionColumns(projection_columns);
query_node->getJoinTree() = table_expression;
query_node->setIsSubquery(true);

View File

@ -1,20 +1,16 @@
#include <cmath>
#include <string>
#include <vector>
#include <Processors/Formats/Impl/NpyRowInputFormat.h>
#include <DataTypes/DataTypeString.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatFactory.h>
#include <Formats/NumpyDataTypes.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/IDataType.h>
#include <IO/ReadBuffer.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <boost/algorithm/string/split.hpp>
#include <IO/ReadBufferFromString.h>
@ -34,6 +30,46 @@ namespace ErrorCodes
namespace
{
float convertFloat16ToFloat32(uint16_t float16_value)
{
uint16_t sign = (float16_value >> 15) & 0x1;
uint16_t exponent = (float16_value >> 10) & 0x1F;
uint16_t fraction = float16_value & 0x3FF;
if (exponent == 0 && fraction == 0)
{
uint32_t float32_value = sign << 31;
return std::bit_cast<float>(float32_value);
}
// Handling special cases for exponent
if (exponent == 0x1F)
{
// NaN or Infinity in float16
return (fraction == 0) ? std::numeric_limits<float>::infinity() : std::numeric_limits<float>::quiet_NaN();
}
// Convert exponent from float16 to float32 format
int32_t new_exponent = static_cast<int32_t>(exponent) - 15 + 127;
// Constructing the float32 representation
uint32_t float32_value = (static_cast<uint32_t>(sign) << 31) |
(static_cast<uint32_t>(new_exponent) << 23) |
(static_cast<uint32_t>(fraction) << 13);
// Interpret the binary representation as a float
float result;
std::memcpy(&result, &float32_value, sizeof(float));
// Determine decimal places dynamically based on the magnitude of the number
int decimal_places = std::max(0, 6 - static_cast<int>(std::log10(std::abs(result))));
// Truncate the decimal part to the determined number of decimal places
float multiplier = static_cast<float>(std::pow(10.0f, decimal_places));
result = std::round(result * multiplier) / multiplier;
return result;
}
DataTypePtr getDataTypeFromNumpyType(const std::shared_ptr<NumpyDataType> & numpy_type)
{
switch (numpy_type->getTypeIndex())
@ -54,6 +90,8 @@ DataTypePtr getDataTypeFromNumpyType(const std::shared_ptr<NumpyDataType> & nump
return std::make_shared<DataTypeUInt32>();
case NumpyDataTypeIndex::UInt64:
return std::make_shared<DataTypeUInt64>();
case NumpyDataTypeIndex::Float16:
return std::make_shared<DataTypeFloat32>();
case NumpyDataTypeIndex::Float32:
return std::make_shared<DataTypeFloat32>();
case NumpyDataTypeIndex::Float64:
@ -265,6 +303,17 @@ NpyRowInputFormat::NpyRowInputFormat(ReadBuffer & in_, Block header_, Params par
nested_type = getNestedType(types[0]);
}
size_t NpyRowInputFormat::countRows(size_t max_block_size)
{
size_t count;
if (counted_rows + max_block_size <= size_t(header.shape[0]))
count = max_block_size;
else
count = header.shape[0] - counted_rows;
counted_rows += count;
return count;
}
template <typename ColumnValue, typename DataValue>
void NpyRowInputFormat::readBinaryValueAndInsert(MutableColumnPtr column, NumpyDataType::Endianness endianness)
{
@ -273,7 +322,18 @@ void NpyRowInputFormat::readBinaryValueAndInsert(MutableColumnPtr column, NumpyD
readBinaryBigEndian(value, *in);
else
readBinaryLittleEndian(value, *in);
assert_cast<ColumnVector<ColumnValue> &>(*column).insertValue(static_cast<ColumnValue>(value));
assert_cast<ColumnVector<ColumnValue> &>(*column).insertValue((static_cast<ColumnValue>(value)));
}
template <typename ColumnValue>
void NpyRowInputFormat::readBinaryValueAndInsertFloat16(MutableColumnPtr column, NumpyDataType::Endianness endianness)
{
uint16_t value;
if (endianness == NumpyDataType::Endianness::BIG)
readBinaryBigEndian(value, *in);
else
readBinaryLittleEndian(value, *in);
assert_cast<ColumnVector<ColumnValue> &>(*column).insertValue(static_cast<ColumnValue>(convertFloat16ToFloat32(value)));
}
template <typename T>
@ -300,6 +360,7 @@ void NpyRowInputFormat::readAndInsertFloat(IColumn * column, const DataTypePtr &
{
switch (npy_type.getTypeIndex())
{
case NumpyDataTypeIndex::Float16: readBinaryValueAndInsertFloat16<T>(column->getPtr(), npy_type.getEndianness()); break;
case NumpyDataTypeIndex::Float32: readBinaryValueAndInsert<T, Float32>(column->getPtr(), npy_type.getEndianness()); break;
case NumpyDataTypeIndex::Float64: readBinaryValueAndInsert<T, Float64>(column->getPtr(), npy_type.getEndianness()); break;
default:
@ -395,13 +456,18 @@ NpySchemaReader::NpySchemaReader(ReadBuffer & in_)
NamesAndTypesList NpySchemaReader::readSchema()
{
NumpyHeader header = parseHeader(in);
header = parseHeader(in);
DataTypePtr nested_type = getDataTypeFromNumpyType(header.numpy_type);
DataTypePtr result_type = createNestedArrayType(nested_type, header.shape.size());
return {{"array", result_type}};
}
std::optional<size_t> NpySchemaReader::readNumberOrRows()
{
return header.shape[0];
}
void registerInputFormatNpy(FormatFactory & factory)
{
factory.registerInputFormat("Npy", [](

View File

@ -29,6 +29,9 @@ public:
String getName() const override { return "NpyRowInputFormat"; }
private:
bool supportsCountRows() const override { return true; }
size_t countRows(size_t max_block_size) override;
void readPrefix() override;
bool readRow(MutableColumns & columns, RowReadExtension &) override;
void readData(MutableColumns & columns);
@ -45,12 +48,16 @@ private:
template <typename ColumnValue, typename DataValue>
void readBinaryValueAndInsert(MutableColumnPtr column, NumpyDataType::Endianness endianness);
template <typename ColumnValue>
void readBinaryValueAndInsertFloat16(MutableColumnPtr column, NumpyDataType::Endianness endianness);
void readRows(MutableColumns & columns);
void readValue(IColumn * column);
DataTypePtr nested_type;
NumpyHeader header;
size_t counted_rows = 0;
};
class NpySchemaReader : public ISchemaReader
@ -59,7 +66,9 @@ public:
explicit NpySchemaReader(ReadBuffer & in_);
private:
std::optional<size_t> readNumberOrRows() override;
NamesAndTypesList readSchema() override;
NumpyHeader header;
};
}

View File

@ -75,59 +75,6 @@ namespace ErrorCodes
}
namespace
{
/// Forward-declare to use in expandSelector()
std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(const String & path_for_ls,
const HDFSFSPtr & fs,
const String & for_match);
/// Process {a,b,c...} globs separately: don't match it against regex, but generate a,b,c strings instead.
std::vector<StorageHDFS::PathWithInfo> expandSelector(const String & path_for_ls,
const HDFSFSPtr & fs,
const String & for_match)
{
std::vector<size_t> anchor_positions = {};
bool opened = false, closed = false;
for (std::string::const_iterator it = for_match.begin(); it != for_match.end(); it++)
{
if (*it == '{')
{
anchor_positions.push_back(std::distance(for_match.begin(), it));
opened = true;
}
else if (*it == '}')
{
anchor_positions.push_back(std::distance(for_match.begin(), it));
closed = true;
break;
}
else if (*it == ',')
{
if (!opened)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected ''' found in path '{}' at position {}.", for_match, std::distance(for_match.begin(), it));
anchor_positions.push_back(std::distance(for_match.begin(), it));
}
}
if (!opened || !closed)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Invalid {{}} glob in path {}.", for_match);
std::vector<StorageHDFS::PathWithInfo> ret = {};
std::string common_prefix = for_match.substr(0, anchor_positions[0]);
std::string common_suffix = for_match.substr(anchor_positions[anchor_positions.size()-1] + 1);
for (size_t i = 1; i < anchor_positions.size(); ++i)
{
std::string expanded_matcher = common_prefix
+ for_match.substr(anchor_positions[i-1] + 1, (anchor_positions[i] - anchor_positions[i-1] - 1))
+ common_suffix;
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(path_for_ls, fs, expanded_matcher);
ret.insert(ret.end(), result_part.begin(), result_part.end());
}
return ret;
}
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageFile.
*/
@ -136,20 +83,24 @@ namespace
const HDFSFSPtr & fs,
const String & for_match)
{
/// regexp for {expr1,expr2,expr3} or {M..N}, where M and N - non-negative integers, expr's should be without "{", "}", "*" and ","
static const re2::RE2 enum_or_range(R"({([\d]+\.\.[\d]+|[^{}*,]+,[^{}*]*[^{}*,])})");
std::string_view for_match_view(for_match);
std::string_view matched;
if (RE2::FindAndConsume(&for_match_view, enum_or_range, &matched))
{
std::string buffer(matched);
if (buffer.find(',') != std::string::npos)
return expandSelector(path_for_ls, fs, for_match);
}
std::vector<StorageHDFS::PathWithInfo> result;
const size_t first_glob_pos = for_match.find_first_of("*?{");
if (first_glob_pos == std::string::npos)
{
const String path = fs::path(path_for_ls + for_match.substr(1)).lexically_normal();
HDFSFileInfo ls;
ls.file_info = hdfsGetPathInfo(fs.get(), path.c_str());
if (ls.file_info != nullptr) // NOLINT
{
result.push_back(StorageHDFS::PathWithInfo{
String(path),
StorageHDFS::PathInfo{ls.file_info->mLastMod, static_cast<size_t>(ls.file_info->mSize)}});
}
return result;
}
const size_t end_of_path_without_globs = for_match.substr(0, first_glob_pos).rfind('/');
const String suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'
const String prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs); /// ends with '/'
@ -171,7 +122,7 @@ namespace
throw Exception(
ErrorCodes::ACCESS_DENIED, "Cannot list directory {}: {}", prefix_without_globs, String(hdfsGetLastError()));
}
std::vector<StorageHDFS::PathWithInfo> result;
if (!ls.file_info && ls.length > 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "file_info shouldn't be null");
for (int i = 0; i < ls.length; ++i)
@ -222,7 +173,15 @@ namespace
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
auto res = LSWithRegexpMatching("/", fs, path_from_uri);
Strings paths = expandSelectionGlob(path_from_uri);
std::vector<StorageHDFS::PathWithInfo> res;
for (const auto & path : paths)
{
auto part_of_res = LSWithRegexpMatching("/", fs, path);
res.insert(res.end(), part_of_res.begin(), part_of_res.end());
}
return res;
}

View File

@ -67,10 +67,11 @@ void BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
}
void BackgroundJobsAssignee::scheduleMoveTask(ExecutableTaskPtr move_task)
bool BackgroundJobsAssignee::scheduleMoveTask(ExecutableTaskPtr move_task)
{
bool res = getContext()->getMovesExecutor()->trySchedule(move_task);
res ? trigger() : postpone();
return res;
}

View File

@ -67,7 +67,7 @@ public:
bool scheduleMergeMutateTask(ExecutableTaskPtr merge_task);
void scheduleFetchTask(ExecutableTaskPtr fetch_task);
void scheduleMoveTask(ExecutableTaskPtr move_task);
bool scheduleMoveTask(ExecutableTaskPtr move_task);
void scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
/// Just call finish

View File

@ -192,6 +192,7 @@ namespace ErrorCodes
extern const int NOT_INITIALIZED;
extern const int SERIALIZATION_ERROR;
extern const int TOO_MANY_MUTATIONS;
extern const int CANNOT_SCHEDULE_TASK;
}
static void checkSuspiciousIndices(const ASTFunction * index_function)
@ -4836,17 +4837,36 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on disk '{}'", partition_id, disk->getName());
}
MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(disk), local_context->getReadSettings(), local_context->getWriteSettings());
switch (moves_outcome)
if (parts_mover.moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cannot move parts because moves are manually disabled");
auto moving_tagger = checkPartsForMove(parts, std::static_pointer_cast<Space>(disk));
if (moving_tagger->parts_to_move.empty())
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No parts to move are found in partition {}", partition_id);
const auto & query_settings = local_context->getSettingsRef();
std::future<MovePartsOutcome> moves_future = movePartsToSpace(moving_tagger, local_context->getReadSettings(), local_context->getWriteSettings(), query_settings.alter_move_to_space_execute_async);
if (query_settings.alter_move_to_space_execute_async && moves_future.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
{
case MovePartsOutcome::MovesAreCancelled:
throw Exception(ErrorCodes::ABORTED, "Cannot move parts because moves are manually disabled");
case MovePartsOutcome::NothingToMove:
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No parts to move are found in partition {}", partition_id);
case MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy:
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Move was not finished, because zero copy mode is enabled and someone other is moving the same parts right now");
case MovePartsOutcome::PartsMoved:
break;
return;
}
else
{
auto moves_outcome = moves_future.get();
switch (moves_outcome)
{
case MovePartsOutcome::MovesAreCancelled:
throw Exception(ErrorCodes::ABORTED, "Cannot move parts because moves are manually disabled");
case MovePartsOutcome::NothingToMove:
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No parts to move are found in partition {}", partition_id);
case MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy:
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Move was not finished, because zero copy mode is enabled and someone other is moving the same parts right now");
case MovePartsOutcome::CannotScheduleMove:
throw Exception(ErrorCodes::CANNOT_SCHEDULE_TASK, "Cannot schedule move, no free threads, try to wait until all in-progress move finish or increase <background_move_pool_size>");
case MovePartsOutcome::PartsMoved:
break;
}
}
}
@ -4899,17 +4919,36 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on volume '{}'", partition_id, volume->getName());
}
MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(volume), local_context->getReadSettings(), local_context->getWriteSettings());
switch (moves_outcome)
if (parts_mover.moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cannot move parts because moves are manually disabled");
auto moving_tagger = checkPartsForMove(parts, std::static_pointer_cast<Space>(volume));
if (moving_tagger->parts_to_move.empty())
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No parts to move are found in partition {}", partition_id);
const auto & query_settings = local_context->getSettingsRef();
std::future<MovePartsOutcome> moves_future = movePartsToSpace(moving_tagger, local_context->getReadSettings(), local_context->getWriteSettings(), query_settings.alter_move_to_space_execute_async);
if (query_settings.alter_move_to_space_execute_async && moves_future.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
{
case MovePartsOutcome::MovesAreCancelled:
throw Exception(ErrorCodes::ABORTED, "Cannot move parts because moves are manually disabled");
case MovePartsOutcome::NothingToMove:
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No parts to move are found in partition {}", partition_id);
case MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy:
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Move was not finished, because zero copy mode is enabled and someone other is moving the same parts right now");
case MovePartsOutcome::PartsMoved:
break;
return;
}
else
{
auto moves_outcome = moves_future.get();
switch (moves_outcome)
{
case MovePartsOutcome::MovesAreCancelled:
throw Exception(ErrorCodes::ABORTED, "Cannot move parts because moves are manually disabled");
case MovePartsOutcome::NothingToMove:
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No parts to move are found in partition {}", partition_id);
case MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy:
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Move was not finished, because zero copy mode is enabled and someone other is moving the same parts right now");
case MovePartsOutcome::CannotScheduleMove:
throw Exception(ErrorCodes::CANNOT_SCHEDULE_TASK, "Cannot schedule move, no free threads, try to wait until all in-progress move finish or increase <background_move_pool_size>");
case MovePartsOutcome::PartsMoved:
break;
}
}
}
@ -7480,16 +7519,33 @@ bool MergeTreeData::areBackgroundMovesNeeded() const
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1;
}
MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const ReadSettings & read_settings, const WriteSettings & write_settings)
std::future<MovePartsOutcome> MergeTreeData::movePartsToSpace(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const ReadSettings & read_settings, const WriteSettings & write_settings, bool async)
{
if (parts_mover.moves_blocker.isCancelled())
return MovePartsOutcome::MovesAreCancelled;
auto finish_move_promise = std::make_shared<std::promise<MovePartsOutcome>>();
auto finish_move_future = finish_move_promise->get_future();
auto moving_tagger = checkPartsForMove(parts, space);
if (moving_tagger->parts_to_move.empty())
return MovePartsOutcome::NothingToMove;
if (async)
{
bool is_scheduled = background_moves_assignee.scheduleMoveTask(std::make_shared<ExecutableLambdaAdapter>(
[this, finish_move_promise, moving_tagger, read_settings, write_settings] () mutable
{
auto outcome = moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ true);
return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ true);
finish_move_promise->set_value(outcome);
return outcome == MovePartsOutcome::PartsMoved;
}, moves_assignee_trigger, getStorageID()));
if (!is_scheduled)
finish_move_promise->set_value(MovePartsOutcome::CannotScheduleMove);
}
else
{
auto outcome = moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ true);
finish_move_promise->set_value(outcome);
}
return finish_move_future;
}
MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove()

View File

@ -1359,8 +1359,6 @@ protected:
/// method has different implementations for replicated and non replicated
/// MergeTree because they store mutations in different way.
virtual std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
/// Moves part to specified space, used in ALTER ... MOVE ... queries
MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const ReadSettings & read_settings, const WriteSettings & write_settings);
struct PartBackupEntries
{
@ -1513,6 +1511,9 @@ private:
using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;
/// Moves part to specified space, used in ALTER ... MOVE ... queries
std::future<MovePartsOutcome> movePartsToSpace(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const ReadSettings & read_settings, const WriteSettings & write_settings, bool async);
/// Move selected parts to corresponding disks
MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const ReadSettings & read_settings, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy);

View File

@ -18,6 +18,7 @@ enum class MovePartsOutcome
NothingToMove,
MovesAreCancelled,
MoveWasPostponedBecauseOfZeroCopy,
CannotScheduleMove,
};
/// Active part from storage and destination reservation where it has to be moved

View File

@ -106,60 +106,6 @@ namespace ErrorCodes
namespace
{
/// Forward-declare to use in expandSelector()
void listFilesWithRegexpMatchingImpl(
const std::string & path_for_ls,
const std::string & for_match,
size_t & total_bytes_to_read,
std::vector<std::string> & result,
bool recursive = false);
/// Process {a,b,c...} globs separately: don't match it against regex, but generate a,b,c strings instead.
void expandSelector(const std::string & path_for_ls,
const std::string & for_match,
size_t & total_bytes_to_read,
std::vector<std::string> & result,
bool recursive)
{
std::vector<size_t> anchor_positions = {};
bool opened = false, closed = false;
for (std::string::const_iterator it = for_match.begin(); it != for_match.end(); it++)
{
if (*it == '{')
{
anchor_positions.push_back(std::distance(for_match.begin(), it));
opened = true;
}
else if (*it == '}')
{
anchor_positions.push_back(std::distance(for_match.begin(), it));
closed = true;
break;
}
else if (*it == ',')
{
if (!opened)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unexpected ''' found in path '{}' at position {}.", for_match, std::distance(for_match.begin(), it));
anchor_positions.push_back(std::distance(for_match.begin(), it));
}
}
if (!opened || !closed)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Invalid {{}} glob in path {}.", for_match);
std::string common_prefix = for_match.substr(0, anchor_positions[0]);
std::string common_suffix = for_match.substr(anchor_positions[anchor_positions.size()-1] + 1);
for (size_t i = 1; i < anchor_positions.size(); ++i)
{
std::string expanded_matcher = common_prefix
+ for_match.substr(anchor_positions[i-1] + 1, (anchor_positions[i] - anchor_positions[i-1] - 1))
+ common_suffix;
listFilesWithRegexpMatchingImpl(path_for_ls, expanded_matcher, total_bytes_to_read, result, recursive);
}
}
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageHDFS.
*/
@ -170,23 +116,23 @@ void listFilesWithRegexpMatchingImpl(
std::vector<std::string> & result,
bool recursive)
{
/// regexp for {expr1,expr2,expr3} or {M..N}, where M and N - non-negative integers, expr's should be without "{", "}", "*" and ","
static const re2::RE2 enum_or_range(R"({([\d]+\.\.[\d]+|[^{}*,]+,[^{}*]*[^{}*,])})");
std::string_view for_match_view(for_match);
std::string_view matched;
if (RE2::FindAndConsume(&for_match_view, enum_or_range, &matched))
{
std::string buffer(matched);
if (buffer.find(',') != std::string::npos)
{
expandSelector(path_for_ls, for_match, total_bytes_to_read, result, recursive);
return;
}
}
const size_t first_glob_pos = for_match.find_first_of("*?{");
if (first_glob_pos == std::string::npos)
{
try
{
fs::path path = fs::canonical(path_for_ls + for_match);
result.push_back(path.string());
}
catch (const std::exception &) // NOLINT
{
/// There is no such file, but we just ignore this.
/// throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist", for_match);
}
return;
}
const size_t end_of_path_without_globs = for_match.substr(0, first_glob_pos).rfind('/');
const std::string suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'
@ -201,7 +147,7 @@ void listFilesWithRegexpMatchingImpl(
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());
bool skip_regex = current_glob == "/*" ? true : false;
bool skip_regex = current_glob == "/*";
if (!recursive)
recursive = current_glob == "/**" ;
@ -239,18 +185,22 @@ void listFilesWithRegexpMatchingImpl(
else if (looking_for_directory && re2::RE2::FullMatch(file_name, matcher))
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash_after_glob_pos),
total_bytes_to_read, result);
total_bytes_to_read, result, false);
}
}
}
std::vector<std::string> listFilesWithRegexpMatching(
const std::string & path_for_ls,
const std::string & for_match,
size_t & total_bytes_to_read)
{
std::vector<std::string> result;
listFilesWithRegexpMatchingImpl(path_for_ls, for_match, total_bytes_to_read, result);
Strings for_match_paths_expanded = expandSelectionGlob(for_match);
for (const auto & for_match_expanded : for_match_paths_expanded)
listFilesWithRegexpMatchingImpl("/", for_match_expanded, total_bytes_to_read, result, false);
return result;
}
@ -415,7 +365,7 @@ Strings StorageFile::getPathsList(const String & table_path, const String & user
else
{
/// We list only non-directory files.
paths = listFilesWithRegexpMatching("/", path, total_bytes_to_read);
paths = listFilesWithRegexpMatching(path, total_bytes_to_read);
can_be_directory = false;
}

View File

@ -5,8 +5,11 @@
#include <memory>
#include <Processors/ISource.h>
#include <Processors/Sources/ThrowingExceptionSource.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{
@ -14,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INVALID_USAGE_OF_INPUT;
extern const int LOGICAL_ERROR;
}
StorageInput::StorageInput(const StorageID & table_id, const ColumnsDescription & columns_)
@ -47,11 +51,33 @@ public:
void StorageInput::setPipe(Pipe pipe_)
{
pipe = std::move(pipe_);
was_pipe_initialized = true;
}
class ReadFromInput : public ISourceStep
{
public:
std::string getName() const override { return "ReadFromInput"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
Pipe StorageInput::read(
const Names & /*column_names*/,
ReadFromInput(
Block sample_block,
Pipe pipe_,
StorageInput & storage_)
: ISourceStep(DataStream{.header = std::move(sample_block)})
, pipe(std::move(pipe_))
, storage(storage_)
{
}
private:
Pipe pipe;
StorageInput & storage;
};
void StorageInput::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
@ -59,20 +85,43 @@ Pipe StorageInput::read(
size_t /*max_block_size*/,
size_t /*num_streams*/)
{
Pipes pipes;
storage_snapshot->check(column_names);
Block sample_block = storage_snapshot->metadata->getSampleBlock();
Pipe input_source_pipe;
auto query_context = context->getQueryContext();
/// It is TCP request if we have callbacks for input().
if (query_context->getInputBlocksReaderCallback())
{
/// Send structure to the client.
query_context->initializeInput(shared_from_this());
return Pipe(std::make_shared<StorageInputSource>(query_context, storage_snapshot->metadata->getSampleBlock()));
input_source_pipe = Pipe(std::make_shared<StorageInputSource>(query_context, sample_block));
}
if (pipe.empty())
auto reading = std::make_unique<ReadFromInput>(
std::move(sample_block),
std::move(input_source_pipe),
*this);
query_plan.addStep(std::move(reading));
}
void ReadFromInput::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (!pipe.empty())
{
pipeline.init(std::move(pipe));
return;
}
if (!storage.was_pipe_initialized)
throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "Input stream is not initialized, input() must be used only in INSERT SELECT query");
return std::move(pipe);
if (storage.was_pipe_used)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to read from input() twice.");
pipeline.init(std::move(storage.pipe));
storage.was_pipe_used = true;
}
}

View File

@ -10,6 +10,7 @@ namespace DB
class StorageInput final : public IStorage
{
friend class ReadFromInput;
public:
StorageInput(const StorageID & table_id, const ColumnsDescription & columns_);
@ -18,7 +19,8 @@ public:
/// A table will read from this stream.
void setPipe(Pipe pipe_);
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -29,5 +31,7 @@ public:
private:
Pipe pipe;
bool was_pipe_initialized = false;
bool was_pipe_used = false;
};
}

View File

@ -19,6 +19,8 @@
#include <Common/typeid_cast.h>
#include <Common/ThreadFuzzer.h>
#include <Core/ServerUUID.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <base/sort.h>
@ -848,6 +850,9 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/mutation_pointer", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/creator_info", toString(getStorageID().uuid) + "|" + toString(ServerUUID::get()),
zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNODEEXISTS)
@ -875,6 +880,80 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
LOG_DEBUG(log, "Creating replica {}", replica_path);
const String local_metadata = ReplicatedMergeTreeTableMetadata(*this, metadata_snapshot).toString();
const String local_columns = metadata_snapshot->getColumns().toString();
const String local_metadata_version = toString(metadata_snapshot->getMetadataVersion());
const String creator_info = toString(getStorageID().uuid) + "|" + toString(ServerUUID::get());
/// It is possible for the replica to fail after creating ZK nodes without saving local metadata.
/// Because of that we need to check whether the replica exists and is newly created.
/// For this we check that all nodes exist, the metadata of the table is the same, and other nodes are not modified.
std::vector<String> paths_exists = {
replica_path + "/host",
replica_path + "/log_pointer",
replica_path + "/queue",
replica_path + "/parts",
replica_path + "/flags",
replica_path + "/is_lost",
replica_path + "/metadata",
replica_path + "/columns",
replica_path + "/metadata_version",
replica_path + "/min_unprocessed_insert_time",
replica_path + "/max_processed_insert_time",
replica_path + "/mutation_pointer",
replica_path + "/creator_info"
};
auto response_exists = zookeeper->tryGet(paths_exists);
bool all_nodes_exist = true;
for (size_t i = 0; i < response_exists.size(); ++i)
{
if (response_exists[i].error != Coordination::Error::ZOK)
{
all_nodes_exist = false;
break;
}
}
if (all_nodes_exist)
{
size_t response_num = 0;
const auto & zk_host = response_exists[response_num++].data;
const auto & zk_log_pointer = response_exists[response_num++].data;
const auto & zk_queue = response_exists[response_num++].data;
const auto & zk_parts = response_exists[response_num++].data;
const auto & zk_flags = response_exists[response_num++].data;
const auto & zk_is_lost = response_exists[response_num++].data;
const auto & zk_metadata = response_exists[response_num++].data;
const auto & zk_columns = response_exists[response_num++].data;
const auto & zk_metadata_version = response_exists[response_num++].data;
const auto & zk_min_unprocessed_insert_time = response_exists[response_num++].data;
const auto & zk_max_processed_insert_time = response_exists[response_num++].data;
const auto & zk_mutation_pointer = response_exists[response_num++].data;
const auto & zk_creator_info = response_exists[response_num++].data;
if (zk_host.empty() &&
zk_log_pointer.empty() &&
zk_queue.empty() &&
zk_parts.empty() &&
zk_flags.empty() &&
(zk_is_lost == "0" || zk_is_lost == "1") &&
zk_metadata == local_metadata &&
zk_columns == local_columns &&
zk_metadata_version == local_metadata_version &&
zk_min_unprocessed_insert_time.empty() &&
zk_max_processed_insert_time.empty() &&
zk_mutation_pointer.empty() &&
zk_creator_info == creator_info)
{
LOG_DEBUG(log, "Empty replica {} exists, will use it", replica_path);
return;
}
}
Coordination::Error code;
do
@ -906,11 +985,11 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", is_lost_value,
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata", ReplicatedMergeTreeTableMetadata(*this, metadata_snapshot).toString(),
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata", local_metadata,
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", metadata_snapshot->getColumns().toString(),
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", local_columns,
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(metadata_snapshot->getMetadataVersion()),
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", local_metadata_version,
zkutil::CreateMode::Persistent));
/// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
@ -921,6 +1000,9 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/mutation_pointer", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/creator_info", creator_info,
zkutil::CreateMode::Persistent));
/// Check version of /replicas to see if there are any replicas created at the same moment of time.
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
@ -8890,7 +8972,7 @@ void StorageReplicatedMergeTree::createTableSharedID() const
else if (code == Coordination::Error::ZNONODE) /// table completely dropped, we can choose any id we want
{
id = toString(UUIDHelpers::Nil);
LOG_DEBUG(log, "Table was completely drop, we can use anything as ID (will use {})", id);
LOG_DEBUG(log, "Table was completely dropped, and we can use anything as ID (will use {})", id);
}
else if (code != Coordination::Error::ZOK)
{

View File

@ -1,5 +1,4 @@
00223_shard_distributed_aggregation_memory_efficient
00593_union_all_assert_columns_removed
00717_merge_and_distributed
00725_memory_tracking
01062_pm_all_join_with_block_continuation
@ -11,12 +10,9 @@
01244_optimize_distributed_group_by_sharding_key
01268_mv_scalars
01268_shard_avgweighted
01287_max_execution_speed
01455_shard_leaf_max_rows_bytes_to_read
01495_subqueries_in_with_statement
01560_merge_distributed_join
01584_distributed_buffer_cannot_find_column
01586_columns_pruning
01624_soft_constraints
01656_test_query_log_factories_info
01739_index_hint

1
tests/ci/worker/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
generated_*init_runner.sh

View File

@ -0,0 +1,85 @@
#!/usr/bin/env bash
usage() {
echo "Usage: $0 ENVIRONMENT" >&2
echo "Valid values for ENVIRONMENT: staging, production" >&2
exit 1
}
case "$1" in
staging|production)
ENVIRONMENT="$1" ;;
--help)
usage ;;
*)
echo "Invalid argument" >&2
usage ;;
esac
cd "$(dirname "$0")" || exit 1
SOURCE_SCRIPT='init_runner.sh'
check_response() {
# Are we even in the interactive shell?
[ -t 1 ] || return 1
local request
request="$1"
read -rp "$request (y/N): " response
case "$response" in
[Yy])
return 0
# Your code to continue goes here
;;
*)
return 1
;;
esac
}
check_dirty() {
if [ -n "$(git status --porcelain=v2 "$SOURCE_SCRIPT")" ]; then
echo "The $SOURCE_SCRIPT has uncommited changes, won't deploy it" >&2
exit 1
fi
}
GIT_HASH=$(git log -1 --format=format:%H)
header() {
cat << EOF
#!/usr/bin/env bash
echo 'The $ENVIRONMENT script is generated from $SOURCE_SCRIPT, commit $GIT_HASH'
EOF
}
body() {
local first_line
first_line=$(sed -n '/^# THE SCRIPT START$/{=;q}' "$SOURCE_SCRIPT")
if [ -z "$first_line" ]; then
echo "The pattern '# THE SCRIPT START' is not found in $SOURCE_SCRIPT" >&2
exit 1
fi
tail "+$first_line" "$SOURCE_SCRIPT"
}
GENERATED_FILE="generated_${ENVIRONMENT}_${SOURCE_SCRIPT}"
{ header && body; } > "$GENERATED_FILE"
echo "The file $GENERATED_FILE is generated"
if check_response "Display the content of $GENERATED_FILE?"; then
if [ -z "$PAGER" ]; then
less "$GENERATED_FILE"
else
$PAGER "$GENERATED_FILE"
fi
fi
check_dirty
S3_OBJECT=${S3_OBJECT:-s3://github-runners-data/cloud-init/${ENVIRONMENT}.sh}
if check_response "Deploy the generated script to $S3_OBJECT?"; then
aws s3 mv "$GENERATED_FILE" "$S3_OBJECT"
fi

View File

@ -1,4 +1,46 @@
#!/usr/bin/env bash
cat > /dev/null << 'EOF'
The following content is embedded into the s3 object via the script
deploy-runner-init.sh {staging,production}
with additional helping information
In the `user data` you should define as the following text
between `### COPY BELOW` and `### COPY ABOVE`
### COPY BELOW
Content-Type: multipart/mixed; boundary="//"
MIME-Version: 1.0
--//
Content-Type: text/cloud-config; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment; filename="cloud-config.txt"
#cloud-config
cloud_final_modules:
- [scripts-user, always]
--//
Content-Type: text/x-shellscript; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment; filename="userdata.txt"
#!/bin/bash
INSTANCE_ID=$(ec2metadata --instance-id)
INIT_ENVIRONMENT=$(/usr/local/bin/aws ec2 describe-tags --filters "Name=resource-id,Values=$INSTANCE_ID" --query "Tags[?Key=='github:init-environment'].Value" --output text)
echo "Downloading and using $INIT_ENVIRONMENT cloud-init.sh"
aws s3 cp "s3://github-runners-data/cloud-init/${INIT_ENVIRONMENT:-production}.sh" /tmp/cloud-init.sh
chmod 0700 /tmp/cloud-init.sh
exec bash /tmp/cloud-init.sh
--//
### COPY ABOVE
EOF
# THE SCRIPT START
set -uo pipefail
####################################
@ -88,6 +130,23 @@ terminate_and_exit() {
declare -f terminate_and_exit >> /tmp/actions-hooks/common.sh
check_spot_instance_is_old() {
# This function should be executed ONLY BETWEEN runnings.
# It's unsafe to execute while the runner is working!
local LIFE_CYCLE
LIFE_CYCLE=$(curl -s --fail http://169.254.169.254/latest/meta-data/instance-life-cycle)
if [ "$LIFE_CYCLE" == "spot" ]; then
local UPTIME
UPTIME=$(< /proc/uptime)
UPTIME=${UPTIME%%.*}
if (( 3600 < UPTIME )); then
echo "The spot instance has uptime $UPTIME, it's time to shut it down"
return 0
fi
fi
return 1
}
check_proceed_spot_termination() {
# The function checks and proceeds spot instance termination if exists
# The event for spot instance termination
@ -104,7 +163,7 @@ check_proceed_spot_termination() {
if [ -n "$runner_pid" ]; then
# Kill the runner to not allow it cancelling the job
# shellcheck disable=SC2046
kill -9 $(list_children "$runner_pid")
kill -9 "$runner_pid" $(list_children "$runner_pid")
fi
sudo -u ubuntu ./config.sh remove --token "$(get_runner_token)"
terminate_and_exit
@ -119,6 +178,7 @@ no_terminating_metadata() {
# The event for rebalance recommendation. Not strict, so we have some room to make a decision here
if curl -s --fail http://169.254.169.254/latest/meta-data/events/recommendations/rebalance; then
echo 'Received recommendation to rebalance, checking the uptime'
local UPTIME
UPTIME=$(< /proc/uptime)
UPTIME=${UPTIME%%.*}
# We don't shutdown the instances younger than 30m
@ -260,14 +320,17 @@ while true; do
# If runner is not active, check that it needs to terminate itself
echo "Checking if the instance suppose to terminate"
no_terminating_metadata || terminate_on_event
check_spot_instance_is_old && terminate_and_exit
check_proceed_spot_termination
echo "Going to configure runner"
sudo -u ubuntu ./config.sh --url $RUNNER_URL --token "$(get_runner_token)" --ephemeral \
sudo -u ubuntu ./config.sh --url $RUNNER_URL --token "$(get_runner_token)" \
--ephemeral --disableupdate --unattended \
--runnergroup Default --labels "$LABELS" --work _work --name "$INSTANCE_ID"
echo "Another one check to avoid race between runner and infrastructure"
no_terminating_metadata || terminate_on_event
check_spot_instance_is_old && terminate_and_exit
check_proceed_spot_termination
echo "Run"
@ -275,7 +338,7 @@ while true; do
ACTIONS_RUNNER_HOOK_JOB_STARTED=/tmp/actions-hooks/pre-run.sh \
ACTIONS_RUNNER_HOOK_JOB_COMPLETED=/tmp/actions-hooks/post-run.sh \
./run.sh &
sleep 15
sleep 10
else
echo "Runner is working with pid $runner_pid, checking the metadata in background"
check_proceed_spot_termination
@ -291,8 +354,8 @@ while true; do
terminate_and_exit
fi
fi
sleep 5
fi
sleep 5
done
# vim:ts=4:sw=4

View File

@ -9,7 +9,7 @@ set -xeuo pipefail
echo "Running prepare script"
export DEBIAN_FRONTEND=noninteractive
export RUNNER_VERSION=2.304.0
export RUNNER_VERSION=2.311.0
export RUNNER_HOME=/home/ubuntu/actions-runner
deb_arch() {
@ -56,12 +56,12 @@ apt-get install --yes --no-install-recommends \
qemu-user-static \
unzip
# Install docker
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
echo "deb [arch=$(deb_arch) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null
apt-get update
apt-get install --yes --no-install-recommends docker-ce docker-buildx-plugin docker-ce-cli containerd.io
usermod -aG docker ubuntu
@ -81,6 +81,14 @@ cat <<EOT > /etc/docker/daemon.json
}
EOT
# Install azure-cli
curl -sLS https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /etc/apt/keyrings/microsoft.gpg
AZ_DIST=$(lsb_release -cs)
echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ $AZ_DIST main" | tee /etc/apt/sources.list.d/azure-cli.list
apt-get update
apt-get install --yes --no-install-recommends azure-cli
# Increase the limit on number of virtual memory mappings to aviod 'Cannot mmap' error
echo "vm.max_map_count = 2097152" > /etc/sysctl.d/01-increase-map-counts.conf
@ -88,10 +96,12 @@ systemctl restart docker
# buildx builder is user-specific
sudo -u ubuntu docker buildx version
sudo -u ubuntu docker buildx rm default-builder || : # if it's the second attempt
sudo -u ubuntu docker buildx create --use --name default-builder
pip install boto3 pygithub requests urllib3 unidiff dohq-artifactory
rm -rf $RUNNER_HOME # if it's the second attempt
mkdir -p $RUNNER_HOME && cd $RUNNER_HOME
RUNNER_ARCHIVE="actions-runner-linux-$(runner_arch)-$RUNNER_VERSION.tar.gz"
@ -130,3 +140,44 @@ systemctl enable amazon-cloudwatch-agent.service
# The following line is used in aws TOE check.
touch /var/tmp/clickhouse-ci-ami.success
# END OF THE SCRIPT
# TOE description
# name: CIInfrastructurePrepare
# description: instals the infrastructure for ClickHouse CI runners
# schemaVersion: 1.0
#
# phases:
# - name: build
# steps:
# - name: DownloadRemoteScript
# maxAttempts: 3
# action: WebDownload
# onFailure: Abort
# inputs:
# - source: https://github.com/ClickHouse/ClickHouse/raw/653da5f00219c088af66d97a8f1ea3e35e798268/tests/ci/worker/prepare-ci-ami.sh
# destination: /tmp/prepare-ci-ami.sh
# - name: RunScript
# maxAttempts: 3
# action: ExecuteBash
# onFailure: Abort
# inputs:
# commands:
# - bash -x '{{build.DownloadRemoteScript.inputs[0].destination}}'
#
#
# - name: validate
# steps:
# - name: RunScript
# maxAttempts: 3
# action: ExecuteBash
# onFailure: Abort
# inputs:
# commands:
# - ls /var/tmp/clickhouse-ci-ami.success
# - name: Cleanup
# action: DeleteFile
# onFailure: Abort
# maxAttempts: 3
# inputs:
# - path: /var/tmp/clickhouse-ci-ami.success

View File

@ -1173,6 +1173,25 @@ class TestCase:
description_full += result.reason.value
description_full += result.description
if (
args.collect_per_test_coverage
and BuildFlags.SANITIZE_COVERAGE in args.build_flags
):
clickhouse_execute(
args,
f"INSERT INTO system.coverage SELECT now(), '{self.case}', coverage()",
retry_error_codes=True,
)
coverage = clickhouse_execute(
args,
"SELECT length(coverage())",
retry_error_codes=True,
).decode()
description_full += f" Coverage: {coverage}"
description_full += "\n"
if result.status == TestStatus.FAIL and self.testcase_args:
@ -1231,6 +1250,17 @@ class TestCase:
+ pattern
)
# We want to calculate per-test code coverage. That's why we reset it before each test.
if (
args.collect_per_test_coverage
and BuildFlags.SANITIZE_COVERAGE in args.build_flags
):
clickhouse_execute(
args,
"SYSTEM RESET COVERAGE",
retry_error_codes=True,
)
command = pattern.format(**params)
proc = Popen(command, shell=True, env=os.environ)
@ -1872,6 +1902,7 @@ class BuildFlags:
UNDEFINED = "ubsan"
MEMORY = "msan"
DEBUG = "debug"
SANITIZE_COVERAGE = "sanitize-coverage"
RELEASE = "release"
ORDINARY_DATABASE = "ordinary-database"
POLYMORPHIC_PARTS = "polymorphic-parts"
@ -1891,6 +1922,8 @@ def collect_build_flags(args):
result.append(BuildFlags.UNDEFINED)
elif b"-fsanitize=memory" in value:
result.append(BuildFlags.MEMORY)
elif b"-DSANITIZE_COVERAGE=1" in value:
result.append(BuildFlags.SANITIZE_COVERAGE)
value = clickhouse_execute(
args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'"
@ -2072,6 +2105,8 @@ def reportCoverageFor(args, what, query, permissive=False):
return True
# This is high-level coverage on per-component basis (functions, data types, etc.)
# Don't be confused with the code coverage.
def reportCoverage(args):
clickhouse_execute(args, "SYSTEM FLUSH LOGS")
@ -2334,6 +2369,28 @@ def main(args):
print(f"Failed to create databases for tests: {e}")
server_died.set()
if (
args.collect_per_test_coverage
and BuildFlags.SANITIZE_COVERAGE in args.build_flags
):
clickhouse_execute(
args,
"""
CREATE TABLE IF NOT EXISTS system.coverage
(
time DateTime,
test_name String,
coverage Array(UInt64)
) ENGINE = MergeTree ORDER BY test_name;
""",
)
# Coverage collected at the system startup before running any tests:
clickhouse_execute(
args,
"INSERT INTO system.coverage SELECT now(), '', coverage()",
)
total_tests_run = 0
for suite in sorted(os.listdir(base_dir), key=suite_key_func):
@ -2678,6 +2735,12 @@ def parse_args():
default=False,
help="Check what high-level server components were covered by tests",
)
parser.add_argument(
"--collect-per-test-coverage",
action="store_true",
default=False,
help="Create `system.coverage` table on the server and collect information about low-level code coverage on a per test basis there",
)
parser.add_argument(
"--report-logs-stats",
action="store_true",

View File

@ -37,39 +37,59 @@ class KeeperException(Exception):
class KeeperClient(object):
SEPARATOR = b"\a\a\a\a\n"
def __init__(self, bin_path: str, host: str, port: int):
def __init__(self, bin_path: str, host: str, port: int, connection_tries=30):
self.bin_path = bin_path
self.host = host
self.port = port
self.proc = subprocess.Popen(
[
bin_path,
"keeper-client",
"--host",
host,
"--port",
str(port),
"--log-level",
"error",
"--tests-mode",
"--no-confirmation",
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
retry_count = 0
self.poller = select.epoll()
self.poller.register(self.proc.stdout)
self.poller.register(self.proc.stderr)
while True:
try:
self.proc = subprocess.Popen(
[
bin_path,
"keeper-client",
"--host",
host,
"--port",
str(port),
"--log-level",
"error",
"--tests-mode",
"--no-confirmation",
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
self._fd_nums = {
self.proc.stdout.fileno(): self.proc.stdout,
self.proc.stderr.fileno(): self.proc.stderr,
}
self.poller = select.epoll()
self.poller.register(self.proc.stdout)
self.poller.register(self.proc.stderr)
self.stopped = False
self._fd_nums = {
self.proc.stdout.fileno(): self.proc.stdout,
self.proc.stderr.fileno(): self.proc.stderr,
}
self.stopped = False
self.get("/keeper", 60.0)
break
except Exception as e:
retry_count += 1
if (
"All connection tries failed while connecting to ZooKeeper"
in str(e)
and retry_count < connection_tries
):
print(
f"Got exception while connecting to Keeper: {e}\nWill reconnect, reconnect count = {retry_count}"
)
time.sleep(1)
else:
raise
def execute_query(self, query: str, timeout: float = 60.0) -> str:
output = io.BytesIO()
@ -94,7 +114,7 @@ class KeeperClient(object):
output.write(chunk)
elif file == self.proc.stderr:
assert self.proc.stdout.readline() == self.SEPARATOR
self.proc.stdout.readline()
raise KeeperException(self.proc.stderr.readline().strip().decode())
else:
@ -221,13 +241,12 @@ NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving request
def wait_until_connected(cluster, node, port=9181, timeout=30.0):
elapsed = 0.0
start = time.time()
while send_4lw_cmd(cluster, node, "mntr", port) == NOT_SERVING_REQUESTS_ERROR_MSG:
time.sleep(0.1)
elapsed += 0.1
if elapsed >= timeout:
if time.time() - start > timeout:
raise Exception(
f"{timeout}s timeout while waiting for {node.name} to start serving requests"
)
@ -280,14 +299,16 @@ def wait_configs_equal(left_config: str, right_zk: KeeperClient, timeout: float
Check whether get /keeper/config result in left_config is equal
to get /keeper/config on right_zk ZK connection.
"""
elapsed: float = 0.0
while sorted(left_config.split("\n")) != sorted(
get_config_str(right_zk).split("\n")
):
start = time.time()
left_config = sorted(left_config.split("\n"))
while True:
right_config = sorted(get_config_str(right_zk).split("\n"))
if left_config == right_config:
return
time.sleep(1)
elapsed += 1
if elapsed >= timeout:
if time.time() - start > timeout:
raise Exception(
f"timeout while checking nodes configs to get equal. "
f"Left: {left_config}, right: {get_config_str(right_zk)}"
f"Left: {left_config}, right: {right_config}"
)

View File

@ -0,0 +1,17 @@
<clickhouse>
<profiles>
<default>
<max_query_size from_env="MAX_QUERY_SIZE" />
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
</default>
<include incl="users_1" />
<include incl="users_2" />
</users>
</clickhouse>

View File

@ -0,0 +1,17 @@
<clickhouse>
<profiles>
<default>
<max_query_size>424242</max_query_size>
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
</default>
<include incl="users_1" />
<include incl="users_2" />
</users>
</clickhouse>

View File

@ -30,6 +30,15 @@ node6 = cluster.add_instance(
},
main_configs=["configs/include_from_source.xml"],
)
node7 = cluster.add_instance(
"node7",
user_configs=[
"configs/000-config_with_env_subst.xml",
"configs/010-env_subst_override.xml",
],
env_variables={"MAX_QUERY_SIZE": "121212"},
instance_env_variables=True,
) # overridden with 424242
@pytest.fixture(scope="module")
@ -78,6 +87,10 @@ def test_config(start_cluster):
node6.query("select value from system.settings where name = 'max_query_size'")
== "99999\n"
)
assert (
node7.query("select value from system.settings where name = 'max_query_size'")
== "424242\n"
)
def test_include_config(start_cluster):

View File

@ -1,13 +1,7 @@
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
@ -82,6 +76,13 @@ def test_single_node_broken_log(started_cluster):
node1_conn.close()
node1.stop_clickhouse()
# wait until cluster stabilizes with a new leader
while not keeper_utils.is_leader(
started_cluster, node2
) and not keeper_utils.is_leader(started_cluster, node3):
time.sleep(1)
node1.exec_in_container(
[
"truncate",

View File

@ -3,6 +3,7 @@
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from os.path import join, dirname, realpath
import time
import helpers.keeper_utils as ku
import typing as tp
@ -83,6 +84,12 @@ def test_reconfig_replace_leader(started_cluster):
assert "node3" in config
assert "node4" not in config
# wait until cluster stabilizes with a new leader
while not ku.is_leader(started_cluster, node2) and not ku.is_leader(
started_cluster, node3
):
time.sleep(1)
# additional 20s wait before removing leader
ku.wait_configs_equal(config, zk2, timeout=50)

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="utf-8"?>
<clickhouse>
<storage_configuration>
<disks>
<default/>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<broken_s3>
<type>s3</type>
<endpoint>http://resolver:8083/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</broken_s3>
</disks>
<policies>
<slow_s3>
<volumes>
<main>
<disk>default</disk>
</main>
<broken>
<disk>broken_s3</disk>
</broken>
</volumes>
<move_factor>0.0</move_factor>
</slow_s3>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,133 @@
#!/usr/bin/env python3
import logging
import time
import os
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.mock_servers import start_s3_mock, start_mock_servers
from helpers.utility import generate_values, replace_config, SafeThread
from helpers.wait_for_helpers import wait_for_delete_inactive_parts
from helpers.wait_for_helpers import wait_for_delete_empty_parts
from helpers.wait_for_helpers import wait_for_merges
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@pytest.fixture(scope="module")
def init_broken_s3(cluster):
yield start_s3_mock(cluster, "broken_s3", "8083")
@pytest.fixture(scope="function")
def broken_s3(init_broken_s3):
init_broken_s3.reset()
yield init_broken_s3
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=[
"configs/storage_policy.xml",
],
with_minio=True,
)
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def test_async_alter_move(cluster, broken_s3):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE moving_table_async
(
key UInt64,
data String
)
ENGINE MergeTree()
ORDER BY tuple()
SETTINGS storage_policy = 'slow_s3'
"""
)
node.query(
"INSERT INTO moving_table_async SELECT number, randomPrintableASCII(1000) FROM numbers(10000)"
)
broken_s3.setup_slow_answers(
timeout=5,
count=1000000,
)
node.query(
"ALTER TABLE moving_table_async MOVE PARTITION tuple() TO DISK 'broken_s3'",
settings={"alter_move_to_space_execute_async": True},
timeout=10,
)
# not flaky, just introduce some wait
time.sleep(3)
for i in range(100):
count = node.query(
"SELECT count() FROM system.moves where table = 'moving_table_async'"
)
if count == "1\n":
break
time.sleep(0.1)
else:
assert False, "Cannot find any moving background operation"
def test_sync_alter_move(cluster, broken_s3):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE moving_table_sync
(
key UInt64,
data String
)
ENGINE MergeTree()
ORDER BY tuple()
SETTINGS storage_policy = 'slow_s3'
"""
)
node.query(
"INSERT INTO moving_table_sync SELECT number, randomPrintableASCII(1000) FROM numbers(10000)"
)
broken_s3.reset()
node.query(
"ALTER TABLE moving_table_sync MOVE PARTITION tuple() TO DISK 'broken_s3'",
timeout=30,
)
# not flaky, just introduce some wait
time.sleep(3)
assert (
node.query("SELECT count() FROM system.moves where table = 'moving_table_sync'")
== "0\n"
)
assert (
node.query(
"SELECT disk_name FROM system.parts WHERE table = 'moving_table_sync'"
)
== "broken_s3\n"
)

View File

@ -27,10 +27,6 @@
<query>SELECT avg(num_f) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_f, num) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_f, num_f) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_f, num_u) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_u, num_f) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_u, num) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_u, num_u) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_f, num_f) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(toNullable(num_f), num_f) FROM perf_avg FORMAT Null</query>

View File

@ -1,4 +1,4 @@
SELECT toDate('07-08-2019'); -- { serverError 6 }
SELECT toDate('07-08-2019'); -- { serverError 38 }
SELECT toDate('2019-0708'); -- { serverError 38 }
SELECT toDate('201907-08'); -- { serverError 38 }
SELECT toDate('2019^7^8');
@ -6,5 +6,5 @@ SELECT toDate('2019^7^8');
CREATE TEMPORARY TABLE test (d Date);
INSERT INTO test VALUES ('2018-01-01');
SELECT * FROM test WHERE d >= '07-08-2019'; -- { serverError 53 }
SELECT * FROM test WHERE d >= '07-08-2019'; -- { serverError 38 }
SELECT * FROM test WHERE d >= '2019-07-08';

View File

@ -16,6 +16,8 @@ columns
columns
columns
columns
creator_info
creator_info
failed_parts
failed_parts
flags

View File

@ -7,6 +7,7 @@ block_numbers
blocks
columns
columns
creator_info
failed_parts
flags
host
@ -49,6 +50,7 @@ block_numbers
blocks
columns
columns
creator_info
failed_parts
flags
host

View File

@ -14,36 +14,29 @@ Header: avgWeighted(x, y) Nullable(Float64)
Header: x_0 Nullable(UInt8)
y_1 UInt8
Union
Header: NULL Nullable(UInt8)
x Nullable(UInt8)
Header: x Nullable(UInt8)
y UInt8
Expression (Conversion before UNION)
Header: NULL Nullable(UInt8)
x Nullable(UInt8)
Header: x Nullable(UInt8)
y UInt8
Expression (Project names)
Header: NULL Nullable(Nothing)
x UInt8
Header: x UInt8
y UInt8
Expression (Projection)
Header: NULL_Nullable(Nothing) Nullable(Nothing)
255_UInt8 UInt8
Header: 255_UInt8 UInt8
1_UInt8 UInt8
Expression (Change column names to column identifiers)
Header: dummy_0 UInt8
ReadFromStorage (SystemOne)
Header: dummy UInt8
Expression (Conversion before UNION)
Header: NULL Nullable(UInt8)
x Nullable(UInt8)
Header: x Nullable(UInt8)
y UInt8
Expression (Project names)
Header: y UInt8
x Nullable(Nothing)
Header: x Nullable(Nothing)
y UInt8
Expression (Projection)
Header: 1_UInt8 UInt8
NULL_Nullable(Nothing) Nullable(Nothing)
Header: NULL_Nullable(Nothing) Nullable(Nothing)
1_UInt8 UInt8
Expression (Change column names to column identifiers)
Header: dummy_0 UInt8

View File

@ -0,0 +1,4 @@
-- Tags: no-fasttest
SELECT count() FROM cluster('test_cluster_two_shards', view( SELECT * FROM numbers(100000000000) )) SETTINGS max_execution_time_leaf = 1; -- { serverError 159 }
-- Can return partial result
SELECT count() FROM cluster('test_cluster_two_shards', view( SELECT * FROM numbers(100000000000) )) FORMAT Null SETTINGS max_execution_time_leaf = 1, timeout_overflow_mode_leaf = 'break';

View File

@ -125,18 +125,13 @@ QUERY id: 0
QUERY id: 3, is_subquery: 1
PROJECTION COLUMNS
a UInt8
sum(b) UInt64
PROJECTION
LIST id: 4, nodes: 2
LIST id: 4, nodes: 1
COLUMN id: 5, column_name: a, result_type: UInt8, source_id: 6
FUNCTION id: 7, function_name: sum, function_type: aggregate, result_type: UInt64
ARGUMENTS
LIST id: 8, nodes: 1
COLUMN id: 9, column_name: b, result_type: UInt8, source_id: 6
JOIN TREE
TABLE id: 6, table_name: default.test_rewrite_uniq_to_count
GROUP BY
LIST id: 10, nodes: 1
LIST id: 7, nodes: 1
COLUMN id: 5, column_name: a, result_type: UInt8, source_id: 6
SETTINGS allow_experimental_analyzer=1
6. test group by with subquery alias
@ -162,18 +157,13 @@ QUERY id: 0
QUERY id: 3, alias: t, is_subquery: 1
PROJECTION COLUMNS
a UInt8
sum(b) UInt64
PROJECTION
LIST id: 4, nodes: 2
LIST id: 4, nodes: 1
COLUMN id: 5, column_name: a, result_type: UInt8, source_id: 6
FUNCTION id: 7, function_name: sum, function_type: aggregate, result_type: UInt64
ARGUMENTS
LIST id: 8, nodes: 1
COLUMN id: 9, column_name: b, result_type: UInt8, source_id: 6
JOIN TREE
TABLE id: 6, table_name: default.test_rewrite_uniq_to_count
GROUP BY
LIST id: 10, nodes: 1
LIST id: 7, nodes: 1
COLUMN id: 5, column_name: a, result_type: UInt8, source_id: 6
SETTINGS allow_experimental_analyzer=1
7. test group by with compound column name
@ -199,18 +189,13 @@ QUERY id: 0
QUERY id: 3, alias: t, is_subquery: 1
PROJECTION COLUMNS
alias_of_a UInt8
sum(b) UInt64
PROJECTION
LIST id: 4, nodes: 2
LIST id: 4, nodes: 1
COLUMN id: 5, column_name: a, result_type: UInt8, source_id: 6
FUNCTION id: 7, function_name: sum, function_type: aggregate, result_type: UInt64
ARGUMENTS
LIST id: 8, nodes: 1
COLUMN id: 9, column_name: b, result_type: UInt8, source_id: 6
JOIN TREE
TABLE id: 6, table_name: default.test_rewrite_uniq_to_count
GROUP BY
LIST id: 10, nodes: 1
LIST id: 7, nodes: 1
COLUMN id: 5, column_name: a, result_type: UInt8, source_id: 6
SETTINGS allow_experimental_analyzer=1
8. test group by with select expression alias
@ -236,17 +221,12 @@ QUERY id: 0
QUERY id: 3, alias: t, is_subquery: 1
PROJECTION COLUMNS
alias_of_a UInt8
sum(b) UInt64
PROJECTION
LIST id: 4, nodes: 2
LIST id: 4, nodes: 1
COLUMN id: 5, column_name: a, result_type: UInt8, source_id: 6
FUNCTION id: 7, function_name: sum, function_type: aggregate, result_type: UInt64
ARGUMENTS
LIST id: 8, nodes: 1
COLUMN id: 9, column_name: b, result_type: UInt8, source_id: 6
JOIN TREE
TABLE id: 6, table_name: default.test_rewrite_uniq_to_count
GROUP BY
LIST id: 10, nodes: 1
LIST id: 7, nodes: 1
COLUMN id: 5, column_name: a, result_type: UInt8, source_id: 6
SETTINGS allow_experimental_analyzer=1

View File

@ -0,0 +1,4 @@
1
1
1
1

View File

@ -0,0 +1,42 @@
#!/usr/bin/env bash
# Tags: zookeeper, no-parallel, no-ordinary-database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS test_exception_replicated SYNC"
UUID=$(${CLICKHOUSE_CLIENT} --query "SELECT reinterpretAsUUID(currentDatabase())")
#### 1 - There is only one replica
${CLICKHOUSE_CLIENT} --create_replicated_merge_tree_fault_injection_probability=1 \
-q "CREATE TABLE test_exception_replicated UUID '$UUID' (date Date) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/recreate', 'r1') ORDER BY date" 2>&1 | grep -cm1 "Fault injected"
# We will see that the replica is empty and throw the same 'Fault injected' exception as before
${CLICKHOUSE_CLIENT} --create_replicated_merge_tree_fault_injection_probability=1 \
-q "CREATE TABLE test_exception_replicated UUID '$UUID' (date Date) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/recreate', 'r1') ORDER BY date" 2>&1 | grep -cm1 "Fault injected"
# We will succeed
${CLICKHOUSE_CLIENT} \
-q "CREATE TABLE test_exception_replicated UUID '$UUID' (date Date) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/recreate', 'r1') ORDER BY date"
${CLICKHOUSE_CLIENT} -q "DROP TABLE test_exception_replicated SYNC"
#### 2 - There are two replicas
${CLICKHOUSE_CLIENT} --create_replicated_merge_tree_fault_injection_probability=1 \
-q "CREATE TABLE test_exception_replicated UUID '$UUID' (date Date) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/recreate', 'r1') ORDER BY date" 2>&1 | grep -cm1 "Fault injected"
${CLICKHOUSE_CLIENT} --create_replicated_merge_tree_fault_injection_probability=1 \
-q "CREATE TABLE test_exception_replicated_2 (date Date) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/recreate', 'r2') ORDER BY date" 2>&1 | grep -cm1 "Fault injected"
# We will succeed
${CLICKHOUSE_CLIENT} \
-q "CREATE TABLE test_exception_replicated UUID '$UUID' (date Date) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/recreate', 'r1') ORDER BY date"
# The trash from the second replica creation will not prevent us from dropping the table fully, so we delete it separately
${CLICKHOUSE_CLIENT} -q "SYSTEM DROP REPLICA 'r2' FROM TABLE test_exception_replicated"
${CLICKHOUSE_CLIENT} -q "DROP TABLE test_exception_replicated SYNC"

View File

@ -84,3 +84,8 @@ c
0
0
1
[2.199219,1.099609,3.300781]
[4.25,3.34961,6.628906]
inf
nan
0

View File

@ -56,3 +56,7 @@ $CLICKHOUSE_LOCAL -q "select * from file('$CURDIR/data_npy/one_dim_str.npy', Npy
$CLICKHOUSE_LOCAL -q "select * from file('$CURDIR/data_npy/one_dim_unicode.npy', Npy, 'value Float32')" 2>&1 | grep -c "BAD_ARGUMENTS"
$CLICKHOUSE_LOCAL -q "select * from file('$CURDIR/data_npy/complex.npy')" 2>&1 | grep -c "BAD_ARGUMENTS"
$CLICKHOUSE_LOCAL -q "select * from file('$CURDIR/data_npy/float_16.npy')"
$CLICKHOUSE_LOCAL -q "select * from file('$CURDIR/data_npy/npy_inf_nan_null.npy')"

View File

@ -0,0 +1,9 @@
3
3
3
array Int64
3
1000000
1000000
array Int64
1000000

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/one_dim.npy') settings optimize_count_from_files=0"
$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/one_dim.npy') settings optimize_count_from_files=1"
$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/one_dim.npy', auto, 'array Int64') settings optimize_count_from_files=1"
$CLICKHOUSE_LOCAL -nm -q "
desc file('$CURDIR/data_npy/one_dim.npy');
select number_of_rows from system.schema_inference_cache where format='Npy';
"
$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/npy_big.npy') settings optimize_count_from_files=0"
$CLICKHOUSE_LOCAL -q "select count() from file('$CURDIR/data_npy/npy_big.npy') settings optimize_count_from_files=1"
$CLICKHOUSE_LOCAL -nm -q "
desc file('$CURDIR/data_npy/npy_big.npy');
select number_of_rows from system.schema_inference_cache where format='Npy';
"

View File

@ -0,0 +1,15 @@
#!/usr/bin/expect -f
log_user 0
set timeout 60
match_max 100000
spawn bash -c "clickhouse-local"
expect ":) "
send -- "SET send_logs_level = 't'\r"
expect "Exception on client:"
expect ":) "
send -- "exit\r"
expect eof

View File

@ -0,0 +1,8 @@
0
0
0
0
0
0
0
0

View File

@ -0,0 +1,22 @@
SET allow_experimental_analyzer = 1;
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table
(
id UInt64,
value String
) ENGINE = MergeTree ORDER BY id;
INSERT INTO test_table VALUES (0, 'Value_0');
SET max_columns_to_read = 1;
SELECT id FROM (SELECT * FROM test_table);
SELECT id FROM (SELECT * FROM (SELECT * FROM test_table));
SELECT id FROM (SELECT * FROM test_table UNION ALL SELECT * FROM test_table);
SELECT id FROM (SELECT id, value FROM test_table);
SELECT id FROM (SELECT id, value FROM (SELECT id, value FROM test_table));
SELECT id FROM (SELECT id, value FROM test_table UNION ALL SELECT id, value FROM test_table);
DROP TABLE test_table;

View File

@ -0,0 +1,40 @@
#!/usr/bin/env bash
# Tags: no-random-merge-tree-settings
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
CREATE TABLE IF NOT EXISTS ts_data_double_raw
(
device_id UInt32 NOT NULL CODEC(ZSTD),
data_item_id UInt32 NOT NULL CODEC(ZSTD),
data_time DateTime64(3, 'UTC') NOT NULL CODEC(Delta, ZSTD),
data_value Float64 NOT NULL CODEC(Delta, ZSTD),
is_deleted Bool CODEC(ZSTD),
ingestion_time DateTime64(3, 'UTC') NOT NULL CODEC(Delta, ZSTD)
)
ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMM(data_time)
ORDER BY (device_id, data_item_id, data_time)
SETTINGS index_granularity = 8192;
CREATE VIEW ts_data_double AS
SELECT
device_id,
data_item_id,
data_time,
argMax(data_value, ingestion_time) data_value,
max(ingestion_time) version,
argMax(is_deleted, ingestion_time) is_deleted
FROM ts_data_double_raw
GROUP BY device_id, data_item_id, data_time
HAVING is_deleted = 0;
INSERT INTO ts_data_double_raw VALUES (100, 1, fromUnixTimestamp64Milli(1697547086760), 3.6, false, fromUnixTimestamp64Milli(1)), (100, 1, fromUnixTimestamp64Milli(1697547086761), 4.6, false, fromUnixTimestamp64Milli(1));
INSERT INTO ts_data_double_raw VALUES (100, 1, fromUnixTimestamp64Milli(1697547086760), 3.6, true, fromUnixTimestamp64Milli(5)), (100, 1, fromUnixTimestamp64Milli(1697547086761), 4.6, false, fromUnixTimestamp64Milli(4));
"
$CLICKHOUSE_CLIENT -q "select 1697547086760 format RowBinary" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=INSERT%20INTO%20ts_data_double_raw%20%28device_id%2C%20data_item_id%2C%20data_time%2C%20data_value%2C%20is_deleted%2C%20ingestion_time%29%0ASELECT%0A%20%20%20device_id%2C%0A%20%20%20data_item_id%2C%0A%20%20%20data_time%2C%0A%20%20%20data_value%2C%0A%20%20%201%2C%20%20--%20mark%20as%20deleted%0A%20%20%20fromUnixTimestamp64Milli%281697547088995%2C%20%27UTC%27%29%20--%20all%20inserted%20records%20have%20new%20ingestion%20time%0AFROM%20ts_data_double%0AWHERE%20%28device_id%20%3D%20100%29%20AND%20%28data_item_id%20%3D%201%29%0A%20%20%20%20AND%20%28data_time%20%3E%3D%20fromUnixTimestamp64Milli%280%2C%20%27UTC%27%29%29%0A%20%20%20%20AND%20%28data_time%20%3C%3D%20fromUnixTimestamp64Milli%281697547086764%2C%20%27UTC%27%29%29%0A%20%20%20%20AND%20version%20%3C%20fromUnixTimestamp64Milli%281697547088995%2C%20%27UTC%27%29%0A%20%20%20%20AND%20%28toUnixTimestamp64Milli%28data_time%29%20IN%20%28SELECT%20timestamp%20FROM%20input%28%27timestamp%20UInt64%27%29%29%29%20SETTINGS%20insert_quorum%3D1%0A%20FORMAT%20RowBinary" --data-binary @-

View File

@ -0,0 +1,5 @@
2020-01-02 SomeString
2020-01-02 SomeString
2020-01-02 SomeString
2020-01-02 SomeString
2020-01-02 SomeString

Some files were not shown because too many files have changed in this diff Show More