Merge remote-tracking branch 'upstream/master' into fix-upgrade-check

kssenii 2023-08-21 19:28:19 +02:00
commit 951081c8fb
91 changed files with 2117 additions and 386 deletions

View File

@ -147,7 +147,7 @@ target_compile_definitions(_libarchive PUBLIC
target_compile_options(_libarchive PRIVATE "-Wno-reserved-macro-identifier")
if (TARGET ch_contrib::xz)
target_compile_definitions(_libarchive PUBLIC HAVE_LZMA_H=1)
target_compile_definitions(_libarchive PUBLIC HAVE_LZMA_H=1 HAVE_LIBLZMA=1)
target_link_libraries(_libarchive PRIVATE ch_contrib::xz)
endif()
@ -156,6 +156,16 @@ if (TARGET ch_contrib::zlib)
target_link_libraries(_libarchive PRIVATE ch_contrib::zlib)
endif()
if (TARGET ch_contrib::zstd)
target_compile_definitions(_libarchive PUBLIC HAVE_ZSTD_H=1 HAVE_LIBZSTD=1)
target_link_libraries(_libarchive PRIVATE ch_contrib::zstd)
endif()
if (TARGET ch_contrib::bzip2)
target_compile_definitions(_libarchive PUBLIC HAVE_BZLIB_H=1)
target_link_libraries(_libarchive PRIVATE ch_contrib::bzip2)
endif()
if (OS_LINUX)
target_compile_definitions(
_libarchive PUBLIC

@ -1 +1 @@
Subproject commit d857c707fccd50423bea1c4710dc469cf89607a9
Subproject commit e7b8befca85c8b847614432dba250c22d35fbae0

View File

@ -1,18 +1,16 @@
if (APPLE OR NOT ARCH_AMD64 OR SANITIZE STREQUAL "undefined")
if (APPLE OR SANITIZE STREQUAL "undefined")
set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF)
else()
set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON)
endif()
option (ENABLE_EMBEDDED_COMPILER "Enable support for 'compile_expressions' option for query execution" ${ENABLE_EMBEDDED_COMPILER_DEFAULT})
option (ENABLE_EMBEDDED_COMPILER "Enable support for JIT compilation during query execution" ${ENABLE_EMBEDDED_COMPILER_DEFAULT})
if (NOT ENABLE_EMBEDDED_COMPILER)
message(STATUS "Not using LLVM")
return()
endif()
# TODO: Enable compilation on AArch64
set (LLVM_VERSION "15.0.0bundled")
set (LLVM_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm/include"
@ -58,18 +56,30 @@ set (REQUIRED_LLVM_LIBRARIES
LLVMDemangle
)
# if (ARCH_AMD64)
if (ARCH_AMD64)
set (LLVM_TARGETS_TO_BUILD "X86" CACHE INTERNAL "")
list(APPEND REQUIRED_LLVM_LIBRARIES LLVMX86Info LLVMX86Desc LLVMX86CodeGen)
# elseif (ARCH_AARCH64)
# list(APPEND REQUIRED_LLVM_LIBRARIES LLVMAArch64Info LLVMAArch64Desc LLVMAArch64CodeGen)
# endif ()
elseif (ARCH_AARCH64)
set (LLVM_TARGETS_TO_BUILD "AArch64" CACHE INTERNAL "")
list(APPEND REQUIRED_LLVM_LIBRARIES LLVMAArch64Info LLVMAArch64Desc LLVMAArch64CodeGen)
elseif (ARCH_PPC64LE)
set (LLVM_TARGETS_TO_BUILD "PowerPC" CACHE INTERNAL "")
list(APPEND REQUIRED_LLVM_LIBRARIES LLVMPowerPCInfo LLVMPowerPCDesc LLVMPowerPCCodeGen)
elseif (ARCH_S390X)
set (LLVM_TARGETS_TO_BUILD "SystemZ" CACHE INTERNAL "")
list(APPEND REQUIRED_LLVM_LIBRARIES LLVMSystemZInfo LLVMSystemZDesc LLVMSystemZCodeGen)
elseif (ARCH_RISCV64)
set (LLVM_TARGETS_TO_BUILD "RISCV" CACHE INTERNAL "")
list(APPEND REQUIRED_LLVM_LIBRARIES LLVMRISCVInfo LLVMRISCVDesc LLVMRISCVCodeGen)
endif ()
message (STATUS "LLVM TARGETS TO BUILD ${LLVM_TARGETS_TO_BUILD}")
set (CMAKE_INSTALL_RPATH "ON") # Do not adjust RPATH in llvm, since then it will not be able to find libcxx/libcxxabi/libunwind
set (LLVM_COMPILER_CHECKED 1 CACHE INTERNAL "") # Skip internal compiler selection
set (LLVM_ENABLE_EH 1 CACHE INTERNAL "") # With exception handling
set (LLVM_ENABLE_RTTI 1 CACHE INTERNAL "")
set (LLVM_ENABLE_PIC 0 CACHE INTERNAL "")
set (LLVM_TARGETS_TO_BUILD "X86" CACHE STRING "") # for x86 + ARM: "X86;AArch64"
# Omit unnecessary stuff (just the options which are ON by default)
set(LLVM_ENABLE_BACKTRACES 0 CACHE INTERNAL "")
@ -99,15 +109,12 @@ set(LLVM_ENABLE_BINDINGS 0 CACHE INTERNAL "")
set (LLVM_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm")
set (LLVM_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm")
# Since we always use toolchain files to generate hermetic builds, cmake will
# think it's a cross compilation, and LLVM will try to configure NATIVE LLVM
# targets with all tests enabled, which will slow down cmake configuration and
# compilation (You'll see Building native llvm-tblgen...). Let's disable the
# cross compiling indicator for now.
#
# TODO We should let cmake know whether it's indeed a cross compilation in the
# first place.
set (CMAKE_CROSSCOMPILING 0)
message (STATUS "LLVM CMAKE CROSS COMPILING ${CMAKE_CROSSCOMPILING}")
if (CMAKE_CROSSCOMPILING)
set (LLVM_HOST_TRIPLE "${CMAKE_C_COMPILER_TARGET}" CACHE INTERNAL "")
message (STATUS "CROSS COMPILING SET LLVM HOST TRIPLE ${LLVM_HOST_TRIPLE}")
endif()
add_subdirectory ("${LLVM_SOURCE_DIR}" "${LLVM_BINARY_DIR}")
set_directory_properties (PROPERTIES

View File

@ -21,6 +21,7 @@
<!-- disable JIT for perf tests -->
<compile_expressions>0</compile_expressions>
<compile_aggregate_expressions>0</compile_aggregate_expressions>
<compile_sort_description>0</compile_sort_description>
<!-- Don't fail some prewarm queries too early -->
<timeout_before_checking_execution_speed>60</timeout_before_checking_execution_speed>

View File

@ -0,0 +1,45 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.3.9.55-lts (b9c5c8622d3) FIXME as compared to v23.3.8.21-lts (1675f2264f3)
#### Performance Improvement
* Backported in [#52213](https://github.com/ClickHouse/ClickHouse/issues/52213): Do not store blocks in `ANY` hash join if nothing is inserted. [#48633](https://github.com/ClickHouse/ClickHouse/pull/48633) ([vdimir](https://github.com/vdimir)).
* Backported in [#52826](https://github.com/ClickHouse/ClickHouse/issues/52826): Fix incorrect projection analysis which invalidates primary keys. This issue only exists when `query_plan_optimize_primary_key = 1, query_plan_optimize_projection = 1` . This fixes [#48823](https://github.com/ClickHouse/ClickHouse/issues/48823) . This fixes [#51173](https://github.com/ClickHouse/ClickHouse/issues/51173) . [#52308](https://github.com/ClickHouse/ClickHouse/pull/52308) ([Amos Bird](https://github.com/amosbird)).
#### Build/Testing/Packaging Improvement
* Backported in [#53019](https://github.com/ClickHouse/ClickHouse/issues/53019): Packing inline cache into docker images sometimes causes strange special effects. Since we don't use it at all, it's good to go. [#53008](https://github.com/ClickHouse/ClickHouse/pull/53008) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Backported in [#53288](https://github.com/ClickHouse/ClickHouse/issues/53288): The compiler's profile data (`-ftime-trace`) is uploaded to ClickHouse Cloud, the second attempt after [#53100](https://github.com/ClickHouse/ClickHouse/issues/53100). [#53213](https://github.com/ClickHouse/ClickHouse/pull/53213) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Backported in [#53461](https://github.com/ClickHouse/ClickHouse/issues/53461): Preserve environment parameters in `clickhouse start` command. Fixes [#51962](https://github.com/ClickHouse/ClickHouse/issues/51962). [#53418](https://github.com/ClickHouse/ClickHouse/pull/53418) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix optimization to move functions before sorting. [#51481](https://github.com/ClickHouse/ClickHouse/pull/51481) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Fix Block structure mismatch in Pipe::unitePipes for FINAL [#51492](https://github.com/ClickHouse/ClickHouse/pull/51492) ([Nikita Taranov](https://github.com/nickitat)).
* Fix binary arithmetic for Nullable(IPv4) [#51642](https://github.com/ClickHouse/ClickHouse/pull/51642) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* Support IPv4 and IPv6 as dictionary attributes [#51756](https://github.com/ClickHouse/ClickHouse/pull/51756) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* Fix ORDER BY tuple of WINDOW functions [#52145](https://github.com/ClickHouse/ClickHouse/pull/52145) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Disable expression templates for time intervals [#52335](https://github.com/ClickHouse/ClickHouse/pull/52335) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Fix `countSubstrings()` hang with empty needle and a column haystack [#52409](https://github.com/ClickHouse/ClickHouse/pull/52409) ([Sergei Trifonov](https://github.com/serxa)).
* Fixed inserting into Buffer engine [#52440](https://github.com/ClickHouse/ClickHouse/pull/52440) ([Vasily Nemkov](https://github.com/Enmk)).
* The implementation of AnyHash was non-conformant. [#52448](https://github.com/ClickHouse/ClickHouse/pull/52448) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* init and destroy ares channel on demand.. [#52634](https://github.com/ClickHouse/ClickHouse/pull/52634) ([Arthur Passos](https://github.com/arthurpassos)).
* Fix crash in function `tuple` with one sparse column argument [#52659](https://github.com/ClickHouse/ClickHouse/pull/52659) ([Anton Popov](https://github.com/CurtizJ)).
* clickhouse-keeper: fix implementation of server with poll() [#52833](https://github.com/ClickHouse/ClickHouse/pull/52833) ([Andy Fiddaman](https://github.com/citrus-it)).
* Fix password leak in show create mysql table [#52962](https://github.com/ClickHouse/ClickHouse/pull/52962) ([Duc Canh Le](https://github.com/canhld94)).
* Fix incorrect normal projection AST format [#53347](https://github.com/ClickHouse/ClickHouse/pull/53347) ([Amos Bird](https://github.com/amosbird)).
* Fix loading lazy database during system.table select query [#53372](https://github.com/ClickHouse/ClickHouse/pull/53372) ([SmitaRKulkarni](https://github.com/SmitaRKulkarni)).
* Fix wrong columns order for queries with parallel FINAL. [#53489](https://github.com/ClickHouse/ClickHouse/pull/53489) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Fix: interpolate expression takes source column instead of same name aliased from select expression. [#53572](https://github.com/ClickHouse/ClickHouse/pull/53572) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Fix crash in comparison functions due to incorrect query analysis [#52172](https://github.com/ClickHouse/ClickHouse/pull/52172) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix deadlocks in StorageTableFunctionProxy [#52626](https://github.com/ClickHouse/ClickHouse/pull/52626) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Disable test_reverse_dns_query/test.py [#53195](https://github.com/ClickHouse/ClickHouse/pull/53195) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Disable test_host_regexp_multiple_ptr_records/test.py [#53211](https://github.com/ClickHouse/ClickHouse/pull/53211) ([Alexander Tokmakov](https://github.com/tavplubix)).

View File

@ -13,7 +13,7 @@ If more than one table is required, it is highly recommended to use the [Materia
``` sql
CREATE TABLE postgresql_db.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres_user', 'postgres_password')
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgresql_table', 'postgres_user', 'postgres_password')
PRIMARY KEY key;
```

View File

@ -196,6 +196,7 @@ SELECT * FROM nestedt FORMAT TSV
- [input_format_tsv_skip_first_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_first_lines) - skip specified number of lines at the beginning of data. Default value - `0`.
- [input_format_tsv_detect_header](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_detect_header) - automatically detect header with names and types in TSV format. Default value - `true`.
- [input_format_tsv_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_trailing_empty_lines) - skip trailing empty lines at the end of data. Default value - `false`.
- [input_format_tsv_allow_variable_number_of_columns](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_allow_variable_number_of_columns) - allow variable number of columns in TSV format, ignore extra columns and use default values on missing columns. Default value - `false`.
## TabSeparatedRaw {#tabseparatedraw}
@ -473,7 +474,7 @@ The CSV format supports the output of totals and extremes the same way as `TabSe
- [input_format_csv_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_csv_skip_trailing_empty_lines) - skip trailing empty lines at the end of data. Default value - `false`.
- [input_format_csv_trim_whitespaces](/docs/en/operations/settings/settings-formats.md/#input_format_csv_trim_whitespaces) - trim spaces and tabs in non-quoted CSV strings. Default value - `true`.
- [input_format_csv_allow_whitespace_or_tab_as_delimiter](/docs/en/operations/settings/settings-formats.md/#input_format_csv_allow_whitespace_or_tab_as_delimiter) - Allow to use whitespace or tab as field delimiter in CSV strings. Default value - `false`.
- [input_format_csv_allow_variable_number_of_columns](/docs/en/operations/settings/settings-formats.md/#input_format_csv_allow_variable_number_of_columns) - ignore extra columns in CSV input (if file has more columns than expected) and treat missing fields in CSV input as default values. Default value - `false`.
- [input_format_csv_allow_variable_number_of_columns](/docs/en/operations/settings/settings-formats.md/#input_format_csv_allow_variable_number_of_columns) - allow variable number of columns in CSV format, ignore extra columns and use default values on missing columns. Default value - `false`.
- [input_format_csv_use_default_on_bad_values](/docs/en/operations/settings/settings-formats.md/#input_format_csv_use_default_on_bad_values) - Allow to set default value to column when CSV field deserialization failed on bad value. Default value - `false`.
## CSVWithNames {#csvwithnames}
@ -502,9 +503,10 @@ the types from input data will be compared with the types of the corresponding c
Similar to [Template](#format-template), but it prints or reads all names and types of columns and uses escaping rule from [format_custom_escaping_rule](/docs/en/operations/settings/settings-formats.md/#format_custom_escaping_rule) setting and delimiters from [format_custom_field_delimiter](/docs/en/operations/settings/settings-formats.md/#format_custom_field_delimiter), [format_custom_row_before_delimiter](/docs/en/operations/settings/settings-formats.md/#format_custom_row_before_delimiter), [format_custom_row_after_delimiter](/docs/en/operations/settings/settings-formats.md/#format_custom_row_after_delimiter), [format_custom_row_between_delimiter](/docs/en/operations/settings/settings-formats.md/#format_custom_row_between_delimiter), [format_custom_result_before_delimiter](/docs/en/operations/settings/settings-formats.md/#format_custom_result_before_delimiter) and [format_custom_result_after_delimiter](/docs/en/operations/settings/settings-formats.md/#format_custom_result_after_delimiter) settings, not from format strings.
If setting [input_format_custom_detect_header](/docs/en/operations/settings/settings-formats.md/#input_format_custom_detect_header) is enabled, ClickHouse will automatically detect header with names and types if any.
If setting [input_format_tsv_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_custom_detect_header) is enabled, trailing empty lines at the end of file will be skipped.
Additional settings:
- [input_format_custom_detect_header](/docs/en/operations/settings/settings-formats.md/#input_format_custom_detect_header) - enables automatic detection of header with names and types if any. Default value - `true`.
- [input_format_custom_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_custom_skip_trailing_empty_lines) - skip trailing empty lines at the end of file. Default value - `false`.
- [input_format_custom_allow_variable_number_of_columns](/docs/en/operations/settings/settings-formats.md/#input_format_custom_allow_variable_number_of_columns) - allow variable number of columns in CustomSeparated format, ignore extra columns and use default values on missing columns. Default value - `false`.
There is also `CustomSeparatedIgnoreSpaces` format, which is similar to [TemplateIgnoreSpaces](#templateignorespaces).
@ -1262,6 +1264,7 @@ SELECT * FROM json_each_row_nested
- [input_format_json_named_tuples_as_objects](/docs/en/operations/settings/settings-formats.md/#input_format_json_named_tuples_as_objects) - parse named tuple columns as JSON objects. Default value - `true`.
- [input_format_json_defaults_for_missing_elements_in_named_tuple](/docs/en/operations/settings/settings-formats.md/#input_format_json_defaults_for_missing_elements_in_named_tuple) - insert default values for missing elements in JSON object while parsing named tuple. Default value - `true`.
- [input_format_json_ignore_unknown_keys_in_named_tuple](/docs/en/operations/settings/settings-formats.md/#input_format_json_ignore_unknown_keys_in_named_tuple) - Ignore unknown keys in json object for named tuples. Default value - `false`.
- [input_format_json_compact_allow_variable_number_of_columns](/docs/en/operations/settings/settings-formats.md/#input_format_json_compact_allow_variable_number_of_columns) - allow variable number of columns in JSONCompact/JSONCompactEachRow format, ignore extra columns and use default values on missing columns. Default value - `false`.
- [output_format_json_quote_64bit_integers](/docs/en/operations/settings/settings-formats.md/#output_format_json_quote_64bit_integers) - controls quoting of 64-bit integers in JSON output format. Default value - `true`.
- [output_format_json_quote_64bit_floats](/docs/en/operations/settings/settings-formats.md/#output_format_json_quote_64bit_floats) - controls quoting of 64-bit floats in JSON output format. Default value - `false`.
- [output_format_json_quote_denormals](/docs/en/operations/settings/settings-formats.md/#output_format_json_quote_denormals) - enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format. Default value - `false`.

View File

@ -21,6 +21,11 @@ In most cases it is recommended to use an appropriate tool or library instead of
- [ODBC driver](../interfaces/odbc.md)
- [C++ client library](../interfaces/cpp.md)
ClickHouse server provides embedded visual interfaces for power users:
- Play UI: open `/play` in the browser;
- Advanced Dashboard: open `/dashboard` in the browser;
There are also a wide range of third-party libraries for working with ClickHouse:
- [Client libraries](../interfaces/third-party/client-libraries.md)

View File

@ -627,6 +627,13 @@ Column type should be String. If value is empty, default names `row_{i}` will be used.
Default value: ''.
### input_format_json_compact_allow_variable_number_of_columns {#input_format_json_compact_allow_variable_number_of_columns}
Allow variable number of columns in rows in JSONCompact/JSONCompactEachRow input formats.
Ignore extra columns in rows with more columns than expected and treat missing columns as default values.
Disabled by default.
## TSV format settings {#tsv-format-settings}
### input_format_tsv_empty_as_default {#input_format_tsv_empty_as_default}
@ -764,6 +771,13 @@ When enabled, trailing empty lines at the end of TSV file will be skipped.
Disabled by default.
### input_format_tsv_allow_variable_number_of_columns {#input_format_tsv_allow_variable_number_of_columns}
Allow variable number of columns in rows in TSV input format.
Ignore extra columns in rows with more columns than expected and treat missing columns as default values.
Disabled by default.
## CSV format settings {#csv-format-settings}
### format_csv_delimiter {#format_csv_delimiter}
@ -955,9 +969,11 @@ Result
```text
" string "
```
### input_format_csv_allow_variable_number_of_columns {#input_format_csv_allow_variable_number_of_columns}
ignore extra columns in CSV input (if file has more columns than expected) and treat missing fields in CSV input as default values.
Allow variable number of columns in rows in CSV input format.
Ignore extra columns in rows with more columns than expected and treat missing columns as default values.
Disabled by default.
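A minimal sketch of the effect, assuming an illustrative `Memory` table (the table name and data are made up):

``` sql
SET input_format_csv_allow_variable_number_of_columns = 1;

CREATE TABLE csv_demo (a UInt32, b String) ENGINE = Memory;

-- the second row carries an extra trailing column (ignored),
-- the third row omits `b` (filled with its default value)
INSERT INTO csv_demo FORMAT CSV
1,one
2,two,extra
3
```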
@ -1571,6 +1587,13 @@ When enabled, trailing empty lines at the end of file in CustomSeparated format
Disabled by default.
### input_format_custom_allow_variable_number_of_columns {#input_format_custom_allow_variable_number_of_columns}
Allow variable number of columns in rows in CustomSeparated input format.
Ignore extra columns in rows with more columns than expected and treat missing columns as default values.
Disabled by default.
## Regexp format settings {#regexp-format-settings}
### format_regexp_escaping_rule {#format_regexp_escaping_rule}

View File

@ -1819,6 +1819,72 @@ Result:
└────────────────────────────────────┘
```
## toUTCTimestamp
Converts a DateTime/DateTime64 value from another time zone to a UTC timestamp.
**Syntax**
``` sql
toUTCTimestamp(time_val, time_zone)
```
**Arguments**
- `time_val` — A DateTime/DateTime64 type const value or an expression. [DateTime/DateTime64 types](../../sql-reference/data-types/datetime.md)
- `time_zone` — A String type const value or an expression representing the time zone. [String types](../../sql-reference/data-types/string.md)
**Returned value**
- DateTime/DateTime64 in text form
**Example**
``` sql
SELECT toUTCTimestamp(toDateTime('2023-03-16'), 'Asia/Shanghai');
```
Result:
``` text
┌─toUTCTimestamp(toDateTime('2023-03-16'),'Asia/Shanghai')┐
│ 2023-03-15 16:00:00 │
└─────────────────────────────────────────────────────────┘
```
## fromUTCTimestamp
Converts a DateTime/DateTime64 value from the UTC time zone to a timestamp in another time zone.
**Syntax**
``` sql
fromUTCTimestamp(time_val, time_zone)
```
**Arguments**
- `time_val` — A DateTime/DateTime64 type const value or an expression. [DateTime/DateTime64 types](../../sql-reference/data-types/datetime.md)
- `time_zone` — A String type const value or an expression representing the time zone. [String types](../../sql-reference/data-types/string.md)
**Returned value**
- DateTime/DateTime64 in text form
**Example**
``` sql
SELECT fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00', 3), 'Asia/Shanghai');
```
Result:
``` text
┌─fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00',3),'Asia/Shanghai')─┐
│ 2023-03-16 18:00:00.000 │
└─────────────────────────────────────────────────────────────────────────┘
```
## Related content
- Blog: [Working with time series data in ClickHouse](https://clickhouse.com/blog/working-with-time-series-data-and-functions-ClickHouse)

View File

@ -443,9 +443,9 @@ SYSTEM STOP LISTEN [ON CLUSTER cluster_name] [QUERIES ALL | QUERIES DEFAULT | QU
```
- If `CUSTOM 'protocol'` modifier is specified, the custom protocol with the specified name defined in the protocols section of the server configuration will be stopped.
- If `QUERIES ALL` modifier is specified, all protocols are stopped.
- If `QUERIES DEFAULT` modifier is specified, all default protocols are stopped.
- If `QUERIES CUSTOM` modifier is specified, all custom protocols are stopped.
- If `QUERIES ALL [EXCEPT .. [,..]]` modifier is specified, all protocols are stopped except those listed in the `EXCEPT` clause.
- If `QUERIES DEFAULT [EXCEPT .. [,..]]` modifier is specified, all default protocols are stopped except those listed in the `EXCEPT` clause.
- If `QUERIES CUSTOM [EXCEPT .. [,..]]` modifier is specified, all custom protocols are stopped except those listed in the `EXCEPT` clause.
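For illustration, a hedged sketch of the `EXCEPT` form (the listed protocols are placeholders):

``` sql
-- stop accepting new connections on every protocol except the native TCP and HTTP ports
SYSTEM STOP LISTEN QUERIES ALL EXCEPT TCP, HTTP
```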
### SYSTEM START LISTEN

View File

@ -2072,6 +2072,9 @@ void Server::createServers(
for (const auto & protocol : protocols)
{
if (!server_type.shouldStart(ServerType::Type::CUSTOM, protocol))
continue;
std::string prefix = "protocols." + protocol + ".";
std::string port_name = prefix + "port";
std::string description {"<undefined> protocol"};
@ -2081,9 +2084,6 @@ void Server::createServers(
if (!config.has(prefix + "port"))
continue;
if (!server_type.shouldStart(ServerType::Type::CUSTOM, port_name))
continue;
std::vector<std::string> hosts;
if (config.has(prefix + "host"))
hosts.push_back(config.getString(prefix + "host"));

View File

@ -11,6 +11,7 @@
--background: linear-gradient(to bottom, #00CCFF, #00D0D0);
--chart-background: white;
--shadow-color: rgba(0, 0, 0, 0.25);
--moving-shadow-color: rgba(0, 0, 0, 0.5);
--input-shadow-color: rgba(0, 255, 0, 1);
--error-color: red;
--auth-error-color: white;
@ -34,6 +35,7 @@
--background: #151C2C;
--chart-background: #1b2834;
--shadow-color: rgba(0, 0, 0, 0);
--moving-shadow-color: rgba(255, 255, 255, 0.25);
--input-shadow-color: rgba(255, 128, 0, 0.25);
--error-color: #F66;
--legend-background: rgba(255, 255, 255, 0.25);
@ -91,6 +93,21 @@
position: relative;
}
.chart-maximized {
flex: 1 100%;
height: 75vh
}
.chart-moving {
z-index: 11;
box-shadow: 0 0 2rem var(--moving-shadow-color);
}
.chart-displaced {
opacity: 75%;
filter: blur(1px);
}
.chart div { position: absolute; }
.inputs {
@ -230,8 +247,8 @@
filter: contrast(125%);
}
#add, #reload {
padding: .25rem 0.5rem;
#add, #reload, #edit {
padding: 0.25rem 0.5rem;
text-align: center;
font-weight: bold;
user-select: none;
@ -244,13 +261,10 @@
margin-right: 1rem !important;
margin-left: 0rem;
margin-bottom: 1rem;
height: 3ex;
}
/* .unconnected #reload {
margin-left: 3px;
} */
#add:hover, #reload:hover {
#add:hover, #reload:hover, #edit:hover {
background: var(--button-background-color);
}
@ -306,6 +320,7 @@
}
.chart-buttons a {
margin-right: 0.25rem;
user-select: none;
}
.chart-buttons a:hover {
color: var(--chart-button-hover-color);
@ -333,18 +348,21 @@
padding: 2rem;
}
.query-editor textarea {
grid-row: 1;
grid-column: 1 / span 2;
z-index: 11;
textarea {
padding: 0.5rem;
outline: none;
border: none;
font-size: 12pt;
border-bottom: 1px solid var(--edit-title-border);
background: var(--chart-background);
color: var(--text-color);
resize: none;
}
.query-editor textarea {
grid-row: 1;
grid-column: 1 / span 2;
z-index: 11;
border-bottom: 1px solid var(--edit-title-border);
margin: 0;
}
@ -367,10 +385,41 @@
filter: contrast(125%);
}
.edit-cancel {
cursor: pointer;
background: var(--new-chart-background-color);
}
.edit-cancel:hover {
filter: contrast(125%);
}
.nowrap {
white-space: pre;
}
#mass-editor {
display: none;
grid-template-columns: auto fit-content(10%) fit-content(10%);
grid-template-rows: auto fit-content(10%);
row-gap: 1rem;
column-gap: 1rem;
}
#mass-editor-textarea {
width: 100%;
height: 100%;
grid-row: 1;
grid-column: 1 / span 3;
}
#mass-editor input {
padding: 0.5rem;
}
#mass-editor-message {
color: var(--auth-error-color);
}
/* Source: https://cdn.jsdelivr.net/npm/uplot@1.6.21/dist/uPlot.min.css
* It is copy-pasted to lower the number of requests.
*/
@ -389,6 +438,7 @@
</div>
<div id="button-options">
<span class="nowrap themes"><span id="toggle-dark">🌚</span><span id="toggle-light">🌞</span></span>
<input id="edit" type="button" value="✎" style="display: none;">
<input id="add" type="button" value="Add chart" style="display: none;">
<input id="reload" type="button" value="Reload">
<div id="chart-params"></div>
@ -397,6 +447,12 @@
<div id="auth-error"></div>
</div>
<div id="charts"></div>
<div id="mass-editor">
<textarea id="mass-editor-textarea" spellcheck="false" data-gramm="false"></textarea>
<span id="mass-editor-message">&nbsp;</span>
<input type="submit" id="mass-editor-cancel" class="edit-cancel" value="Cancel">
<input type="submit" id="mass-editor-confirm" class="edit-confirm" value="Apply">
</div>
<script>
/** Implementation note: it might be more natural to use some reactive framework.
@ -405,9 +461,7 @@
*
* TODO:
* - zoom on the graphs should work on touch devices;
* - add mass edit capability (edit the JSON config as a whole);
* - compress the state for URL's #hash;
* - save/load JSON configuration;
* - footer with "about" or a link to source code;
* - allow to configure a table on a server to save the dashboards;
* - multiple lines on chart;
@ -418,11 +472,13 @@
let host = 'https://play.clickhouse.com/';
let user = 'explorer';
let password = '';
let add_http_cors_header = true;
/// If it is hosted on server, assume that it is the address of ClickHouse.
if (location.protocol != 'file:') {
host = location.origin;
user = 'default';
add_http_cors_header = false;
}
const errorCodeMessageMap = {
@ -702,6 +758,7 @@ function insertChart(i) {
query_editor_textarea.spellcheck = false;
query_editor_textarea.value = q.query;
query_editor_textarea.placeholder = 'Query';
query_editor_textarea.setAttribute('data-gramm', false);
query_editor.appendChild(query_editor_textarea);
let query_editor_title = document.createElement('input');
@ -756,6 +813,92 @@ function insertChart(i) {
let edit_buttons = document.createElement('div');
edit_buttons.className = 'chart-buttons';
let move = document.createElement('a');
let move_text = document.createTextNode('✥');
move.appendChild(move_text);
let is_dragging = false;
move.addEventListener('mousedown', e => {
const idx = getCurrentIndex();
is_dragging = true;
chart.className = 'chart chart-moving';
let offset_x = e.clientX;
let offset_y = e.clientY;
let displace_idx = null;
let displace_chart = null;
function mouseup(e) {
is_dragging = false;
chart.className = 'chart';
chart.style.left = null;
chart.style.top = null;
if (displace_idx !== null) {
const elem = queries[idx];
queries.splice(idx, 1);
queries.splice(displace_idx, 0, elem);
displace_chart.className = 'chart';
drawAll();
}
}
function mousemove(e) {
if (!is_dragging) {
document.body.removeEventListener('mousemove', mousemove);
document.body.removeEventListener('mouseup', mouseup);
return;
}
let x = e.clientX - offset_x;
let y = e.clientY - offset_y;
chart.style.left = `${x}px`;
chart.style.top = `${y}px`;
displace_idx = null;
displace_chart = null;
let current_idx = -1;
for (const elem of charts.querySelectorAll('.chart')) {
++current_idx;
if (current_idx == idx) {
continue;
}
const this_rect = chart.getBoundingClientRect();
const this_center_x = this_rect.left + this_rect.width / 2;
const this_center_y = this_rect.top + this_rect.height / 2;
const elem_rect = elem.getBoundingClientRect();
if (this_center_x >= elem_rect.left && this_center_x <= elem_rect.right
&& this_center_y >= elem_rect.top && this_center_y <= elem_rect.bottom) {
elem.className = 'chart chart-displaced';
displace_idx = current_idx;
displace_chart = elem;
} else {
elem.className = 'chart';
}
}
}
document.body.addEventListener('mouseup', mouseup);
document.body.addEventListener('mousemove', mousemove);
});
let maximize = document.createElement('a');
let maximize_text = document.createTextNode('🗖');
maximize.appendChild(maximize_text);
maximize.addEventListener('click', e => {
const idx = getCurrentIndex();
chart.className = (chart.className == 'chart' ? 'chart chart-maximized' : 'chart');
resize();
});
let edit = document.createElement('a');
let edit_text = document.createTextNode('✎');
edit.appendChild(edit_text);
@ -788,6 +931,8 @@ function insertChart(i) {
saveState();
});
edit_buttons.appendChild(move);
edit_buttons.appendChild(maximize);
edit_buttons.appendChild(edit);
edit_buttons.appendChild(trash);
@ -815,6 +960,66 @@ document.getElementById('reload').addEventListener('click', e => {
reloadAll();
});
let mass_editor_active = false;
function showMassEditor() {
document.getElementById('charts').style.display = 'none';
let editor_div = document.getElementById('mass-editor');
editor_div.style.display = 'grid';
let editor = document.getElementById('mass-editor-textarea');
editor.value = JSON.stringify({params: params, queries: queries}, null, 2);
mass_editor_active = true;
}
function hideMassEditor() {
document.getElementById('mass-editor').style.display = 'none';
document.getElementById('charts').style.display = 'flex';
mass_editor_active = false;
}
function massEditorApplyChanges() {
let editor = document.getElementById('mass-editor-textarea');
({params, queries} = JSON.parse(editor.value));
hideMassEditor();
regenerate();
drawAll();
saveState();
}
document.getElementById('edit').addEventListener('click', e => {
if (mass_editor_active) {
massEditorApplyChanges();
hideMassEditor();
} else {
showMassEditor();
}
});
document.getElementById('mass-editor-confirm').addEventListener('click', e => {
massEditorApplyChanges();
hideMassEditor();
});
document.getElementById('mass-editor-cancel').addEventListener('click', e => {
hideMassEditor();
});
document.getElementById('mass-editor-textarea').addEventListener('input', e => {
let message = document.getElementById('mass-editor-message').firstChild;
message.data = '';
if (e.target.value != '') {
try { JSON.parse(e.target.value) } catch (e) {
message.data = e.toString();
}
}
});
function legendAsTooltipPlugin({ className, style = { background: "var(--legend-background)" } } = {}) {
let legendEl;
@ -865,8 +1070,6 @@ function legendAsTooltipPlugin({ className, style = { background: "var(--legend-
};
}
let add_http_cors_header = false;
async function draw(idx, chart, url_params, query) {
if (plots[idx]) {
plots[idx].destroy();
@ -906,12 +1109,14 @@ async function draw(idx, chart, url_params, query) {
}
if (error) {
const errorMatch = errorMessages.find(({ regex }) => error.match(regex))
const match = error.match(errorMatch.regex)
const message = errorMatch.messageFunc(match)
const errorMatch = errorMessages.find(({ regex }) => error.match(regex));
if (!errorMatch) {
throw new Error(error);
}
const match = error.match(errorMatch.regex);
const message = errorMatch.messageFunc(match);
if (message) {
const authError = new Error(message)
throw authError
throw new Error(message);
}
}
@ -978,23 +1183,23 @@ async function draw(idx, chart, url_params, query) {
}
function showAuthError(message) {
const charts = document.querySelector('#charts');
const charts = document.getElementById('charts');
charts.style.height = '0px';
charts.style.opacity = '0';
const add = document.querySelector('#add');
add.style.display = 'none';
document.getElementById('add').style.display = 'none';
document.getElementById('edit').style.display = 'none';
const authError = document.querySelector('#auth-error');
const authError = document.getElementById('auth-error');
authError.textContent = message;
authError.style.display = 'flex';
}
function hideAuthError() {
const charts = document.querySelector('#charts');
const charts = document.getElementById('charts');
charts.style.height = 'auto';
charts.style.opacity = '1';
const authError = document.querySelector('#auth-error');
const authError = document.getElementById('auth-error');
authError.textContent = '';
authError.style.display = 'none';
}
@ -1025,11 +1230,11 @@ async function drawAll() {
if (results.includes(true)) {
const element = document.querySelector('.inputs');
element.classList.remove('unconnected');
const add = document.querySelector('#add');
add.style.display = 'block';
document.getElementById('add').style.display = 'inline-block';
document.getElementById('edit').style.display = 'inline-block';
}
else {
const charts = document.querySelector('#charts')
const charts = document.getElementById('charts')
charts.style.height = '0px';
}
})
@ -1048,14 +1253,14 @@ new ResizeObserver(resize).observe(document.body);
function disableReloadButton() {
const reloadButton = document.getElementById('reload')
reloadButton.value = 'Reloading...'
reloadButton.value = 'Reloading'
reloadButton.disabled = true
reloadButton.classList.add('disabled')
}
function disableRunButton() {
const runButton = document.getElementById('run')
runButton.value = 'Reloading...'
runButton.value = 'Reloading'
runButton.disabled = true
runButton.classList.add('disabled')
}

View File

@ -465,7 +465,7 @@
<input class="monospace shadow" id="url" type="text" value="http://localhost:8123/" placeholder="url" /><input class="monospace shadow" id="user" type="text" value="default" placeholder="user" /><input class="monospace shadow" id="password" type="password" placeholder="password" />
</div>
<div id="query_div">
<textarea autofocus spellcheck="false" class="monospace shadow" id="query"></textarea>
<textarea autofocus spellcheck="false" data-gramm="false" class="monospace shadow" id="query"></textarea>
</div>
<div id="run_div">
<button class="shadow" id="run">Run</button>

View File

@ -375,7 +375,7 @@ void BackupImpl::readBackupMetadata()
if (!archive_reader->fileExists(".backup"))
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Archive {} is not a backup", backup_name_for_logging);
setCompressedSize();
in = archive_reader->readFile(".backup");
in = archive_reader->readFile(".backup", /*throw_on_not_found=*/true);
}
else
{
@ -685,7 +685,7 @@ std::unique_ptr<SeekableReadBuffer> BackupImpl::readFileImpl(const SizeAndChecks
{
/// Make `read_buffer` if there is data for this backup entry in this backup.
if (use_archive)
read_buffer = archive_reader->readFile(info.data_file_name);
read_buffer = archive_reader->readFile(info.data_file_name, /*throw_on_not_found=*/true);
else
read_buffer = reader->readFile(info.data_file_name);
}

View File

@ -644,7 +644,7 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(Bool, database_replicated_allow_only_replicated_engine, false, "Allow to create only Replicated tables in database with engine Replicated", 0) \
M(Bool, database_replicated_allow_replicated_engine_arguments, true, "Allow to create only Replicated tables in database with engine Replicated with explicit arguments", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result, one of: 'none', 'throw', 'null_status_on_timeout', 'never_throw'", 0) \
M(UInt64, distributed_ddl_entry_format_version, 5, "Compatibility version of distributed DDL (ON CLUSTER) queries", 0) \
\
M(UInt64, external_storage_max_read_rows, 0, "Limit maximum number of rows when table with external engine should flush history data. Now supported only for MySQL table engine, database engine, dictionary and MaterializedMySQL. If equal to 0, this setting is disabled", 0) \
@ -894,6 +894,10 @@ class IColumn;
M(Bool, input_format_csv_allow_whitespace_or_tab_as_delimiter, false, "Allow to use spaces and tabs(\\t) as field delimiter in the CSV strings", 0) \
M(Bool, input_format_csv_trim_whitespaces, true, "Trims spaces and tabs (\\t) characters at the beginning and end in CSV strings", 0) \
M(Bool, input_format_csv_use_default_on_bad_values, false, "Allow to set default value to column when CSV field deserialization failed on bad value", 0) \
M(Bool, input_format_csv_allow_variable_number_of_columns, false, "Ignore extra columns in CSV input (if file has more columns than expected) and treat missing fields in CSV input as default values", 0) \
M(Bool, input_format_tsv_allow_variable_number_of_columns, false, "Ignore extra columns in TSV input (if file has more columns than expected) and treat missing fields in TSV input as default values", 0) \
M(Bool, input_format_custom_allow_variable_number_of_columns, false, "Ignore extra columns in CustomSeparated input (if file has more columns than expected) and treat missing fields in CustomSeparated input as default values", 0) \
M(Bool, input_format_json_compact_allow_variable_number_of_columns, false, "Ignore extra columns in JSONCompact(EachRow) input (if file has more columns than expected) and treat missing fields in JSONCompact(EachRow) input as default values", 0) \
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
@ -1042,7 +1046,6 @@ class IColumn;
M(Bool, regexp_dict_allow_hyperscan, true, "Allow regexp_tree dictionary using Hyperscan library.", 0) \
\
M(Bool, dictionary_use_async_executor, false, "Execute a pipeline for reading from a dictionary with several threads. It's supported only by DIRECT dictionary with CLICKHOUSE source.", 0) \
M(Bool, input_format_csv_allow_variable_number_of_columns, false, "Ignore extra columns in CSV input (if file has more columns than expected) and treat missing fields in CSV input as default values", 0) \
M(Bool, precise_float_parsing, false, "Prefer more precise (but slower) float parsing algorithm", 0) \
// End of FORMAT_FACTORY_SETTINGS

View File

@ -10,6 +10,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/parseDateTimeBestEffort.h>
#include <IO/ReadBufferFromString.h>
namespace DB
{
@ -145,12 +146,29 @@ void SerializationDateTime::deserializeTextCSV(IColumn & column, ReadBuffer & is
char maybe_quote = *istr.position();
if (maybe_quote == '\'' || maybe_quote == '\"')
{
++istr.position();
readText(x, istr, settings, time_zone, utc_time_zone);
if (maybe_quote == '\'' || maybe_quote == '\"')
readText(x, istr, settings, time_zone, utc_time_zone);
assertChar(maybe_quote, istr);
}
else
{
if (settings.csv.delimiter != ',' || settings.date_time_input_format == FormatSettings::DateTimeInputFormat::Basic)
{
readText(x, istr, settings, time_zone, utc_time_zone);
}
/// Best effort parsing supports datetime in format like "01.01.2000, 00:00:00"
/// and can mistakenly read comma as a part of datetime.
/// For example data "...,01.01.2000,some string,..." cannot be parsed correctly.
/// To fix this problem we first read CSV string and then try to parse it as datetime.
else
{
String datetime_str;
readCSVString(datetime_str, istr, settings.csv);
ReadBufferFromString buf(datetime_str);
readText(x, buf, settings, time_zone, utc_time_zone);
}
}
if (x < 0)
x = 0;

View File

@ -9,6 +9,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/parseDateTimeBestEffort.h>
#include <IO/ReadBufferFromString.h>
namespace DB
{
@ -143,12 +144,29 @@ void SerializationDateTime64::deserializeTextCSV(IColumn & column, ReadBuffer &
char maybe_quote = *istr.position();
if (maybe_quote == '\'' || maybe_quote == '\"')
{
++istr.position();
readText(x, scale, istr, settings, time_zone, utc_time_zone);
if (maybe_quote == '\'' || maybe_quote == '\"')
readText(x, scale, istr, settings, time_zone, utc_time_zone);
assertChar(maybe_quote, istr);
}
else
{
if (settings.csv.delimiter != ',' || settings.date_time_input_format == FormatSettings::DateTimeInputFormat::Basic)
{
readText(x, scale, istr, settings, time_zone, utc_time_zone);
}
/// Best effort parsing supports datetime in format like "01.01.2000, 00:00:00"
/// and can mistakenly read comma as a part of datetime.
/// For example data "...,01.01.2000,some string,..." cannot be parsed correctly.
/// To fix this problem we first read CSV string and then try to parse it as datetime.
else
{
String datetime_str;
readCSVString(datetime_str, istr, settings.csv);
ReadBufferFromString buf(datetime_str);
readText(x, scale, buf, settings, time_zone, utc_time_zone);
}
}
assert_cast<ColumnType &>(column).getData().push_back(x);
}

View File

@ -23,11 +23,10 @@ StoragePtr IDatabase::getTable(const String & name, ContextPtr context) const
return storage;
TableNameHints hints(this->shared_from_this(), context);
std::vector<String> names = hints.getHints(name);
if (!names.empty())
{
if (names.empty())
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} does not exist", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
else
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} does not exist. Maybe you meant {}?", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name), backQuoteIfNeed(names[0]));
}
else throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} does not exist", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
}
std::vector<std::pair<ASTPtr, StoragePtr>> IDatabase::getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const

View File

@ -86,6 +86,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.custom.try_detect_header = settings.input_format_custom_detect_header;
format_settings.custom.skip_trailing_empty_lines = settings.input_format_custom_skip_trailing_empty_lines;
format_settings.custom.allow_variable_number_of_columns = settings.input_format_custom_allow_variable_number_of_columns;
format_settings.date_time_input_format = settings.date_time_input_format;
format_settings.date_time_output_format = settings.date_time_output_format;
format_settings.interval.output_format = settings.interval_output_format;
@ -115,6 +116,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.json.validate_utf8 = settings.output_format_json_validate_utf8;
format_settings.json_object_each_row.column_for_object_name = settings.format_json_object_each_row_column_for_object_name;
format_settings.json.allow_object_type = context->getSettingsRef().allow_experimental_object_type;
format_settings.json.compact_allow_variable_number_of_columns = settings.input_format_json_compact_allow_variable_number_of_columns;
format_settings.null_as_default = settings.input_format_null_as_default;
format_settings.decimal_trailing_zeros = settings.output_format_decimal_trailing_zeros;
format_settings.parquet.row_group_rows = settings.output_format_parquet_row_group_size;
@ -163,6 +165,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.tsv.skip_first_lines = settings.input_format_tsv_skip_first_lines;
format_settings.tsv.try_detect_header = settings.input_format_tsv_detect_header;
format_settings.tsv.skip_trailing_empty_lines = settings.input_format_tsv_skip_trailing_empty_lines;
format_settings.tsv.allow_variable_number_of_columns = settings.input_format_tsv_allow_variable_number_of_columns;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;

View File

@ -175,6 +175,7 @@ struct FormatSettings
EscapingRule escaping_rule = EscapingRule::Escaped;
bool try_detect_header = true;
bool skip_trailing_empty_lines = false;
bool allow_variable_number_of_columns = false;
} custom;
struct
@ -197,6 +198,7 @@ struct FormatSettings
bool validate_types_from_metadata = true;
bool validate_utf8 = false;
bool allow_object_type = false;
bool compact_allow_variable_number_of_columns = false;
} json;
struct
@ -317,6 +319,7 @@ struct FormatSettings
UInt64 skip_first_lines = 0;
bool try_detect_header = true;
bool skip_trailing_empty_lines = false;
bool allow_variable_number_of_columns = false;
} tsv;
struct

View File

@ -0,0 +1,141 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsDateTime.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <Common/DateLUT.h>
#include <Common/LocalDateTime.h>
#include <Common/logger_useful.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/TimezoneMixin.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
template <typename Name>
class UTCTimestampTransform : public IFunction
{
public:
static FunctionPtr create(ContextPtr) { return std::make_shared<UTCTimestampTransform>(); }
static constexpr auto name = Name::name;
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.size() != 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {}'s arguments number must be 2.", name);
WhichDataType which_type_first(arguments[0]);
WhichDataType which_type_second(arguments[1]);
if (!which_type_first.isDateTime() && !which_type_first.isDateTime64())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function {}'s 1st argument type must be datetime.", name);
if (dynamic_cast<const TimezoneMixin *>(arguments[0].get())->hasExplicitTimeZone())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function {}'s 1st argument should not have explicit time zone.", name);
if (!which_type_second.isString())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function {}'s 2nd argument type must be string.", name);
DataTypePtr date_time_type;
if (which_type_first.isDateTime())
date_time_type = std::make_shared<DataTypeDateTime>();
else
{
const DataTypeDateTime64 * date_time_64 = static_cast<const DataTypeDateTime64 *>(arguments[0].get());
date_time_type = std::make_shared<DataTypeDateTime64>(date_time_64->getScale());
}
return date_time_type;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t) const override
{
if (arguments.size() != 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {}'s arguments number must be 2.", name);
ColumnWithTypeAndName arg1 = arguments[0];
ColumnWithTypeAndName arg2 = arguments[1];
const auto * time_zone_const_col = checkAndGetColumnConstData<ColumnString>(arg2.column.get());
if (!time_zone_const_col)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of 2nd argument of function {}. Expected const(String).", arg2.column->getName(), name);
String time_zone_val = time_zone_const_col->getDataAt(0).toString();
auto column = result_type->createColumn();
if (WhichDataType(arg1.type).isDateTime())
{
const auto * date_time_col = checkAndGetColumn<ColumnDateTime>(arg1.column.get());
for (size_t i = 0; i < date_time_col->size(); ++i)
{
UInt32 date_time_val = date_time_col->getElement(i);
LocalDateTime date_time(date_time_val, Name::to ? DateLUT::instance("UTC") : DateLUT::instance(time_zone_val));
time_t time_val = date_time.to_time_t(Name::from ? DateLUT::instance("UTC") : DateLUT::instance(time_zone_val));
column->insert(time_val);
}
}
else if (WhichDataType(arg1.type).isDateTime64())
{
const auto * date_time_col = checkAndGetColumn<ColumnDateTime64>(arg1.column.get());
const DataTypeDateTime64 * date_time_type = static_cast<const DataTypeDateTime64 *>(arg1.type.get());
Int64 scale_multiplier = DecimalUtils::scaleMultiplier<Int64>(date_time_type->getScale());
for (size_t i = 0; i < date_time_col->size(); ++i)
{
DateTime64 date_time_val = date_time_col->getElement(i);
Int64 seconds = date_time_val.value / scale_multiplier;
Int64 micros = date_time_val.value % scale_multiplier;
LocalDateTime date_time(seconds, Name::to ? DateLUT::instance("UTC") : DateLUT::instance(time_zone_val));
time_t time_val = date_time.to_time_t(Name::from ? DateLUT::instance("UTC") : DateLUT::instance(time_zone_val));
DateTime64 date_time_64(time_val * scale_multiplier + micros);
column->insert(date_time_64);
}
}
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function {}'s 1st argument can only be datetime/datatime64. ", name);
return column;
}
};
struct NameToUTCTimestamp
{
static constexpr auto name = "toUTCTimestamp";
static constexpr auto from = false;
static constexpr auto to = true;
};
struct NameFromUTCTimestamp
{
static constexpr auto name = "fromUTCTimestamp";
static constexpr auto from = true;
static constexpr auto to = false;
};
using ToUTCTimestampFunction = UTCTimestampTransform<NameToUTCTimestamp>;
using FromUTCTimestampFunction = UTCTimestampTransform<NameFromUTCTimestamp>;
}
REGISTER_FUNCTION(UTCTimestampTransform)
{
factory.registerFunction<ToUTCTimestampFunction>();
factory.registerFunction<FromUTCTimestampFunction>();
factory.registerAlias("to_utc_timestamp", NameToUTCTimestamp::name, FunctionFactory::CaseInsensitive);
factory.registerAlias("from_utc_timestamp", NameFromUTCTimestamp::name, FunctionFactory::CaseInsensitive);
}
}
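Given the case-insensitive aliases registered above, a hedged usage sketch (the values are illustrative):

``` sql
SELECT
    to_utc_timestamp(toDateTime('2023-03-16'), 'Asia/Shanghai') AS to_utc,
    from_utc_timestamp(toDateTime64('2023-03-16 10:00:00', 3), 'Asia/Shanghai') AS from_utc;
```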

View File

@ -50,8 +50,8 @@ public:
/// Starts reading a file from the archive. The function returns a read buffer,
/// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) = 0;
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter) = 0;
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename, bool throw_on_not_found) = 0;
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter, bool throw_on_not_found) = 0;
/// It's possible to convert a file enumerator to a read buffer and vice versa.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) = 0;

View File

@ -155,7 +155,7 @@ private:
archive_read_support_filter_all(archive);
archive_read_support_format_all(archive);
if (archive_read_open_filename(archive, path_to_archive.c_str(), 10240) != ARCHIVE_OK)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open archive: {}", quoteString(path_to_archive));
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open archive {}: {}", quoteString(path_to_archive), archive_error_string(archive));
}
catch (...)
{
@ -293,17 +293,21 @@ std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile()
return std::make_unique<FileEnumeratorImpl>(std::move(handle));
}
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(const String & filename)
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(const String & filename, bool throw_on_not_found)
{
return readFile([&](const std::string & file) { return file == filename; });
return readFile([&](const std::string & file) { return file == filename; }, throw_on_not_found);
}
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(NameFilter filter)
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(NameFilter filter, bool throw_on_not_found)
{
Handle handle(path_to_archive, lock_on_reading);
if (!handle.locateFile(filter))
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: no file found satisfying the filter", path_to_archive);
{
if (throw_on_not_found)
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: no file found satisfying the filter", path_to_archive);
return nullptr;
}
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
}

View File

@ -34,8 +34,8 @@ public:
/// Starts reading a file from the archive. The function returns a read buffer,
/// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel.
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename, bool throw_on_not_found) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter, bool throw_on_not_found) override;
/// It's possible to convert a file enumerator to a read buffer and vice versa.
std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) override;

View File

@ -75,21 +75,22 @@ public:
RawHandle getRawHandle() const { return raw_handle; }
std::shared_ptr<ZipArchiveReader> getReader() const { return reader; }
void locateFile(const String & file_name_)
bool locateFile(const String & file_name_)
{
resetFileInfo();
bool case_sensitive = true;
int err = unzLocateFile(raw_handle, file_name_.c_str(), reinterpret_cast<unzFileNameComparer>(static_cast<size_t>(case_sensitive)));
if (err == UNZ_END_OF_LIST_OF_FILE)
showError("File " + quoteString(file_name_) + " not found");
return false;
file_name = file_name_;
return true;
}
void locateFile(NameFilter filter)
bool locateFile(NameFilter filter)
{
int err = unzGoToFirstFile(raw_handle);
if (err == UNZ_END_OF_LIST_OF_FILE)
showError("No file was found satisfying the filter");
return false;
do
{
@ -97,12 +98,12 @@ public:
resetFileInfo();
retrieveFileInfo();
if (filter(getFileName()))
return;
return true;
err = unzGoToNextFile(raw_handle);
} while (err != UNZ_END_OF_LIST_OF_FILE);
showError("No file was found satisfying the filter");
return false;
}
bool tryLocateFile(const String & file_name_)
@ -513,7 +514,9 @@ bool ZipArchiveReader::fileExists(const String & filename)
ZipArchiveReader::FileInfo ZipArchiveReader::getFileInfo(const String & filename)
{
auto handle = acquireHandle();
handle.locateFile(filename);
if (!handle.locateFile(filename))
showError(fmt::format("File {} was not found in archive", quoteString(filename)));
return handle.getFileInfo();
}
@ -525,17 +528,31 @@ std::unique_ptr<ZipArchiveReader::FileEnumerator> ZipArchiveReader::firstFile()
return std::make_unique<FileEnumeratorImpl>(std::move(handle));
}
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(const String & filename)
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(const String & filename, bool throw_on_not_found)
{
auto handle = acquireHandle();
handle.locateFile(filename);
if (!handle.locateFile(filename))
{
if (throw_on_not_found)
showError(fmt::format("File {} was not found in archive", quoteString(filename)));
return nullptr;
}
return std::make_unique<ReadBufferFromZipArchive>(std::move(handle));
}
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(NameFilter filter)
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(NameFilter filter, bool throw_on_not_found)
{
auto handle = acquireHandle();
handle.locateFile(filter);
if (!handle.locateFile(filter))
{
if (throw_on_not_found)
showError(fmt::format("No file satisfying filter in archive"));
return nullptr;
}
return std::make_unique<ReadBufferFromZipArchive>(std::move(handle));
}

View File

@ -41,8 +41,8 @@ public:
/// Starts reading a file from the archive. The function returns a read buffer,
/// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel.
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename, bool throw_on_not_found) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter, bool throw_on_not_found) override;
/// It's possible to convert a file enumerator to a read buffer and vice versa.
std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) override;

View File

@ -24,6 +24,18 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
[[maybe_unused]] const std::function<std::unique_ptr<SeekableReadBuffer>()> & archive_read_function,
[[maybe_unused]] size_t archive_size)
{
using namespace std::literals;
static constexpr std::array tar_extensions
{
".tar"sv,
".tar.gz"sv,
".tgz"sv,
".tar.zst"sv,
".tzst"sv,
".tar.xz"sv,
".tar.bz2"sv
};
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
{
#if USE_MINIZIP
@ -32,7 +44,8 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled");
#endif
}
else if (path_to_archive.ends_with(".tar") || path_to_archive.ends_with("tar.gz"))
else if (std::any_of(
tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); }))
{
#if USE_LIBARCHIVE
return std::make_shared<TarArchiveReader>(path_to_archive);

View File

@ -113,11 +113,11 @@ TEST_P(ArchiveReaderAndWriterTest, EmptyArchive)
EXPECT_FALSE(reader->fileExists("nofile.txt"));
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "File 'nofile.txt' not found",
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "File 'nofile.txt' was not found in archive",
[&]{ reader->getFileInfo("nofile.txt"); });
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "File 'nofile.txt' not found",
[&]{ reader->readFile("nofile.txt"); });
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "File 'nofile.txt' was not found in archive",
[&]{ reader->readFile("nofile.txt", /*throw_on_not_found=*/true); });
EXPECT_EQ(reader->firstFile(), nullptr);
}
@ -145,7 +145,7 @@ TEST_P(ArchiveReaderAndWriterTest, SingleFileInArchive)
EXPECT_GT(file_info.compressed_size, 0);
{
auto in = reader->readFile("a.txt");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
@ -215,14 +215,14 @@ TEST_P(ArchiveReaderAndWriterTest, TwoFilesInArchive)
EXPECT_EQ(reader->getFileInfo("b/c.txt").uncompressed_size, c_contents.size());
{
auto in = reader->readFile("a.txt");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, a_contents);
}
{
auto in = reader->readFile("b/c.txt");
auto in = reader->readFile("b/c.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, c_contents);
@ -230,7 +230,7 @@ TEST_P(ArchiveReaderAndWriterTest, TwoFilesInArchive)
{
/// Read a.txt again.
auto in = reader->readFile("a.txt");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, a_contents);
@ -302,14 +302,14 @@ TEST_P(ArchiveReaderAndWriterTest, InMemory)
EXPECT_EQ(reader->getFileInfo("b.txt").uncompressed_size, b_contents.size());
{
auto in = reader->readFile("a.txt");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, a_contents);
}
{
auto in = reader->readFile("b.txt");
auto in = reader->readFile("b.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, b_contents);
@ -317,7 +317,7 @@ TEST_P(ArchiveReaderAndWriterTest, InMemory)
{
/// Read a.txt again.
auto in = reader->readFile("a.txt");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, a_contents);
@ -343,19 +343,19 @@ TEST_P(ArchiveReaderAndWriterTest, Password)
/// Try to read without a password.
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Password is required",
[&]{ reader->readFile("a.txt"); });
[&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); });
{
/// Try to read with a wrong password.
reader->setPassword("123Qwe");
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Wrong password",
[&]{ reader->readFile("a.txt"); });
[&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); });
}
{
/// Reading with the right password is successful.
reader->setPassword("Qwe123");
auto in = reader->readFile("a.txt");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
@ -387,7 +387,7 @@ TEST(TarArchiveReaderTest, ReadFile) {
bool created = createArchiveWithFiles<ArchiveType::Tar>(archive_path, {{filename, contents}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
auto in = reader->readFile(filename);
auto in = reader->readFile(filename, /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
@ -405,11 +405,11 @@ TEST(TarArchiveReaderTest, ReadTwoFiles) {
auto reader = createArchiveReader(archive_path);
EXPECT_EQ(reader->fileExists(file1), true);
EXPECT_EQ(reader->fileExists(file2), true);
auto in = reader->readFile(file1);
auto in = reader->readFile(file1, /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents1);
in = reader->readFile(file2);
in = reader->readFile(file2, /*throw_on_not_found=*/true);
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents2);
@ -448,7 +448,7 @@ TEST(SevenZipArchiveReaderTest, ReadFile) {
bool created = createArchiveWithFiles<ArchiveType::SevenZip>(archive_path, {{filename, contents}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
auto in = reader->readFile(filename);
auto in = reader->readFile(filename, /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
@ -479,11 +479,11 @@ TEST(SevenZipArchiveReaderTest, ReadTwoFiles) {
auto reader = createArchiveReader(archive_path);
EXPECT_EQ(reader->fileExists(file1), true);
EXPECT_EQ(reader->fileExists(file2), true);
auto in = reader->readFile(file1);
auto in = reader->readFile(file1, /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents1);
in = reader->readFile(file2);
in = reader->readFile(file2, /*throw_on_not_found=*/true);
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents2);

View File

@ -273,6 +273,8 @@ public:
/// Are distributed DDL Queries (ON CLUSTER Clause) allowed for this cluster
bool areDistributedDDLQueriesAllowed() const { return allow_distributed_ddl_queries; }
const String & getName() const { return name; }
private:
SlotToShard slot_to_shard;

View File

@ -28,7 +28,7 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int SUPPORT_IS_DISABLED;
extern const int LOGICAL_ERROR;
}
namespace ClusterProxy
@ -234,7 +234,8 @@ void executeQuery(
std::move(external_tables),
log,
shards,
query_info.storage_limits);
query_info.storage_limits,
query_info.getCluster()->getName());
read_from_remote->setStepDescription("Read from remote replica");
plan->addStep(std::move(read_from_remote));
@ -266,20 +267,57 @@ void executeQueryWithParallelReplicas(
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ClusterPtr & not_optimized_cluster)
{
if (not_optimized_cluster->getShardsInfo().size() != 1)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Cluster for parallel replicas should consist only from one shard");
auto shard_info = not_optimized_cluster->getShardsInfo().front();
const auto & settings = context->getSettingsRef();
ClusterPtr new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings);
auto new_context = Context::createCopy(context);
auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{};
UInt64 shard_num = 0; /// shard_num is 1-based, so 0 - no shard specified
const auto it = scalars.find("_shard_num");
if (it != scalars.end())
{
const Block & block = it->second;
const auto & column = block.safeGetByPosition(0).column;
shard_num = column->getUInt(0);
}
size_t all_replicas_count = 0;
ClusterPtr new_cluster;
/// If we got a valid shard_num from the query initiator, the parallel replicas scope is the specified shard.
/// Shards are numbered in order of appearance in the cluster config.
if (shard_num > 0)
{
const auto shard_count = not_optimized_cluster->getShardCount();
if (shard_num > shard_count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Shard number is greater than shard count: shard_num={} shard_count={} cluster={}",
shard_num,
shard_count,
not_optimized_cluster->getName());
chassert(shard_count == not_optimized_cluster->getShardsAddresses().size());
LOG_DEBUG(&Poco::Logger::get("executeQueryWithParallelReplicas"), "Parallel replicas query in shard scope: shard_num={} cluster={}",
shard_num, not_optimized_cluster->getName());
const auto shard_replicas_num = not_optimized_cluster->getShardsAddresses()[shard_num - 1].size();
all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), shard_replicas_num);
/// shard_num is 1-based, but getClusterWithSingleShard expects 0-based index
new_cluster = not_optimized_cluster->getClusterWithSingleShard(shard_num - 1);
}
else
{
new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings);
all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
}
auto all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(all_replicas_count);
auto remote_plan = std::make_unique<QueryPlan>();
/// This is a little bit weird, but we construct an "empty" coordinator without
/// any specified reading/coordination method (like Default, InOrder, InReverseOrder)
@ -288,8 +326,6 @@ void executeQueryWithParallelReplicas(
/// to then tell it about the reading method we chose.
query_info.coordinator = coordinator;
auto new_context = Context::createCopy(context);
auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{};
auto external_tables = new_context->getExternalTables();
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(

View File

@ -341,14 +341,10 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
{
TableNameHints hints(this->tryGetDatabase(table_id.getDatabaseName()), getContext());
std::vector<String> names = hints.getHints(table_id.getTableName());
if (!names.empty())
{
/// There is two options: first is to print just the name of the table
/// and the second is to print the result in format: db_name.table_name. I'll comment out the second option below
/// I also leave possibility to print several suggestions
if (names.empty())
exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", table_id.getNameForLogs()));
else
exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist. Maybe you meant {}?", table_id.getNameForLogs(), backQuoteIfNeed(names[0])));
}
else exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", table_id.getNameForLogs()));
}
return {};
}

View File

@ -204,7 +204,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
}
else if (type == Type::SUSPEND)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FOR "
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FOR "
<< (settings.hilite ? hilite_none : "") << seconds
<< (settings.hilite ? hilite_keyword : "") << " SECOND"
<< (settings.hilite ? hilite_none : "");
@ -232,12 +232,50 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
}
else if (type == Type::START_LISTEN || type == Type::STOP_LISTEN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " " << ServerType::serverTypeToString(server_type.type)
<< (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << " "
<< ServerType::serverTypeToString(server_type.type) << (settings.hilite ? hilite_none : "");
if (server_type.type == ServerType::CUSTOM)
if (server_type.type == ServerType::Type::CUSTOM)
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << " " << backQuoteIfNeed(server_type.custom_name);
settings.ostr << " " << quoteString(server_type.custom_name);
}
bool comma = false;
if (!server_type.exclude_types.empty())
{
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< " EXCEPT" << (settings.hilite ? hilite_none : "");
for (auto cur_type : server_type.exclude_types)
{
if (cur_type == ServerType::Type::CUSTOM)
continue;
if (comma)
settings.ostr << ",";
else
comma = true;
settings.ostr << (settings.hilite ? hilite_keyword : "") << " "
<< ServerType::serverTypeToString(cur_type) << (settings.hilite ? hilite_none : "");
}
if (server_type.exclude_types.contains(ServerType::Type::CUSTOM))
{
for (const auto & cur_name : server_type.exclude_custom_names)
{
if (comma)
settings.ostr << ",";
else
comma = true;
settings.ostr << (settings.hilite ? hilite_keyword : "") << " "
<< ServerType::serverTypeToString(ServerType::Type::CUSTOM) << (settings.hilite ? hilite_none : "");
settings.ostr << " " << quoteString(cur_name);
}
}
}
}

View File

@ -458,32 +458,71 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
ServerType::Type current_type = ServerType::Type::END;
std::string current_custom_name;
for (const auto & type : magic_enum::enum_values<ServerType::Type>())
auto parse_server_type = [&](ServerType::Type & type, std::string & custom_name) -> bool
{
if (ParserKeyword{ServerType::serverTypeToString(type)}.ignore(pos, expected))
type = ServerType::Type::END;
custom_name = "";
for (const auto & cur_type : magic_enum::enum_values<ServerType::Type>())
{
current_type = type;
break;
if (ParserKeyword{ServerType::serverTypeToString(cur_type)}.ignore(pos, expected))
{
type = cur_type;
break;
}
}
if (type == ServerType::Type::END)
return false;
if (type == ServerType::CUSTOM)
{
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
custom_name = ast->as<ASTLiteral &>().value.get<const String &>();
}
return true;
};
ServerType::Type base_type;
std::string base_custom_name;
ServerType::Types exclude_type;
ServerType::CustomNames exclude_custom_names;
if (!parse_server_type(base_type, base_custom_name))
return false;
if (ParserKeyword{"EXCEPT"}.ignore(pos, expected))
{
if (base_type != ServerType::Type::QUERIES_ALL &&
base_type != ServerType::Type::QUERIES_DEFAULT &&
base_type != ServerType::Type::QUERIES_CUSTOM)
return false;
ServerType::Type current_type;
std::string current_custom_name;
while (true)
{
if (!exclude_type.empty() && !ParserToken(TokenType::Comma).ignore(pos, expected))
break;
if (!parse_server_type(current_type, current_custom_name))
return false;
exclude_type.insert(current_type);
if (current_type == ServerType::Type::CUSTOM)
exclude_custom_names.insert(current_custom_name);
}
}
if (current_type == ServerType::Type::END)
return false;
if (current_type == ServerType::CUSTOM)
{
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
current_custom_name = ast->as<ASTLiteral &>().value.get<const String &>();
}
res->server_type = ServerType(current_type, current_custom_name);
res->server_type = ServerType(base_type, base_custom_name, exclude_type, exclude_custom_names);
break;
}
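
As a hedged sketch of what the extended grammar produces (the keyword spellings assume serverTypeToString() mirrors the enum names, and 'mysql' is an illustrative custom protocol name):

    // SYSTEM STOP LISTEN QUERIES_ALL EXCEPT TCP, CUSTOM 'mysql'
    // would be parsed into roughly:
    ServerType server_type(
        ServerType::Type::QUERIES_ALL,                        // base_type
        "",                                                   // base_custom_name
        {ServerType::Type::TCP, ServerType::Type::CUSTOM},    // exclude_types
        {"mysql"});                                           // exclude_custom_names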

View File

@ -115,21 +115,24 @@ NamesAndTypesList IRowSchemaReader::readSchema()
"Cannot read rows to determine the schema, the maximum number of rows (or bytes) to read is set to 0. "
"Most likely setting input_format_max_rows_to_read_for_schema_inference or input_format_max_bytes_to_read_for_schema_inference is set to 0");
DataTypes data_types = readRowAndGetDataTypes();
auto data_types_maybe = readRowAndGetDataTypes();
/// Check that we read at least one column.
if (data_types.empty())
if (!data_types_maybe)
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "Cannot read rows from the data");
DataTypes data_types = std::move(*data_types_maybe);
/// If column names weren't set, use default names 'c1', 'c2', ...
if (column_names.empty())
bool use_default_column_names = column_names.empty();
if (use_default_column_names)
{
column_names.reserve(data_types.size());
for (size_t i = 0; i != data_types.size(); ++i)
column_names.push_back("c" + std::to_string(i + 1));
}
/// If column names were set, check that the number of names match the number of types.
else if (column_names.size() != data_types.size())
else if (column_names.size() != data_types.size() && !allowVariableNumberOfColumns())
{
throw Exception(
ErrorCodes::INCORRECT_DATA,
@ -137,6 +140,9 @@ NamesAndTypesList IRowSchemaReader::readSchema()
}
else
{
if (column_names.size() != data_types.size())
data_types.resize(column_names.size());
std::unordered_set<std::string_view> names_set;
for (const auto & name : column_names)
{
@ -155,13 +161,39 @@ NamesAndTypesList IRowSchemaReader::readSchema()
for (rows_read = 1; rows_read < max_rows_to_read && in.count() < max_bytes_to_read; ++rows_read)
{
DataTypes new_data_types = readRowAndGetDataTypes();
if (new_data_types.empty())
auto new_data_types_maybe = readRowAndGetDataTypes();
if (!new_data_types_maybe)
/// We reached eof.
break;
DataTypes new_data_types = std::move(*new_data_types_maybe);
if (new_data_types.size() != data_types.size())
throw Exception(ErrorCodes::INCORRECT_DATA, "Rows have different amount of values");
{
if (!allowVariableNumberOfColumns())
throw Exception(ErrorCodes::INCORRECT_DATA, "Rows have different amount of values");
if (use_default_column_names)
{
/// Current row contains new columns, add new default names.
if (new_data_types.size() > data_types.size())
{
for (size_t i = data_types.size(); i < new_data_types.size(); ++i)
column_names.push_back("c" + std::to_string(i + 1));
data_types.resize(new_data_types.size());
}
/// Current row contains fewer columns than previous rows.
else
{
new_data_types.resize(data_types.size());
}
}
/// If names were explicitly set, ignore all extra columns.
else
{
new_data_types.resize(column_names.size());
}
}
for (field_index = 0; field_index != data_types.size(); ++field_index)
{

View File

@ -93,11 +93,13 @@ protected:
/// Read one row and determine types of columns in it.
/// Return types in the same order in which the values were in the row.
/// If it's impossible to determine the type for some column, return nullptr for it.
/// Return empty list if can't read more data.
virtual DataTypes readRowAndGetDataTypes() = 0;
/// Return std::nullopt if can't read more data.
virtual std::optional<DataTypes> readRowAndGetDataTypes() = 0;
void setColumnNames(const std::vector<String> & names) { column_names = names; }
virtual bool allowVariableNumberOfColumns() const { return false; }
size_t field_index;
private:

View File

@ -284,7 +284,7 @@ bool CSVFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
return true;
}
bool CSVFormatReader::allowVariableNumberOfColumns()
bool CSVFormatReader::allowVariableNumberOfColumns() const
{
return format_settings.csv.allow_variable_number_of_columns;
}
@ -410,19 +410,22 @@ CSVSchemaReader::CSVSchemaReader(ReadBuffer & in_, bool with_names_, bool with_t
{
}
std::pair<std::vector<String>, DataTypes> CSVSchemaReader::readRowAndGetFieldsAndDataTypes()
std::optional<std::pair<std::vector<String>, DataTypes>> CSVSchemaReader::readRowAndGetFieldsAndDataTypes()
{
if (buf.eof())
return {};
auto fields = reader.readRow();
auto data_types = tryInferDataTypesByEscapingRule(fields, format_settings, FormatSettings::EscapingRule::CSV);
return {fields, data_types};
return std::make_pair(std::move(fields), std::move(data_types));
}
DataTypes CSVSchemaReader::readRowAndGetDataTypesImpl()
std::optional<DataTypes> CSVSchemaReader::readRowAndGetDataTypesImpl()
{
return std::move(readRowAndGetFieldsAndDataTypes().second);
auto fields_with_types = readRowAndGetFieldsAndDataTypes();
if (!fields_with_types)
return {};
return std::move(fields_with_types->second);
}

View File

@ -70,7 +70,7 @@ public:
void skipPrefixBeforeHeader() override;
bool checkForEndOfRow() override;
bool allowVariableNumberOfColumns() override;
bool allowVariableNumberOfColumns() const override;
std::vector<String> readNames() override { return readHeaderRow(); }
std::vector<String> readTypes() override { return readHeaderRow(); }
@ -102,8 +102,10 @@ public:
CSVSchemaReader(ReadBuffer & in_, bool with_names_, bool with_types_, const FormatSettings & format_settings_);
private:
DataTypes readRowAndGetDataTypesImpl() override;
std::pair<std::vector<String>, DataTypes> readRowAndGetFieldsAndDataTypes() override;
bool allowVariableNumberOfColumns() const override { return format_settings.csv.allow_variable_number_of_columns; }
std::optional<DataTypes> readRowAndGetDataTypesImpl() override;
std::optional<std::pair<std::vector<String>, DataTypes>> readRowAndGetFieldsAndDataTypes() override;
PeekableReadBuffer buf;
CSVFormatReader reader;

View File

@ -139,10 +139,13 @@ void CustomSeparatedFormatReader::skipRowBetweenDelimiter()
void CustomSeparatedFormatReader::skipField()
{
skipSpaces();
skipFieldByEscapingRule(*buf, format_settings.custom.escaping_rule, format_settings);
if (format_settings.custom.escaping_rule == FormatSettings::EscapingRule::CSV)
readCSVFieldWithTwoPossibleDelimiters(*buf, format_settings.csv, format_settings.custom.field_delimiter, format_settings.custom.row_after_delimiter);
else
skipFieldByEscapingRule(*buf, format_settings.custom.escaping_rule, format_settings);
}
bool CustomSeparatedFormatReader::checkEndOfRow()
bool CustomSeparatedFormatReader::checkForEndOfRow()
{
PeekableReadBufferCheckpoint checkpoint{*buf, true};
@ -200,12 +203,12 @@ std::vector<String> CustomSeparatedFormatReader::readRowImpl()
std::vector<String> values;
skipRowStartDelimiter();
if (columns == 0)
if (columns == 0 || allowVariableNumberOfColumns())
{
do
{
values.push_back(readFieldIntoString<mode>(values.empty(), false, true));
} while (!checkEndOfRow());
} while (!checkForEndOfRow());
columns = values.size();
}
else
@ -230,7 +233,7 @@ void CustomSeparatedFormatReader::skipHeaderRow()
skipField();
}
while (!checkEndOfRow());
while (!checkForEndOfRow());
skipRowEndDelimiter();
}
@ -369,7 +372,7 @@ CustomSeparatedSchemaReader::CustomSeparatedSchemaReader(
{
}
std::pair<std::vector<String>, DataTypes> CustomSeparatedSchemaReader::readRowAndGetFieldsAndDataTypes()
std::optional<std::pair<std::vector<String>, DataTypes>> CustomSeparatedSchemaReader::readRowAndGetFieldsAndDataTypes()
{
if (no_more_data || reader.checkForSuffix())
{
@ -385,12 +388,15 @@ std::pair<std::vector<String>, DataTypes> CustomSeparatedSchemaReader::readRowAn
auto fields = reader.readRow();
auto data_types = tryInferDataTypesByEscapingRule(fields, reader.getFormatSettings(), reader.getEscapingRule(), &json_inference_info);
return {fields, data_types};
return std::make_pair(std::move(fields), std::move(data_types));
}
DataTypes CustomSeparatedSchemaReader::readRowAndGetDataTypesImpl()
std::optional<DataTypes> CustomSeparatedSchemaReader::readRowAndGetDataTypesImpl()
{
return readRowAndGetFieldsAndDataTypes().second;
auto fields_with_types = readRowAndGetFieldsAndDataTypes();
if (!fields_with_types)
return {};
return std::move(fields_with_types->second);
}
void CustomSeparatedSchemaReader::transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type)

View File

@ -74,7 +74,9 @@ public:
std::vector<String> readRowForHeaderDetection() override { return readRowImpl<ReadFieldMode::AS_POSSIBLE_STRING>(); }
bool checkEndOfRow();
bool checkForEndOfRow() override;
bool allowVariableNumberOfColumns() const override { return format_settings.custom.allow_variable_number_of_columns; }
bool checkForSuffixImpl(bool check_eof);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(*buf, true); }
@ -109,9 +111,11 @@ public:
CustomSeparatedSchemaReader(ReadBuffer & in_, bool with_names_, bool with_types_, bool ignore_spaces_, const FormatSettings & format_setting_);
private:
DataTypes readRowAndGetDataTypesImpl() override;
bool allowVariableNumberOfColumns() const override { return format_settings.custom.allow_variable_number_of_columns; }
std::pair<std::vector<String>, DataTypes> readRowAndGetFieldsAndDataTypes() override;
std::optional<DataTypes> readRowAndGetDataTypesImpl() override;
std::optional<std::pair<std::vector<String>, DataTypes>> readRowAndGetFieldsAndDataTypes() override;
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;

View File

@ -112,6 +112,12 @@ bool JSONCompactEachRowFormatReader::readField(IColumn & column, const DataTypeP
return JSONUtils::readField(*in, column, type, serialization, column_name, format_settings, yield_strings);
}
bool JSONCompactEachRowFormatReader::checkForEndOfRow()
{
skipWhitespaceIfAny(*in);
return !in->eof() && *in->position() == ']';
}
bool JSONCompactEachRowFormatReader::parseRowStartWithDiagnosticInfo(WriteBuffer & out)
{
skipWhitespaceIfAny(*in);
@ -187,7 +193,7 @@ JSONCompactEachRowRowSchemaReader::JSONCompactEachRowRowSchemaReader(
{
}
DataTypes JSONCompactEachRowRowSchemaReader::readRowAndGetDataTypesImpl()
std::optional<DataTypes> JSONCompactEachRowRowSchemaReader::readRowAndGetDataTypesImpl()
{
if (first_row)
first_row = false;

View File

@ -68,6 +68,9 @@ public:
std::vector<String> readNames() override { return readHeaderRow(); }
std::vector<String> readTypes() override { return readHeaderRow(); }
bool checkForEndOfRow() override;
bool allowVariableNumberOfColumns() const override { return format_settings.json.compact_allow_variable_number_of_columns; }
bool yieldStrings() const { return yield_strings; }
private:
bool yield_strings;
@ -79,7 +82,9 @@ public:
JSONCompactEachRowRowSchemaReader(ReadBuffer & in_, bool with_names_, bool with_types_, bool yield_strings_, const FormatSettings & format_settings_);
private:
DataTypes readRowAndGetDataTypesImpl() override;
bool allowVariableNumberOfColumns() const override { return format_settings.json.compact_allow_variable_number_of_columns; }
std::optional<DataTypes> readRowAndGetDataTypesImpl() override;
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
void transformFinalTypeIfNeeded(DataTypePtr & type) override;

View File

@ -634,7 +634,7 @@ DataTypePtr MsgPackSchemaReader::getDataType(const msgpack::object & object)
UNREACHABLE();
}
DataTypes MsgPackSchemaReader::readRowAndGetDataTypes()
std::optional<DataTypes> MsgPackSchemaReader::readRowAndGetDataTypes()
{
if (buf.eof())
return {};

View File

@ -91,7 +91,7 @@ public:
private:
msgpack::object_handle readObject();
DataTypePtr getDataType(const msgpack::object & object);
DataTypes readRowAndGetDataTypes() override;
std::optional<DataTypes> readRowAndGetDataTypes() override;
PeekableReadBuffer buf;
UInt64 number_of_columns;

View File

@ -422,7 +422,7 @@ NamesAndTypesList MySQLDumpSchemaReader::readSchema()
return IRowSchemaReader::readSchema();
}
DataTypes MySQLDumpSchemaReader::readRowAndGetDataTypes()
std::optional<DataTypes> MySQLDumpSchemaReader::readRowAndGetDataTypes()
{
if (in.eof())
return {};

View File

@ -33,7 +33,7 @@ public:
private:
NamesAndTypesList readSchema() override;
DataTypes readRowAndGetDataTypes() override;
std::optional<DataTypes> readRowAndGetDataTypes() override;
String table_name;
};

View File

@ -143,7 +143,7 @@ RegexpSchemaReader::RegexpSchemaReader(ReadBuffer & in_, const FormatSettings &
{
}
DataTypes RegexpSchemaReader::readRowAndGetDataTypes()
std::optional<DataTypes> RegexpSchemaReader::readRowAndGetDataTypes()
{
if (buf.eof())
return {};

View File

@ -79,7 +79,7 @@ public:
RegexpSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings);
private:
DataTypes readRowAndGetDataTypes() override;
std::optional<DataTypes> readRowAndGetDataTypes() override;
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;

View File

@ -300,6 +300,11 @@ bool TabSeparatedFormatReader::checkForSuffix()
return false;
}
bool TabSeparatedFormatReader::checkForEndOfRow()
{
return buf->eof() || *buf->position() == '\n';
}
TabSeparatedSchemaReader::TabSeparatedSchemaReader(
ReadBuffer & in_, bool with_names_, bool with_types_, bool is_raw_, const FormatSettings & format_settings_)
: FormatWithNamesAndTypesSchemaReader(
@ -315,19 +320,22 @@ TabSeparatedSchemaReader::TabSeparatedSchemaReader(
{
}
std::pair<std::vector<String>, DataTypes> TabSeparatedSchemaReader::readRowAndGetFieldsAndDataTypes()
std::optional<std::pair<std::vector<String>, DataTypes>> TabSeparatedSchemaReader::readRowAndGetFieldsAndDataTypes()
{
if (buf.eof())
return {};
auto fields = reader.readRow();
auto data_types = tryInferDataTypesByEscapingRule(fields, reader.getFormatSettings(), reader.getEscapingRule());
return {fields, data_types};
return std::make_pair(fields, data_types);
}
DataTypes TabSeparatedSchemaReader::readRowAndGetDataTypesImpl()
std::optional<DataTypes> TabSeparatedSchemaReader::readRowAndGetDataTypesImpl()
{
return readRowAndGetFieldsAndDataTypes().second;
auto fields_with_types = readRowAndGetFieldsAndDataTypes();
if (!fields_with_types)
return {};
return std::move(fields_with_types->second);
}
void registerInputFormatTabSeparated(FormatFactory & factory)

View File

@ -76,6 +76,9 @@ public:
void setReadBuffer(ReadBuffer & in_) override;
bool checkForSuffix() override;
bool checkForEndOfRow() override;
bool allowVariableNumberOfColumns() const override { return format_settings.tsv.allow_variable_number_of_columns; }
private:
template <bool is_header>
@ -92,8 +95,10 @@ public:
TabSeparatedSchemaReader(ReadBuffer & in_, bool with_names_, bool with_types_, bool is_raw_, const FormatSettings & format_settings);
private:
DataTypes readRowAndGetDataTypesImpl() override;
std::pair<std::vector<String>, DataTypes> readRowAndGetFieldsAndDataTypes() override;
bool allowVariableNumberOfColumns() const override { return format_settings.tsv.allow_variable_number_of_columns; }
std::optional<DataTypes> readRowAndGetDataTypesImpl() override;
std::optional<std::pair<std::vector<String>, DataTypes>> readRowAndGetFieldsAndDataTypes() override;
PeekableReadBuffer buf;
TabSeparatedFormatReader reader;

View File

@ -490,7 +490,7 @@ TemplateSchemaReader::TemplateSchemaReader(
setColumnNames(row_format.column_names);
}
DataTypes TemplateSchemaReader::readRowAndGetDataTypes()
std::optional<DataTypes> TemplateSchemaReader::readRowAndGetDataTypes()
{
if (first_row)
format_reader.readPrefix();

View File

@ -119,7 +119,7 @@ public:
std::string row_between_delimiter,
const FormatSettings & format_settings_);
DataTypes readRowAndGetDataTypes() override;
std::optional<DataTypes> readRowAndGetDataTypes() override;
private:
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;

View File

@ -638,7 +638,7 @@ ValuesSchemaReader::ValuesSchemaReader(ReadBuffer & in_, const FormatSettings &
{
}
DataTypes ValuesSchemaReader::readRowAndGetDataTypes()
std::optional<DataTypes> ValuesSchemaReader::readRowAndGetDataTypes()
{
if (first_row)
{

View File

@ -105,7 +105,7 @@ public:
ValuesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings);
private:
DataTypes readRowAndGetDataTypes() override;
std::optional<DataTypes> readRowAndGetDataTypes() override;
PeekableReadBuffer buf;
ParserExpression parser;

View File

@ -212,8 +212,24 @@ bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadE
format_reader->skipRowStartDelimiter();
ext.read_columns.resize(data_types.size());
for (size_t file_column = 0; file_column < column_mapping->column_indexes_for_input_fields.size(); ++file_column)
size_t file_column = 0;
for (; file_column < column_mapping->column_indexes_for_input_fields.size(); ++file_column)
{
if (format_reader->allowVariableNumberOfColumns() && format_reader->checkForEndOfRow())
{
while (file_column < column_mapping->column_indexes_for_input_fields.size())
{
const auto & rem_column_index = column_mapping->column_indexes_for_input_fields[file_column];
if (rem_column_index)
columns[*rem_column_index]->insertDefault();
++file_column;
}
break;
}
if (file_column != 0)
format_reader->skipFieldDelimiter();
const auto & column_index = column_mapping->column_indexes_for_input_fields[file_column];
const bool is_last_file_column = file_column + 1 == column_mapping->column_indexes_for_input_fields.size();
if (column_index)
@ -225,22 +241,6 @@ bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadE
column_mapping->names_of_columns[file_column]);
else
format_reader->skipField(file_column);
if (!is_last_file_column)
{
if (format_reader->allowVariableNumberOfColumns() && format_reader->checkForEndOfRow())
{
++file_column;
while (file_column < column_mapping->column_indexes_for_input_fields.size())
{
const auto & rem_column_index = column_mapping->column_indexes_for_input_fields[file_column];
columns[*rem_column_index]->insertDefault();
++file_column;
}
}
else
format_reader->skipFieldDelimiter();
}
}
if (format_reader->allowVariableNumberOfColumns() && !format_reader->checkForEndOfRow())
@ -248,7 +248,7 @@ bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadE
do
{
format_reader->skipFieldDelimiter();
format_reader->skipField(1);
format_reader->skipField(file_column++);
}
while (!format_reader->checkForEndOfRow());
}
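
A hypothetical helper illustrating the behaviour enabled by allowVariableNumberOfColumns() (alignRowToHeader is invented for illustration; the real code works on column_mapping and IColumn defaults):

    #include <string>
    #include <vector>

    // Rows shorter than the header are padded with default values,
    // rows longer than the header are cut to the declared width.
    std::vector<std::string> alignRowToHeader(std::vector<std::string> row, size_t header_width, const std::string & default_value = "")
    {
        row.resize(header_width, default_value);
        return row;
    }

    // alignRowToHeader({"1", "2"}, 3)           -> {"1", "2", ""}
    // alignRowToHeader({"1", "2", "3", "4"}, 3) -> {"1", "2", "3"}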
@ -419,12 +419,14 @@ namespace
void FormatWithNamesAndTypesSchemaReader::tryDetectHeader(std::vector<String> & column_names, std::vector<String> & type_names)
{
auto [first_row_values, first_row_types] = readRowAndGetFieldsAndDataTypes();
auto first_row = readRowAndGetFieldsAndDataTypes();
/// No data.
if (first_row_values.empty())
if (!first_row)
return;
const auto & [first_row_values, first_row_types] = *first_row;
/// The first row contains non String elements, it cannot be a header.
if (!checkIfAllTypesAreString(first_row_types))
{
@ -432,15 +434,17 @@ void FormatWithNamesAndTypesSchemaReader::tryDetectHeader(std::vector<String> &
return;
}
auto [second_row_values, second_row_types] = readRowAndGetFieldsAndDataTypes();
auto second_row = readRowAndGetFieldsAndDataTypes();
/// Data contains only 1 row, don't treat it as a header.
if (second_row_values.empty())
if (!second_row)
{
buffered_types = first_row_types;
return;
}
const auto & [second_row_values, second_row_types] = *second_row;
DataTypes data_types;
bool second_row_can_be_type_names = checkIfAllTypesAreString(second_row_types) && checkIfAllValuesAreTypeNames(readNamesFromFields(second_row_values));
size_t row = 2;
@ -450,15 +454,16 @@ void FormatWithNamesAndTypesSchemaReader::tryDetectHeader(std::vector<String> &
}
else
{
data_types = readRowAndGetDataTypes();
auto data_types_maybe = readRowAndGetDataTypes();
/// Data contains only 2 rows.
if (data_types.empty())
if (!data_types_maybe)
{
second_row_can_be_type_names = false;
data_types = second_row_types;
}
else
{
data_types = *data_types_maybe;
++row;
}
}
@ -490,10 +495,10 @@ void FormatWithNamesAndTypesSchemaReader::tryDetectHeader(std::vector<String> &
return;
}
auto next_row_types = readRowAndGetDataTypes();
auto next_row_types_maybe = readRowAndGetDataTypes();
/// Check if there are no more rows in the data. It means that all rows contain only String values and Nulls,
/// so, the first two rows with all String elements can be real data and we cannot use them as a header.
if (next_row_types.empty())
if (!next_row_types_maybe)
{
/// Buffer first data types from the first row, because it doesn't contain Nulls.
buffered_types = first_row_types;
@ -502,11 +507,11 @@ void FormatWithNamesAndTypesSchemaReader::tryDetectHeader(std::vector<String> &
++row;
/// Combine types from current row and from previous rows.
chooseResultColumnTypes(*this, data_types, next_row_types, getDefaultDataTypeForEscapingRule(FormatSettings::EscapingRule::CSV), default_colum_names, row);
chooseResultColumnTypes(*this, data_types, *next_row_types_maybe, getDefaultDataTypeForEscapingRule(FormatSettings::EscapingRule::CSV), default_colum_names, row);
}
}
DataTypes FormatWithNamesAndTypesSchemaReader::readRowAndGetDataTypes()
std::optional<DataTypes> FormatWithNamesAndTypesSchemaReader::readRowAndGetDataTypes()
{
/// Check if we tried to detect a header and have buffered types from read rows.
if (!buffered_types.empty())

View File

@ -119,9 +119,10 @@ public:
/// Check suffix.
virtual bool checkForSuffix() { return in->eof(); }
/// Check if we are at the end of row, not between fields.
virtual bool checkForEndOfRow() { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method checkForEndOfRow is not implemented"); }
virtual bool allowVariableNumberOfColumns() { return false; }
virtual bool allowVariableNumberOfColumns() const { return false; }
const FormatSettings & getFormatSettings() const { return format_settings; }
@ -160,15 +161,15 @@ public:
NamesAndTypesList readSchema() override;
protected:
virtual DataTypes readRowAndGetDataTypes() override;
virtual std::optional<DataTypes> readRowAndGetDataTypes() override;
virtual DataTypes readRowAndGetDataTypesImpl()
virtual std::optional<DataTypes> readRowAndGetDataTypesImpl()
{
throw Exception{ErrorCodes::NOT_IMPLEMENTED, "Method readRowAndGetDataTypesImpl is not implemented"};
}
/// Return column fields with inferred types. In case of no more rows, return empty vectors.
virtual std::pair<std::vector<String>, DataTypes> readRowAndGetFieldsAndDataTypes()
/// Return column fields with inferred types. In case of no more rows, return nullopt.
virtual std::optional<std::pair<std::vector<String>, DataTypes>> readRowAndGetFieldsAndDataTypes()
{
throw Exception{ErrorCodes::NOT_IMPLEMENTED, "Method readRowAndGetFieldsAndDataTypes is not implemented"};
}
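
A minimal sketch of the new optional contract (MySchemaReader and inferTypesFromNextRow are invented for illustration; `in` is the read buffer inherited from the schema reader base class):

    std::optional<DataTypes> MySchemaReader::readRowAndGetDataTypes()
    {
        if (in.eof())
            return std::nullopt;                    // no more rows: schema inference stops here

        DataTypes types = inferTypesFromNextRow();  // a real row; entries may still be nullptr
                                                    // for columns whose type could not be determined
        return types;
    }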

View File

@ -103,7 +103,8 @@ ReadFromRemote::ReadFromRemote(
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_)
std::shared_ptr<const StorageLimitsList> storage_limits_,
const String & cluster_name_)
: ISourceStep(DataStream{.header = std::move(header_)})
, shards(std::move(shards_))
, stage(stage_)
@ -116,6 +117,7 @@ ReadFromRemote::ReadFromRemote(
, storage_limits(std::move(storage_limits_))
, log(log_)
, shard_count(shard_count_)
, cluster_name(cluster_name_)
{
}
@ -234,13 +236,37 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
if (context->getParallelReplicasMode() == Context::ParallelReplicasMode::READ_TASKS)
{
if (context->getSettingsRef().cluster_for_parallel_replicas.changed)
{
const String cluster_for_parallel_replicas = context->getSettingsRef().cluster_for_parallel_replicas;
if (cluster_for_parallel_replicas != cluster_name)
LOG_INFO(log, "cluster_for_parallel_replicas has been set for the query but has no effect: {}. Distributed table cluster is used: {}",
cluster_for_parallel_replicas, cluster_name);
}
context->setSetting("cluster_for_parallel_replicas", cluster_name);
}
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;
remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (context->getParallelReplicasMode() == Context::ParallelReplicasMode::READ_TASKS)
{
// When doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard:
// establish a connection to a replica on the shard; the replica will instantiate a coordinator to manage parallel reading from replicas on the shard.
// The coordinator will return the query result from the shard.
// Only one coordinator per shard is necessary, therefore we use PoolMode::GET_ONE to establish only one connection per shard.
// Using PoolMode::GET_MANY for this mode can lead to the instantiation of several coordinators (depending on the max_parallel_replicas setting),
// each executing parallel reading from replicas, so the query result would be multiplied by the number of created coordinators.
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
}
else
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);

View File

@ -35,7 +35,8 @@ public:
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_);
std::shared_ptr<const StorageLimitsList> storage_limits_,
const String & cluster_name_);
String getName() const override { return "ReadFromRemote"; }
@ -55,8 +56,9 @@ private:
Tables external_tables;
std::shared_ptr<const StorageLimitsList> storage_limits;
Poco::Logger * log;
UInt32 shard_count;
String cluster_name;
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
};

View File

@ -42,12 +42,9 @@ const char * ServerType::serverTypeToString(ServerType::Type type)
bool ServerType::shouldStart(Type server_type, const std::string & server_custom_name) const
{
if (type == Type::QUERIES_ALL)
return true;
if (type == Type::QUERIES_DEFAULT)
auto is_type_default = [](Type current_type)
{
switch (server_type)
switch (current_type)
{
case Type::TCP:
case Type::TCP_WITH_PROXY:
@ -64,21 +61,37 @@ bool ServerType::shouldStart(Type server_type, const std::string & server_custom
default:
return false;
}
};
if (exclude_types.contains(Type::QUERIES_ALL))
return false;
if (exclude_types.contains(Type::QUERIES_DEFAULT) && is_type_default(server_type))
return false;
if (exclude_types.contains(Type::QUERIES_CUSTOM) && server_type == Type::CUSTOM)
return false;
if (exclude_types.contains(server_type))
{
if (server_type != Type::CUSTOM)
return false;
if (exclude_custom_names.contains(server_custom_name))
return false;
}
if (type == Type::QUERIES_ALL)
return true;
if (type == Type::QUERIES_DEFAULT)
return is_type_default(server_type);
if (type == Type::QUERIES_CUSTOM)
{
switch (server_type)
{
case Type::CUSTOM:
return true;
default:
return false;
}
}
return server_type == Type::CUSTOM;
if (type == Type::CUSTOM)
return server_type == type && server_custom_name == "protocols." + custom_name + ".port";
return server_type == type && server_custom_name == custom_name;
return server_type == type;
}
@ -86,6 +99,7 @@ bool ServerType::shouldStart(Type server_type, const std::string & server_custom
bool ServerType::shouldStop(const std::string & port_name) const
{
Type port_type;
std::string port_custom_name;
if (port_name == "http_port")
port_type = Type::HTTP;
@ -121,12 +135,19 @@ bool ServerType::shouldStop(const std::string & port_name) const
port_type = Type::INTERSERVER_HTTPS;
else if (port_name.starts_with("protocols.") && port_name.ends_with(".port"))
{
port_type = Type::CUSTOM;
constexpr size_t protocols_size = std::string_view("protocols.").size();
constexpr size_t ports_size = std::string_view(".ports").size();
port_custom_name = port_name.substr(protocols_size, port_name.size() - protocols_size - ports_size + 1);
}
else
return false;
return shouldStart(port_type, port_name);
return shouldStart(port_type, port_custom_name);
}
}
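
A hedged sketch of the resulting semantics (the protocol names are illustrative):

    // SYSTEM STOP LISTEN QUERIES_ALL EXCEPT TCP
    ServerType all_except_tcp(ServerType::Type::QUERIES_ALL, "", {ServerType::Type::TCP});
    all_except_tcp.shouldStart(ServerType::Type::HTTP, "");   // true  - matches QUERIES_ALL and is not excluded
    all_except_tcp.shouldStart(ServerType::Type::TCP, "");    // false - explicitly excluded

    // SYSTEM STOP LISTEN QUERIES_CUSTOM EXCEPT CUSTOM 'mysql'
    ServerType custom_except_mysql(ServerType::Type::QUERIES_CUSTOM, "", {ServerType::Type::CUSTOM}, {"mysql"});
    custom_except_mysql.shouldStart(ServerType::Type::CUSTOM, "mysql");   // false - excluded by name
    custom_except_mysql.shouldStart(ServerType::Type::CUSTOM, "grpc_1");  // true  - other custom protocols still match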

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/types.h>
#include <unordered_set>
namespace DB
{
@ -28,8 +29,20 @@ public:
END
};
using Types = std::unordered_set<Type>;
using CustomNames = std::unordered_set<String>;
ServerType() = default;
explicit ServerType(Type type_, const std::string & custom_name_ = "") : type(type_), custom_name(custom_name_) {}
explicit ServerType(
Type type_,
const std::string & custom_name_ = "",
const Types & exclude_types_ = {},
const CustomNames exclude_custom_names_ = {})
: type(type_),
custom_name(custom_name_),
exclude_types(exclude_types_),
exclude_custom_names(exclude_custom_names_) {}
static const char * serverTypeToString(Type type);
@ -39,6 +52,9 @@ public:
Type type;
std::string custom_name;
Types exclude_types;
CustomNames exclude_custom_names;
};
}

View File

@ -399,11 +399,11 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
return reader->readFile([my_matcher = std::move(matcher)](const std::string & path)
{
return re2::RE2::FullMatch(path, *my_matcher);
});
}, /*throw_on_not_found=*/true);
}
else
{
return reader->readFile(current_path);
return reader->readFile(current_path, /*throw_on_not_found=*/true);
}
}
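
For illustration, this is roughly how a glob over archive contents becomes a NameFilter (the glob string is an example; makeRegexpPatternFromGlobs and re2 are the same helpers used elsewhere in this file):

    auto matcher = std::make_shared<re2::RE2>(makeRegexpPatternFromGlobs("*.csv"));
    IArchiveReader::NameFilter filter = [matcher](const std::string & name)
    {
        return re2::RE2::FullMatch(name, *matcher);
    };
    // While enumerating archive entries:
    //     if (filter(file_enumerator->getFileName())) ... read this entry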
@ -721,28 +721,20 @@ public:
{
public:
explicit FilesIterator(
const Strings & files_, std::vector<std::string> archives_, std::vector<std::pair<uint64_t, std::string>> files_in_archive_)
: files(files_), archives(std::move(archives_)), files_in_archive(std::move(files_in_archive_))
const Strings & files_, std::vector<std::string> archives_, const IArchiveReader::NameFilter & name_filter_)
: files(files_), archives(std::move(archives_)), name_filter(name_filter_)
{
}
String next()
{
const auto & fs = fromArchive() ? archives : files;
auto current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= files.size())
if (current_index >= fs.size())
return "";
return files[current_index];
}
std::pair<String, String> nextFileFromArchive()
{
auto current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= files_in_archive.size())
return {"", ""};
const auto & [archive_index, filename] = files_in_archive[current_index];
return {archives[archive_index], filename};
return fs[current_index];
}
bool fromArchive() const
@ -750,10 +742,31 @@ public:
return !archives.empty();
}
bool readSingleFileFromArchive() const
{
return !name_filter;
}
bool passesFilter(const std::string & name) const
{
std::lock_guard lock(filter_mutex);
return name_filter(name);
}
const String & getFileName()
{
if (files.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected only 1 filename but got {}", files.size());
return files[0];
}
private:
std::vector<std::string> files;
std::vector<std::string> archives;
std::vector<std::pair<uint64_t, std::string>> files_in_archive;
mutable std::mutex filter_mutex;
IArchiveReader::NameFilter name_filter;
std::atomic<size_t> index = 0;
};
@ -863,25 +876,62 @@ public:
{
if (files_iterator->fromArchive())
{
auto [archive, filename] = files_iterator->nextFileFromArchive();
if (archive.empty())
return {};
current_path = std::move(filename);
if (!archive_reader || archive_reader->getPath() != archive)
if (files_iterator->readSingleFileFromArchive())
{
auto archive = files_iterator->next();
if (archive.empty())
return {};
struct stat file_stat = getFileStat(archive, storage->use_table_fd, storage->table_fd, storage->getName());
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
archive_reader = createArchiveReader(archive);
file_enumerator = archive_reader->firstFile();
current_path = files_iterator->getFileName();
read_buf = archive_reader->readFile(current_path, /*throw_on_not_found=*/false);
if (!read_buf)
continue;
}
if (file_enumerator == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to find a file in archive {}", archive);
while (file_enumerator->getFileName() != current_path)
else
{
if (!file_enumerator->nextFile())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected file {} is missing from archive {}", current_path, archive);
while (true)
{
if (file_enumerator == nullptr)
{
auto archive = files_iterator->next();
if (archive.empty())
return {};
struct stat file_stat = getFileStat(archive, storage->use_table_fd, storage->table_fd, storage->getName());
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
archive_reader = createArchiveReader(archive);
file_enumerator = archive_reader->firstFile();
continue;
}
bool file_found = true;
while (!files_iterator->passesFilter(file_enumerator->getFileName()))
{
if (!file_enumerator->nextFile())
{
file_found = false;
break;
}
}
if (file_found)
{
current_path = file_enumerator->getFileName();
break;
}
file_enumerator = nullptr;
}
chassert(file_enumerator);
read_buf = archive_reader->readFile(std::move(file_enumerator));
}
}
else
@ -903,23 +953,12 @@ public:
if (!read_buf)
{
struct stat file_stat;
if (archive_reader == nullptr)
{
file_stat = getFileStat(current_path, storage->use_table_fd, storage->table_fd, storage->getName());
file_stat = getFileStat(current_path, storage->use_table_fd, storage->table_fd, storage->getName());
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
}
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
if (archive_reader == nullptr)
{
read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, context);
}
else
{
chassert(file_enumerator);
read_buf = archive_reader->readFile(std::move(file_enumerator));
}
read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, context);
}
const Settings & settings = context->getSettingsRef();
@ -987,10 +1026,10 @@ public:
pipeline.reset();
input_format.reset();
if (archive_reader != nullptr)
if (files_iterator->fromArchive() && !files_iterator->readSingleFileFromArchive())
file_enumerator = archive_reader->nextFile(std::move(read_buf));
else
read_buf.reset();
read_buf.reset();
}
return {};
@ -1050,9 +1089,7 @@ Pipe StorageFile::read(
}
}
std::vector<std::pair<uint64_t, std::string>> files_in_archive;
size_t files_in_archive_num = 0;
IArchiveReader::NameFilter filter;
if (!paths_to_archive.empty())
{
if (paths.size() != 1)
@ -1060,7 +1097,6 @@ Pipe StorageFile::read(
const auto & path = paths[0];
IArchiveReader::NameFilter filter;
if (path.find_first_of("*?{") != std::string::npos)
{
auto matcher = std::make_shared<re2::RE2>(makeRegexpPatternFromGlobs(path));
@ -1073,32 +1109,14 @@ Pipe StorageFile::read(
return re2::RE2::FullMatch(p, *matcher);
};
}
for (size_t i = 0; i < paths_to_archive.size(); ++i)
{
if (filter)
{
const auto & path_to_archive = paths_to_archive[i];
auto archive_reader = createArchiveReader(path_to_archive);
auto files = archive_reader->getAllFiles(filter);
for (auto & file : files)
files_in_archive.push_back({i, std::move(file)});
}
else
{
files_in_archive.push_back({i, path});
}
}
files_in_archive_num = files_in_archive.size();
}
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, paths_to_archive, std::move(files_in_archive));
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, paths_to_archive, std::move(filter));
auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());
size_t num_streams = max_num_streams;
auto files_to_read = std::max(files_in_archive_num, paths.size());
auto files_to_read = std::max(paths_to_archive.size(), paths.size());
if (max_num_streams > files_to_read)
num_streams = files_to_read;

View File

@ -220,7 +220,8 @@ void StorageMergeTree::read(
local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
auto cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
String cluster_for_parallel_replicas = local_context->getSettingsRef().cluster_for_parallel_replicas;
auto cluster = local_context->getCluster(cluster_for_parallel_replicas);
Block header;
@ -919,7 +920,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (getCurrentMutationVersion(left, lock) != getCurrentMutationVersion(right, lock))
{
disable_reason = "Some parts have differ mmutatuon version";
disable_reason = "Some parts have different mutation version";
return false;
}

View File

@ -5157,7 +5157,9 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
{
auto table_id = getStorageID();
auto parallel_replicas_cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
auto scalars = local_context->hasQueryContext() ? local_context->getQueryContext()->getScalars() : Scalars{};
String cluster_for_parallel_replicas = local_context->getSettingsRef().cluster_for_parallel_replicas;
auto parallel_replicas_cluster = local_context->getCluster(cluster_for_parallel_replicas);
ASTPtr modified_query_ast;
Block header;

View File

@ -67,6 +67,7 @@ class ClickHouseHelper:
if args:
url = args[0]
url = kwargs.get("url", url)
kwargs["timeout"] = kwargs.get("timeout", 100)
for i in range(5):
try:

View File

@ -176,6 +176,38 @@
</replica>
</shard-->
</test_cluster_one_shard_three_replicas_localhost>
<test_cluster_two_shard_three_replicas_localhost>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.4</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.5</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.6</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shard_three_replicas_localhost>
<test_cluster_two_shards_localhost>
<shard>
<replica>

View File

@ -1,6 +1,6 @@
<clickhouse>
<remote_servers>
<two_shards>
<one_shard_two_nodes>
<shard>
<replica>
<host>node1</host>
@ -11,6 +11,6 @@
<port>9000</port>
</replica>
</shard>
</two_shards>
</one_shard_two_nodes>
</remote_servers>
</clickhouse>

View File

@ -4,12 +4,8 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node1 = cluster.add_instance("node1", main_configs=["configs/remote_servers.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/remote_servers.xml"])
@pytest.fixture(scope="module")
@ -24,11 +20,13 @@ def start_cluster():
def test_remote(start_cluster):
assert (
node1.query(
"""SELECT hostName() FROM clusterAllReplicas("two_shards", system.one)"""
"""SELECT hostName() FROM clusterAllReplicas("one_shard_two_nodes", system.one)"""
)
== "node1\nnode2\n"
)
assert (
node1.query("""SELECT hostName() FROM cluster("two_shards", system.one)""")
node1.query(
"""SELECT hostName() FROM cluster("one_shard_two_nodes", system.one)"""
)
== "node1\n"
)

View File

@ -0,0 +1,58 @@
<clickhouse>
<remote_servers>
<test_multiple_shards_multiple_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
<replica>
<host>n5</host>
<port>9000</port>
</replica>
<replica>
<host>n6</host>
<port>9000</port>
</replica>
</shard>
</test_multiple_shards_multiple_replicas>
<test_single_shard_multiple_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</test_single_shard_multiple_replicas>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,154 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
nodes = [
cluster.add_instance(
f"n{i}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
for i in (1, 2, 3, 4, 5, 6)
]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def create_tables(cluster, table_name):
# create replicated tables
for node in nodes:
node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
if cluster == "test_single_shard_multiple_replicas":
nodes[0].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)"
)
nodes[1].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2') ORDER BY (key)"
)
nodes[2].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3') ORDER BY (key)"
)
nodes[3].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r4') ORDER BY (key)"
)
elif cluster == "test_multiple_shards_multiple_replicas":
# shard 1
nodes[0].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)"
)
nodes[1].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2') ORDER BY (key)"
)
nodes[2].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3') ORDER BY (key)"
)
# shard 2
nodes[3].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard2/{table_name}', 'r1') ORDER BY (key)"
)
nodes[4].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard2/{table_name}', 'r2') ORDER BY (key)"
)
nodes[5].query(
f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard2/{table_name}', 'r3') ORDER BY (key)"
)
else:
raise Exception(f"Unexpected cluster: {cluster}")
# create distributed table
nodes[0].query(f"DROP TABLE IF EXISTS {table_name}_d SYNC")
nodes[0].query(
f"""
CREATE TABLE {table_name}_d AS {table_name}
Engine=Distributed(
{cluster},
currentDatabase(),
{table_name},
key
)
"""
)
# populate data
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(1000)",
settings={"insert_distributed_sync": 1},
)
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(2000)",
settings={"insert_distributed_sync": 1},
)
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT -number, -number FROM numbers(1000)",
settings={"insert_distributed_sync": 1},
)
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT -number, -number FROM numbers(2000)",
settings={"insert_distributed_sync": 1},
)
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(3)",
settings={"insert_distributed_sync": 1},
)
@pytest.mark.parametrize(
"cluster,max_parallel_replicas,prefer_localhost_replica",
[
# prefer_localhost_replica=0
pytest.param("test_single_shard_multiple_replicas", 2, 0),
pytest.param("test_single_shard_multiple_replicas", 3, 0),
pytest.param("test_single_shard_multiple_replicas", 4, 0),
pytest.param("test_single_shard_multiple_replicas", 10, 0),
# prefer_localhost_replica=1
pytest.param("test_single_shard_multiple_replicas", 2, 1),
pytest.param("test_single_shard_multiple_replicas", 3, 1),
pytest.param("test_single_shard_multiple_replicas", 4, 1),
pytest.param("test_single_shard_multiple_replicas", 10, 1),
# prefer_localhost_replica=0
pytest.param("test_multiple_shards_multiple_replicas", 2, 0),
pytest.param("test_multiple_shards_multiple_replicas", 3, 0),
pytest.param("test_multiple_shards_multiple_replicas", 4, 0),
pytest.param("test_multiple_shards_multiple_replicas", 10, 0),
# prefer_localhost_replica=1
pytest.param("test_multiple_shards_multiple_replicas", 2, 1),
pytest.param("test_multiple_shards_multiple_replicas", 3, 1),
pytest.param("test_multiple_shards_multiple_replicas", 4, 1),
pytest.param("test_multiple_shards_multiple_replicas", 10, 1),
],
)
def test_parallel_replicas_over_distributed(
start_cluster, cluster, max_parallel_replicas, prefer_localhost_replica
):
table_name = "test_table"
create_tables(cluster, table_name)
node = nodes[0]
expected_result = f"6003\t-1999\t1999\t3\n"
# w/o parallel replicas
assert (
node.query(f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d")
== expected_result
)
# parallel replicas
assert (
node.query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"prefer_localhost_replica": prefer_localhost_replica,
"max_parallel_replicas": max_parallel_replicas,
"use_hedged_requests": 0,
},
)
== expected_result
)

View File

@ -143,3 +143,73 @@ def test_all_protocols(started_cluster):
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES ALL")
assert_everything_works()
def test_except(started_cluster):
custom_client = Client(main_node.ip_address, 9001, command=cluster.client_bin_path)
assert_everything_works()
# STOP LISTEN QUERIES ALL EXCEPT
main_node.query("SYSTEM STOP LISTEN QUERIES ALL EXCEPT MYSQL, CUSTOM 'tcp'")
assert "Connection refused" in main_node.query_and_get_error(QUERY)
custom_client.query(MYSQL_QUERY)
assert http_works() == False
assert http_works(8124) == False
# START LISTEN QUERIES ALL EXCEPT
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES ALL EXCEPT TCP")
assert "Connection refused" in main_node.query_and_get_error(QUERY)
custom_client.query(MYSQL_QUERY)
assert http_works() == True
assert http_works(8124) == True
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES ALL")
assert_everything_works()
# STOP LISTEN QUERIES DEFAULT EXCEPT
main_node.query("SYSTEM STOP LISTEN QUERIES DEFAULT EXCEPT TCP")
main_node.query(QUERY)
assert "Connections to mysql failed" in custom_client.query_and_get_error(
MYSQL_QUERY
)
custom_client.query(QUERY)
assert http_works() == False
assert http_works(8124) == True
# START LISTEN QUERIES DEFAULT EXCEPT
backup_node.query(
"SYSTEM START LISTEN ON CLUSTER default QUERIES DEFAULT EXCEPT HTTP"
)
main_node.query(QUERY)
main_node.query(MYSQL_QUERY)
custom_client.query(QUERY)
assert http_works() == False
assert http_works(8124) == True
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES ALL")
assert_everything_works()
# STOP LISTEN QUERIES CUSTOM EXCEPT
main_node.query("SYSTEM STOP LISTEN QUERIES CUSTOM EXCEPT CUSTOM 'tcp'")
main_node.query(QUERY)
custom_client.query(MYSQL_QUERY)
custom_client.query(QUERY)
assert http_works() == True
assert http_works(8124) == False
main_node.query("SYSTEM STOP LISTEN QUERIES CUSTOM")
# START LISTEN QUERIES CUSTOM EXCEPT
backup_node.query(
"SYSTEM START LISTEN ON CLUSTER default QUERIES CUSTOM EXCEPT CUSTOM 'tcp'"
)
main_node.query(QUERY)
main_node.query(MYSQL_QUERY)
assert "Connection refused" in custom_client.query_and_get_error(QUERY)
assert http_works() == True
assert http_works(8124) == True
backup_node.query("SYSTEM START LISTEN ON CLUSTER default QUERIES ALL")
assert_everything_works()

View File

@ -10,14 +10,14 @@ set -o pipefail
# NOTE: database = $CLICKHOUSE_DATABASE is unwanted
verify_sql="SELECT
(SELECT sumIf(value, metric = 'PartsInMemory'), sumIf(value, metric = 'PartsCompact'), sumIf(value, metric = 'PartsWide') FROM system.metrics) =
(SELECT countIf(part_type == 'InMemory'), countIf(part_type == 'Compact'), countIf(part_type == 'Wide')
FROM (SELECT part_type FROM system.parts UNION ALL SELECT part_type FROM system.projection_parts))"
(SELECT sumIf(value, metric = 'PartsCompact'), sumIf(value, metric = 'PartsWide') FROM system.metrics) =
(SELECT countIf(part_type = 'Compact'), countIf(part_type = 'Wide')
FROM (SELECT part_type FROM system.parts UNION ALL SELECT part_type FROM system.projection_parts))"
# The query is not atomic - it can compare states between system.parts and system.metrics from different points in time.
# So, there is an inherent race condition (especially in fasttest, which runs tests in parallel).
#
# But it should get expected result eventually.
# But it should get the expected result eventually.
# In case of test failure, this code will loop infinitely and time out.
verify()
{
@ -32,21 +32,16 @@ verify()
}
$CLICKHOUSE_CLIENT --database_atomic_wait_for_drop_and_detach_synchronously=1 --query="DROP TABLE IF EXISTS data_01600"
# InMemory - [0..5]
# Compact - (5..10]
# Wide - >10
$CLICKHOUSE_CLIENT --query="CREATE TABLE data_01600 (part_type String, key Int) ENGINE = MergeTree PARTITION BY part_type ORDER BY key SETTINGS min_bytes_for_wide_part=0, min_rows_for_wide_part=10, index_granularity = 8192, index_granularity_bytes = '10Mi'"
# InMemory
$CLICKHOUSE_CLIENT --query="INSERT INTO data_01600 SELECT 'InMemory', number FROM system.numbers LIMIT 1"
verify
# Compact
$CLICKHOUSE_CLIENT --query="INSERT INTO data_01600 SELECT 'Compact', number FROM system.numbers LIMIT 6 OFFSET 1"
$CLICKHOUSE_CLIENT --query="INSERT INTO data_01600 SELECT 'Compact', number FROM system.numbers LIMIT 6"
verify
# Wide
$CLICKHOUSE_CLIENT --query="INSERT INTO data_01600 SELECT 'Wide', number FROM system.numbers LIMIT 11 OFFSET 7"
$CLICKHOUSE_CLIENT --query="INSERT INTO data_01600 SELECT 'Wide', number FROM system.numbers LIMIT 11 OFFSET 6"
verify
# DROP and check

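As the comment in this test notes, the comparison between system.metrics and system.parts is racy and only converges eventually, so the script re-runs the check until it matches. A generic poll-until-equal helper in Python, sketched under the assumption that clickhouse-client is on PATH (the query text is a placeholder):

import subprocess
import time

def poll_until_equal(query, expected, attempts=100, delay=0.5):
    # Re-run the query until it returns the expected value or attempts run out.
    for _ in range(attempts):
        out = subprocess.run(
            ["clickhouse-client", "--query", query],
            capture_output=True, text=True, check=True,
        ).stdout.strip()
        if out == expected:
            return True
        time.sleep(delay)
    return False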
View File

@ -302,6 +302,7 @@ formatRowNoNewline
fragment
fromModifiedJulianDay
fromModifiedJulianDayOrNull
fromUTCTimestamp
fromUnixTimestamp
fromUnixTimestamp64Micro
fromUnixTimestamp64Milli
@ -849,6 +850,7 @@ toUInt8
toUInt8OrDefault
toUInt8OrNull
toUInt8OrZero
toUTCTimestamp
toUUID
toUUIDOrDefault
toUUIDOrNull

View File

@ -16,33 +16,35 @@ function read_archive_file() {
function run_archive_test() {
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS 02661_archive_table"
FILE_PREFIX="${CLICKHOUSE_TEST_UNIQUE_NAME}_$1_"
user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
echo -e "1,2\n3,4" > ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv
echo -e "5,6\n7,8" > ${CLICKHOUSE_TEST_UNIQUE_NAME}_data2.csv
echo -e "9,10\n11,12" > ${CLICKHOUSE_TEST_UNIQUE_NAME}_data3.csv
echo -e "1,2\n3,4" > ${FILE_PREFIX}_data1.csv
echo -e "5,6\n7,8" > ${FILE_PREFIX}_data2.csv
echo -e "9,10\n11,12" > ${FILE_PREFIX}_data3.csv
eval "$2 ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive1.$1 ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv ${CLICKHOUSE_TEST_UNIQUE_NAME}_data2.csv > /dev/null"
eval "$2 ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive2.$1 ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv ${CLICKHOUSE_TEST_UNIQUE_NAME}_data3.csv > /dev/null"
eval "$2 ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive3.$1 ${CLICKHOUSE_TEST_UNIQUE_NAME}_data2.csv ${CLICKHOUSE_TEST_UNIQUE_NAME}_data3.csv > /dev/null"
eval "$2 ${user_files_path}/${FILE_PREFIX}_archive1.$1 ${FILE_PREFIX}_data1.csv ${FILE_PREFIX}_data2.csv > /dev/null"
eval "$2 ${user_files_path}/${FILE_PREFIX}_archive2.$1 ${FILE_PREFIX}_data1.csv ${FILE_PREFIX}_data3.csv > /dev/null"
eval "$2 ${user_files_path}/${FILE_PREFIX}_archive3.$1 ${FILE_PREFIX}_data2.csv ${FILE_PREFIX}_data3.csv > /dev/null"
echo "archive1 data1.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive1.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv"
read_archive_file "${FILE_PREFIX}_archive1.$1 :: ${FILE_PREFIX}_data1.csv"
echo "archive{1..2} data1.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive{1..2}.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv"
read_archive_file "${FILE_PREFIX}_archive{1..2}.$1 :: ${FILE_PREFIX}_data1.csv"
echo "archive{1,2} data{1,3}.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive{1,2}.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data{1,3}.csv"
read_archive_file "${FILE_PREFIX}_archive{1,2}.$1 :: ${FILE_PREFIX}_data{1,3}.csv"
echo "archive3 data*.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive3.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data*.csv"
read_archive_file "${FILE_PREFIX}_archive3.$1 :: ${FILE_PREFIX}_data*.csv"
echo "archive* *.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive*.$1 :: *.csv"
read_archive_file "${FILE_PREFIX}_archive*.$1 :: *.csv"
echo "archive* {2..3}.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive*.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data{2..3}.csv"
read_archive_file "${FILE_PREFIX}_archive*.$1 :: ${FILE_PREFIX}_data{2..3}.csv"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive1.$1::nonexistent.csv')" 2>&1 | grep -q "CANNOT_UNPACK_ARCHIVE" && echo "OK" || echo "FAIL"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive3.$1::{2..3}.csv')" 2>&1 | grep -q "CANNOT_UNPACK_ARCHIVE" && echo "OK" || echo "FAIL"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${FILE_PREFIX}_archive1.$1::nonexistent.csv')" 2>&1 | grep -q "CANNOT_UNPACK_ARCHIVE" && echo "OK" || echo "FAIL"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${FILE_PREFIX}_archive3.$1::{2..3}.csv')" 2>&1 | grep -q "CANNOT_UNPACK_ARCHIVE" && echo "OK" || echo "FAIL"
rm ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive{1..3}.$1
rm ${user_files_path}/${FILE_PREFIX}_archive{1..3}.$1
rm ${CLICKHOUSE_TEST_UNIQUE_NAME}_data{1..3}.csv
rm ${FILE_PREFIX}_data{1..3}.csv
}

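The library above drives the 'archive :: member' form of the file() table function for several archive formats. A rough standalone illustration, assuming clickhouse-local is on PATH; whether a given compression is available depends on the build, which is why the scripts below are tagged no-fasttest:

import subprocess
import tarfile

with open("data1.csv", "w") as f:
    f.write("1,2\n3,4\n")
with tarfile.open("archive1.tar.bz2", "w:bz2") as tar:
    tar.add("data1.csv")

# 'archive :: member' selects one file inside the archive.
print(subprocess.run(
    ["clickhouse-local", "--query",
     "SELECT * FROM file('archive1.tar.bz2 :: data1.csv', 'CSV', 'c1 UInt32, c2 UInt32')"],
    capture_output=True, text=True,
).stdout)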
View File

@ -0,0 +1,116 @@
archive1 data1.csv
1 2
3 4
1 2
3 4
1 2
3 4
archive{1..2} data1.csv
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
archive{1,2} data{1,3}.csv
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
archive3 data*.csv
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
archive* *.csv
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
archive* {2..3}.csv
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
OK
OK

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./02661_read_from_archive.lib
. "$CUR_DIR"/02661_read_from_archive.lib
run_archive_test "tar.bz2" "tar -cjf"

View File

@ -0,0 +1,116 @@
archive1 data1.csv
1 2
3 4
1 2
3 4
1 2
3 4
archive{1..2} data1.csv
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
archive{1,2} data{1,3}.csv
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
archive3 data*.csv
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
archive* *.csv
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
archive* {2..3}.csv
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
OK
OK

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./02661_read_from_archive.lib
. "$CUR_DIR"/02661_read_from_archive.lib
run_archive_test "tar.xz" "tar -cJf"

View File

@ -0,0 +1,116 @@
archive1 data1.csv
1 2
3 4
1 2
3 4
1 2
3 4
archive{1..2} data1.csv
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
archive{1,2} data{1,3}.csv
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
archive3 data*.csv
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
archive* *.csv
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
archive* {2..3}.csv
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
OK
OK

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./02661_read_from_archive.lib
. "$CUR_DIR"/02661_read_from_archive.lib
run_archive_test "tzst" "tar -caf"

View File

@ -0,0 +1,2 @@
2000-01-01 00:00:00 abc
2000-01-01 00:00:00.000 abc

View File

@ -0,0 +1,3 @@
select * from format(CSV, 'c1 DateTime, c2 String', '01-01-2000,abc') settings date_time_input_format='best_effort';
select * from format(CSV, 'c1 DateTime64(3), c2 String', '01-01-2000,abc') settings date_time_input_format='best_effort';

View File

@ -0,0 +1,3 @@
1 2023-03-16 12:22:33 2023-03-16 10:22:33.000 2023-03-15 16:00:00 2023-03-16 19:22:33.000
2 2023-03-16 12:22:33 2023-03-16 10:22:33.000 2023-03-16 03:22:33 2023-03-16 08:00:00.000
3 2023-03-16 12:22:33 2023-03-16 10:22:33.000 2023-03-16 03:22:33 2023-03-16 19:22:33.123

View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
# NOTE: this sh wrapper is required because of shell_config
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists test_tbl"
$CLICKHOUSE_CLIENT -q "create table test_tbl (x UInt32, y DateTime, z DateTime64) engine=MergeTree order by x"
${CLICKHOUSE_CLIENT} -q "INSERT INTO test_tbl values(1, '2023-03-16', '2023-03-16 11:22:33')"
${CLICKHOUSE_CLIENT} -q "INSERT INTO test_tbl values(2, '2023-03-16 11:22:33', '2023-03-16')"
${CLICKHOUSE_CLIENT} -q "INSERT INTO test_tbl values(3, '2023-03-16 11:22:33', '2023-03-16 11:22:33.123456')"
$CLICKHOUSE_CLIENT -q "select x, to_utc_timestamp(toDateTime('2023-03-16 11:22:33'), 'Etc/GMT+1'), from_utc_timestamp(toDateTime64('2023-03-16 11:22:33', 3), 'Etc/GMT+1'), to_utc_timestamp(y, 'Asia/Shanghai'), from_utc_timestamp(z, 'Asia/Shanghai') from test_tbl order by x"
$CLICKHOUSE_CLIENT -q "drop table test_tbl"

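Judging by the reference output above, to_utc_timestamp(t, tz) treats t as wall-clock time in tz and returns the corresponding UTC instant, while from_utc_timestamp(t, tz) goes the other way. The same conversion in plain Python (zoneinfo, Python 3.9+), matching the first row of the reference:

from datetime import datetime
from zoneinfo import ZoneInfo

# 11:22:33 in Etc/GMT+1 (i.e. UTC-1) corresponds to 12:22:33 UTC.
local = datetime(2023, 3, 16, 11, 22, 33, tzinfo=ZoneInfo("Etc/GMT+1"))
print(local.astimezone(ZoneInfo("UTC")))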
View File

@ -0,0 +1,76 @@
CSV
1 1
2 0
0 0
3 3
1 1 \N \N
2 \N \N \N
\N \N \N \N
3 3 3 3
1 1
2 \N
\N \N
3 3
1 0
2 0
0 0
3 0
TSV
1 1
2 0
0 0
3 3
1 1 \N \N
2 \N \N \N
\N \N \N \N
3 3 3 3
1 1
2 \N
\N \N
3 3
1 0
2 0
0 0
3 0
JSONCompactEachRow
1 1
2 0
0 0
3 3
1 1
2 0
0 0
3 3
1 [1,2,3]
2 []
0 []
3 [3]
1 1 \N \N
2 \N \N \N
\N \N \N \N
3 3 3 3
1 1
2 \N
\N \N
3 3
1 0
2 0
0 0
3 0
CustomSeparated
1 1
2 0
0 0
3 3
1 1 \N \N
2 \N \N \N
\N \N \N \N
3 3 3 3
1 1
2 \N
\N \N
3 3
1 0
2 0
0 0
3 0

View File

@ -0,0 +1,24 @@
select 'CSV';
select * from format(CSV, 'x UInt32, y UInt32', '1,1\n2\n\n3,3,3,3') settings input_format_csv_allow_variable_number_of_columns=1;
select * from format(CSV, '1,1\n2\n\n3,3,3,3') settings input_format_csv_allow_variable_number_of_columns=1;
select * from format(CSVWithNames, '"x","y"\n1,1\n2\n\n3,3,3,3') settings input_format_csv_allow_variable_number_of_columns=1;
select * from format(CSVWithNames, 'x UInt32, z UInt32', '"x","y"\n1,1\n2\n\n3,3,3,3') settings input_format_csv_allow_variable_number_of_columns=1;
select 'TSV';
select * from format(TSV, 'x UInt32, y UInt32', '1\t1\n2\n\n3\t3\t3\t3') settings input_format_tsv_allow_variable_number_of_columns=1;
select * from format(TSV, '1\t1\n2\n\n3\t3\t3\t3') settings input_format_tsv_allow_variable_number_of_columns=1;
select * from format(TSVWithNames, 'x\ty\n1\t1\n2\n\n3\t3\t3\t3') settings input_format_tsv_allow_variable_number_of_columns=1;
select * from format(TSVWithNames, 'x UInt32, z UInt32', 'x\ty\n1\t1\n2\n\n3\t3\t3\t3') settings input_format_tsv_allow_variable_number_of_columns=1;
select 'JSONCompactEachRow';
select * from format(JSONCompactEachRow, 'x UInt32, y UInt32', '[1,1]\n[2]\n[]\n[3,3,3,3]') settings input_format_json_compact_allow_variable_number_of_columns=1;
select * from format(JSONCompactEachRow, 'x UInt32, y UInt32', '[1,1,[1,2,3]]\n[2]\n[]\n[3,3,3,3,[1,2,3]]') settings input_format_json_compact_allow_variable_number_of_columns=1;
select * from format(JSONCompactEachRow, 'x UInt32, y Array(UInt32)', '[1,[1,2,3],1]\n[2]\n[]\n[3,[3],3,3,[1,2,3]]') settings input_format_json_compact_allow_variable_number_of_columns=1;
select * from format(JSONCompactEachRow, '[1,1]\n[2]\n[]\n[3,3,3,3]') settings input_format_json_compact_allow_variable_number_of_columns=1;
select * from format(JSONCompactEachRowWithNames, '["x","y"]\n[1,1]\n[2]\n[]\n[3,3,3,3]') settings input_format_json_compact_allow_variable_number_of_columns=1;
select * from format(JSONCompactEachRowWithNames, 'x UInt32, z UInt32', '["x","y"]\n[1,1]\n[2]\n[]\n[3,3,3,3]') settings input_format_json_compact_allow_variable_number_of_columns=1;
select 'CustomSeparated';
set format_custom_escaping_rule='CSV', format_custom_field_delimiter='<field_delimiter>', format_custom_row_before_delimiter='<row_before_delimiter>', format_custom_row_after_delimiter='<row_after_delimiter>', format_custom_row_between_delimiter='<row_between_delimiter>', format_custom_result_before_delimiter='<result_before_delimiter>', format_custom_result_after_delimiter='<result_after_delimiter>';
select * from format(CustomSeparated, 'x UInt32, y UInt32', '<result_before_delimiter><row_before_delimiter>1<field_delimiter>1<row_after_delimiter><row_between_delimiter><row_before_delimiter>2<row_after_delimiter><row_between_delimiter><row_before_delimiter><row_after_delimiter><row_between_delimiter><row_before_delimiter>3<field_delimiter>3<field_delimiter>3<field_delimiter>3<row_after_delimiter><result_after_delimiter>') settings input_format_custom_allow_variable_number_of_columns=1;
select * from format(CustomSeparated, '<result_before_delimiter><row_before_delimiter>1<field_delimiter>1<row_after_delimiter><row_between_delimiter><row_before_delimiter>2<row_after_delimiter><row_between_delimiter><row_before_delimiter><row_after_delimiter><row_between_delimiter><row_before_delimiter>3<field_delimiter>3<field_delimiter>3<field_delimiter>3<row_after_delimiter><result_after_delimiter>') settings input_format_custom_allow_variable_number_of_columns=1;
select * from format(CustomSeparatedWithNames, '<result_before_delimiter><row_before_delimiter>"x"<field_delimiter>"y"<row_after_delimiter><row_between_delimiter><row_before_delimiter>1<field_delimiter>1<row_after_delimiter><row_between_delimiter><row_before_delimiter>2<row_after_delimiter><row_between_delimiter><row_before_delimiter><row_after_delimiter><row_between_delimiter><row_before_delimiter>3<field_delimiter>3<field_delimiter>3<field_delimiter>3<row_after_delimiter><result_after_delimiter>') settings input_format_custom_allow_variable_number_of_columns=1;
select * from format(CustomSeparatedWithNames, 'x UInt32, z UInt32', '<result_before_delimiter><row_before_delimiter>"x"<field_delimiter>"y"<row_after_delimiter><row_between_delimiter><row_before_delimiter>1<field_delimiter>1<row_after_delimiter><row_between_delimiter><row_before_delimiter>2<row_after_delimiter><row_between_delimiter><row_before_delimiter><row_after_delimiter><row_between_delimiter><row_before_delimiter>3<field_delimiter>3<field_delimiter>3<field_delimiter>3<row_after_delimiter><result_after_delimiter>') settings input_format_custom_allow_variable_number_of_columns=1;

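The reference output above shows what the allow_variable_number_of_columns settings do: short rows are padded with default values and surplus trailing fields are dropped. A quick way to reproduce the CSV case outside the test, assuming clickhouse-local is on PATH and reads stdin into a table named 'table' when --structure is given:

import subprocess

ragged_csv = "1,1\n2\n\n3,3,3,3\n"
print(subprocess.run(
    ["clickhouse-local",
     "--input-format", "CSV",
     "--structure", "x UInt32, y UInt32",
     "--input_format_csv_allow_variable_number_of_columns", "1",
     "--query", "SELECT * FROM table"],
    input=ragged_csv, capture_output=True, text=True,
).stdout)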
View File

@ -0,0 +1,6 @@
-- 1 shard, 3 replicas
100 0 99 49.5
200 0 99 49.5
-- 2 shards, 3 replicas each
200 0 99 49.5
400 0 99 49.5

View File

@ -0,0 +1,47 @@
-- 1 shard
SELECT '-- 1 shard, 3 replicas';
DROP TABLE IF EXISTS test_d;
DROP TABLE IF EXISTS test;
CREATE TABLE test (id UInt64, date Date)
ENGINE = MergeTree
ORDER BY id;
CREATE TABLE IF NOT EXISTS test_d as test
ENGINE = Distributed(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), test);
insert into test select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0;
insert into test select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0;
-- 2 shards
SELECT '-- 2 shards, 3 replicas each';
DROP TABLE IF EXISTS test2_d;
DROP TABLE IF EXISTS test2;
CREATE TABLE test2 (id UInt64, date Date)
ENGINE = MergeTree
ORDER BY id;
CREATE TABLE IF NOT EXISTS test2_d as test2
ENGINE = Distributed(test_cluster_two_shard_three_replicas_localhost, currentDatabase(), test2, id);
insert into test2 select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test2_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0;
insert into test2 select *, today() from numbers(100);
SELECT count(), min(id), max(id), avg(id)
FROM test2_d
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, use_hedged_requests=0;

View File

@ -1464,6 +1464,7 @@ formatter
freezed
fromModifiedJulianDay
fromModifiedJulianDayOrNull
fromUTCTimestamp
fromUnixTimestamp
fromUnixTimestampInJodaSyntax
fsync
@ -2394,6 +2395,7 @@ toTimeZone
toType
toTypeName
toUInt
toUTCTimestamp
toUUID
toUUIDOrDefault
toUUIDOrNull

View File

@ -14,6 +14,7 @@ v23.4.4.16-stable 2023-06-17
v23.4.3.48-stable 2023-06-12
v23.4.2.11-stable 2023-05-02
v23.4.1.1943-stable 2023-04-27
v23.3.9.55-lts 2023-08-21
v23.3.8.21-lts 2023-07-13
v23.3.7.5-lts 2023-06-29
v23.3.6.7-lts 2023-06-28
