Merge branch 'master' into fix_993

Alexander Tokmakov 2023-03-17 21:28:23 +03:00 committed by GitHub
commit d11aee2b0c
144 changed files with 1596 additions and 642 deletions

View File

@ -162,56 +162,28 @@ Checks: '*,
WarningsAsErrors: '*'
# TODO: use dictionary syntax for CheckOptions once the minimum clang-tidy version is raised to 15
# some-check.SomeOption: 'some value'
# instead of
# - key: some-check.SomeOption
# value: 'some value'
CheckOptions:
- key: readability-identifier-naming.ClassCase
value: CamelCase
- key: readability-identifier-naming.EnumCase
value: CamelCase
- key: readability-identifier-naming.LocalVariableCase
value: lower_case
- key: readability-identifier-naming.StaticConstantCase
value: aNy_CasE
- key: readability-identifier-naming.MemberCase
value: lower_case
- key: readability-identifier-naming.PrivateMemberPrefix
value: ''
- key: readability-identifier-naming.ProtectedMemberPrefix
value: ''
- key: readability-identifier-naming.PublicMemberCase
value: lower_case
- key: readability-identifier-naming.MethodCase
value: camelBack
- key: readability-identifier-naming.PrivateMethodPrefix
value: ''
- key: readability-identifier-naming.ProtectedMethodPrefix
value: ''
- key: readability-identifier-naming.ParameterPackCase
value: lower_case
- key: readability-identifier-naming.StructCase
value: CamelCase
- key: readability-identifier-naming.TemplateTemplateParameterCase
value: CamelCase
- key: readability-identifier-naming.TemplateUsingCase
value: lower_case
- key: readability-identifier-naming.TypeTemplateParameterCase
value: CamelCase
- key: readability-identifier-naming.TypedefCase
value: CamelCase
- key: readability-identifier-naming.UnionCase
value: CamelCase
- key: readability-identifier-naming.UsingCase
value: CamelCase
- key: modernize-loop-convert.UseCxx20ReverseRanges
value: false
- key: performance-move-const-arg.CheckTriviallyCopyableMove
value: false
# Workaround clang-tidy bug: https://github.com/llvm/llvm-project/issues/46097
- key: readability-identifier-naming.TypeTemplateParameterIgnoredRegexp
value: expr-type
- key: cppcoreguidelines-avoid-do-while.IgnoreMacros
value: true
readability-identifier-naming.ClassCase: CamelCase
readability-identifier-naming.EnumCase: CamelCase
readability-identifier-naming.LocalVariableCase: lower_case
readability-identifier-naming.StaticConstantCase: aNy_CasE
readability-identifier-naming.MemberCase: lower_case
readability-identifier-naming.PrivateMemberPrefix: ''
readability-identifier-naming.ProtectedMemberPrefix: ''
readability-identifier-naming.PublicMemberCase: lower_case
readability-identifier-naming.MethodCase: camelBack
readability-identifier-naming.PrivateMethodPrefix: ''
readability-identifier-naming.ProtectedMethodPrefix: ''
readability-identifier-naming.ParameterPackCase: lower_case
readability-identifier-naming.StructCase: CamelCase
readability-identifier-naming.TemplateTemplateParameterCase: CamelCase
readability-identifier-naming.TemplateUsingCase: lower_case
readability-identifier-naming.TypeTemplateParameterCase: CamelCase
readability-identifier-naming.TypedefCase: CamelCase
readability-identifier-naming.UnionCase: CamelCase
readability-identifier-naming.UsingCase: CamelCase
modernize-loop-convert.UseCxx20ReverseRanges: false
performance-move-const-arg.CheckTriviallyCopyableMove: false
# Workaround clang-tidy bug: https://github.com/llvm/llvm-project/issues/46097
readability-identifier-naming.TypeTemplateParameterIgnoredRegexp: expr-type
cppcoreguidelines-avoid-do-while.IgnoreMacros: true

View File

@ -184,26 +184,12 @@ if (OS_DARWIN)
set (ENABLE_CURL_BUILD OFF)
endif ()
# Ignored if `lld` is used
option(ADD_GDB_INDEX_FOR_GOLD "Add .gdb-index to resulting binaries for gold linker.")
if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
# Can be lld or ld-lld or lld-13 or /path/to/lld.
if (LINKER_NAME MATCHES "lld" AND OS_LINUX)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--gdb-index")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--gdb-index")
message (STATUS "Adding .gdb-index via --gdb-index linker option.")
# we use another tool for gdb-index, because the gold linker removes the .debug_aranges section, which is used inside clickhouse stacktraces
# http://sourceware-org.1504.n7.nabble.com/gold-No-debug-aranges-section-when-linking-with-gdb-index-td540965.html#a556932
elseif (LINKER_NAME MATCHES "gold$" AND ADD_GDB_INDEX_FOR_GOLD)
find_program (GDB_ADD_INDEX_EXE NAMES "gdb-add-index" DOC "Path to gdb-add-index executable")
if (NOT GDB_ADD_INDEX_EXE)
set (USE_GDB_ADD_INDEX 0)
message (WARNING "Cannot add gdb index to binaries, because gold linker is used, but gdb-add-index executable not found.")
else()
set (USE_GDB_ADD_INDEX 1)
message (STATUS "gdb-add-index found: ${GDB_ADD_INDEX_EXE}")
endif()
endif ()
endif()
@ -302,11 +288,11 @@ if (ENABLE_BUILD_PROFILING)
endif ()
set (CMAKE_CXX_STANDARD 23)
set (CMAKE_CXX_EXTENSIONS ON) # Same as gnu++2a (ON) vs c++2a (OFF): https://cmake.org/cmake/help/latest/prop_tgt/CXX_EXTENSIONS.html
set (CMAKE_CXX_EXTENSIONS OFF)
set (CMAKE_CXX_STANDARD_REQUIRED ON)
set (CMAKE_C_STANDARD 11)
set (CMAKE_C_EXTENSIONS ON)
set (CMAKE_C_EXTENSIONS ON) # required by most contribs written in C
set (CMAKE_C_STANDARD_REQUIRED ON)
if (COMPILER_GCC OR COMPILER_CLANG)

View File

@ -50,15 +50,18 @@ endif ()
string (REGEX MATCHALL "[0-9]+" COMPILER_VERSION_LIST ${CMAKE_CXX_COMPILER_VERSION})
list (GET COMPILER_VERSION_LIST 0 COMPILER_VERSION_MAJOR)
# Example values: `lld-10`, `gold`.
# Example values: `lld-10`
option (LINKER_NAME "Linker name or full path")
if (LINKER_NAME MATCHES "gold")
message (FATAL_ERROR "Linking with gold is unsupported. Please use lld.")
endif ()
# s390x doesn't support lld
if (NOT ARCH_S390X)
if (NOT LINKER_NAME)
if (COMPILER_GCC)
find_program (LLD_PATH NAMES "ld.lld")
find_program (GOLD_PATH NAMES "ld.gold")
elseif (COMPILER_CLANG)
# llvm lld is a generic driver.
# Invoke ld.lld (Unix), ld64.lld (macOS), lld-link (Windows), wasm-ld (WebAssembly) instead
@ -67,13 +70,11 @@ if (NOT ARCH_S390X)
elseif (OS_DARWIN)
find_program (LLD_PATH NAMES "ld64.lld-${COMPILER_VERSION_MAJOR}" "ld64.lld")
endif ()
find_program (GOLD_PATH NAMES "ld.gold" "gold")
endif ()
endif()
endif()
if ((OS_LINUX OR OS_DARWIN) AND NOT LINKER_NAME)
# prefer lld linker over gold or ld on linux and macos
if (LLD_PATH)
if (COMPILER_GCC)
# GCC driver requires one of supported linker names like "lld".
@ -83,17 +84,6 @@ if ((OS_LINUX OR OS_DARWIN) AND NOT LINKER_NAME)
set (LINKER_NAME ${LLD_PATH})
endif ()
endif ()
if (NOT LINKER_NAME)
if (GOLD_PATH)
message (FATAL_ERROR "Linking with gold is unsupported. Please use lld.")
if (COMPILER_GCC)
set (LINKER_NAME "gold")
else ()
set (LINKER_NAME ${GOLD_PATH})
endif ()
endif ()
endif ()
endif ()
# TODO: allow different linker on != OS_LINUX

View File

@ -47,6 +47,8 @@ if [ "$is_tsan_build" -eq "0" ]; then
fi
export ZOOKEEPER_FAULT_INJECTION=1
# Initial run without S3 to create system.*_log on local file system to make it
# available for dump via clickhouse-local
configure
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &

View File

@ -49,17 +49,19 @@ echo -e "Successfully cloned previous release tests$OK" >> /test_output/test_res
echo -e "Successfully downloaded previous release packages$OK" >> /test_output/test_results.tsv
# Make the upgrade check funnier by forcing the Ordinary engine for the system database
mkdir /var/lib/clickhouse/metadata
mkdir -p /var/lib/clickhouse/metadata
echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/system.sql
# Install previous release packages
install_packages previous_release_package_folder
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
# Initial run without S3 to create system.*_log on local file system to make it
# available for dump via clickhouse-local
configure
start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
# force_sync=false doesn't work correctly on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
@ -67,8 +69,6 @@ sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml
configure
# But we still need the default disk because some tables are loaded only onto it
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \
| sed "s|<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" \
@ -76,6 +76,13 @@ sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
configure
start
clickhouse-client --query="SELECT 'Server version: ', version()"
@ -185,8 +192,6 @@ tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:
collect_query_and_trace_logs
check_oom_in_dmesg
mv /var/log/clickhouse-server/stderr.log /test_output/
# Write check result into check_status.tsv

View File

@ -27,7 +27,7 @@ $ clickhouse-format --query "select number from numbers(10) where number%2 order
Result:
```text
```sql
SELECT number
FROM numbers(10)
WHERE number % 2
@ -54,7 +54,7 @@ $ clickhouse-format -n <<< "SELECT * FROM (SELECT 1 AS x UNION ALL SELECT 1 UNIO
Result:
```text
```sql
SELECT *
FROM
(
@ -75,7 +75,7 @@ $ clickhouse-format --seed Hello --obfuscate <<< "SELECT cost_first_screen BETWE
Result:
```text
```sql
SELECT treasury_mammoth_hazelnut BETWEEN nutmeg AND span, CASE WHEN chive >= 116 THEN switching ELSE ANYTHING END;
```
@ -87,7 +87,7 @@ $ clickhouse-format --seed World --obfuscate <<< "SELECT cost_first_screen BETWE
Result:
```text
```sql
SELECT horse_tape_summer BETWEEN folklore AND moccasins, CASE WHEN intestine >= 116 THEN nonconformist ELSE FORESTRY END;
```
@ -99,7 +99,7 @@ $ clickhouse-format --backslash <<< "SELECT * FROM (SELECT 1 AS x UNION ALL SELE
Result:
```text
```sql
SELECT * \
FROM \
( \

View File

@ -22,15 +22,15 @@ tuple(x, y, …)
## tupleElement
A function that allows getting a column from a tuple.
N is the column index, starting from 1. N must be a constant. N must be a strict postive integer no greater than the size of the tuple.
There is no cost to execute the function.
The function implements the operator `x.N`.
If the second argument is a number `n`, it is the column index, starting from 1. If the second argument is a string `s`, it represents the name of the element. In addition, an optional third argument can be provided: if the index is out of bounds or there is no element with the given name, the default value is returned instead of throwing an exception. The second and third arguments, if provided, must be constants. There is no cost to execute the function.
The function implements the operators `x.n` and `x.s`.
**Syntax**
``` sql
tupleElement(tuple, n)
tupleElement(tuple, n/s [, default_value])
```
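A minimal sketch of the extended signature, assuming the default-value behavior described above (the tuple literal and the missing element name `absent` are invented for illustration):

``` sql
SELECT
    tupleElement((1, 'hello'), 2) AS by_index,                                                         -- 'hello' (by index)
    tupleElement(CAST((1, 'hello'), 'Tuple(id UInt32, msg String)'), 'msg') AS by_name,                -- 'hello' (by name)
    tupleElement(CAST((1, 'hello'), 'Tuple(id UInt32, msg String)'), 'absent', 'n/a') AS with_default  -- 'n/a' (default value)
```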
## untuple

View File

@ -16,7 +16,7 @@ ALTER TABLE [db].name [ON CLUSTER cluster] MODIFY COMMENT 'Comment'
**Examples**
Creating a table with comment (for more information, see the [COMMENT] clause(../../../sql-reference/statements/create/table.md#comment-table)):
Creating a table with comment (for more information, see the [COMMENT](../../../sql-reference/statements/create/table.md#comment-table) clause):
``` sql
CREATE TABLE table_with_comment

View File

@ -393,15 +393,15 @@ These codecs are designed to make compression more effective by using specific f
#### DoubleDelta
`DoubleDelta` — Calculates delta of deltas and writes it in compact binary form. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-byte deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
`DoubleDelta(bytes_size)` — Calculates delta of deltas and writes it in compact binary form. Possible `bytes_size` values: 1, 2, 4, 8; the default value is `sizeof(type)` if it is equal to 1, 2, 4, or 8, and 1 in all other cases. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-bit deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
#### Gorilla
`Gorilla` — Calculates XOR between current and previous floating point value and writes it in compact binary form. The smaller the difference between consecutive values is, i.e. the slower the values of the series changes, the better the compression rate. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. For additional information, see section 4.1 in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](https://doi.org/10.14778/2824032.2824078).
`Gorilla(bytes_size)` — Calculates the XOR between the current and previous floating point values and writes it in compact binary form. The smaller the difference between consecutive values, i.e. the slower the values of the series change, the better the compression rate. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Possible `bytes_size` values: 1, 2, 4, 8; the default value is `sizeof(type)` if it is equal to 1, 2, 4, or 8, and 1 in all other cases. For additional information, see section 4.1 in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](https://doi.org/10.14778/2824032.2824078).
#### FPC
`FPC` - Repeatedly predicts the next floating point value in the sequence using the better of two predictors, then XORs the actual with the predicted value, and leading-zero compresses the result. Similar to Gorilla, this is efficient when storing a series of floating point values that change slowly. For 64-bit values (double), FPC is faster than Gorilla, for 32-bit values your mileage may vary. For a detailed description of the algorithm see [High Throughput Compression of Double-Precision Floating-Point Data](https://userweb.cs.txstate.edu/~burtscher/papers/dcc07a.pdf).
`FPC(level, float_size)` - Repeatedly predicts the next floating point value in the sequence using the better of two predictors, then XORs the actual with the predicted value, and leading-zero compresses the result. Similar to Gorilla, this is efficient when storing a series of floating point values that change slowly. For 64-bit values (double), FPC is faster than Gorilla; for 32-bit values your mileage may vary. Possible `level` values: 1-28, the default value is 12. Possible `float_size` values: 4, 8; the default value is `sizeof(type)` if the type is Float, and 4 in all other cases. For a detailed description of the algorithm see [High Throughput Compression of Double-Precision Floating-Point Data](https://userweb.cs.txstate.edu/~burtscher/papers/dcc07a.pdf).
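A minimal sketch of how the parameterized codecs described above could be declared; the table and column names are invented for illustration:

``` sql
CREATE TABLE codec_params_example
(
    ts     DateTime CODEC(DoubleDelta(4)),
    value  Float64  CODEC(Gorilla(8)),
    sensor Float32  CODEC(FPC(12, 4))
)
ENGINE = MergeTree
ORDER BY ts;
```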
#### T64
@ -473,7 +473,7 @@ ENGINE = MergeTree ORDER BY x;
ClickHouse supports temporary tables which have the following characteristics:
- Temporary tables disappear when the session ends, including if the connection is lost.
- A temporary table uses the Memory engine only.
- A temporary table uses the Memory table engine when no engine is specified, and it may use any table engine except Replicated engines and `KeeperMap`.
- The DB can't be specified for a temporary table. It is created outside of databases.
- Impossible to create a temporary table with distributed DDL query on all cluster servers (by using `ON CLUSTER`): this table exists only in the current session.
- If a temporary table has the same name as another one and a query specifies the table name without specifying the DB, the temporary table will be used.
@ -487,7 +487,7 @@ CREATE TEMPORARY TABLE [IF NOT EXISTS] table_name
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
)
) [ENGINE = engine]
```
In most cases, temporary tables are not created manually, but when using external data for a query, or for distributed `(GLOBAL) IN`. For more information, see the appropriate sections
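A minimal sketch of the new `[ENGINE = engine]` clause, assuming the engine restrictions listed above (the table name and columns are invented):

``` sql
CREATE TEMPORARY TABLE tmp_events
(
    id UInt64,
    event_time DateTime
)
ENGINE = MergeTree
ORDER BY id;
```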

View File

@ -105,7 +105,8 @@ Hierarchy of privileges:
- [CREATE](#grant-create)
- `CREATE DATABASE`
- `CREATE TABLE`
- `CREATE TEMPORARY TABLE`
- `CREATE ARBITRARY TEMPORARY TABLE`
- `CREATE TEMPORARY TABLE`
- `CREATE VIEW`
- `CREATE DICTIONARY`
- `CREATE FUNCTION`
@ -313,7 +314,8 @@ Allows executing [CREATE](../../sql-reference/statements/create/index.md) and [A
- `CREATE`. Level: `GROUP`
- `CREATE DATABASE`. Level: `DATABASE`
- `CREATE TABLE`. Level: `TABLE`
- `CREATE TEMPORARY TABLE`. Level: `GLOBAL`
- `CREATE ARBITRARY TEMPORARY TABLE`. Level: `GLOBAL`
- `CREATE TEMPORARY TABLE`. Level: `GLOBAL`
- `CREATE VIEW`. Level: `VIEW`
- `CREATE DICTIONARY`. Level: `DICTIONARY`
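As an illustrative sketch only (the user name `john` follows the examples used elsewhere in this document), the new global-level privilege can be granted like any other `CREATE` grant:

``` sql
GRANT CREATE ARBITRARY TEMPORARY TABLE ON *.* TO john;
```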

View File

@ -260,8 +260,8 @@ ENGINE = MergeTree()
Encryption codecs:
- `CODEC('AES-128-GCM-SIV')` — Encrypts data with AES-128 in [RFC 8452](https://tools.ietf.org/html/rfc8452) GCM-SIV mode.
- `CODEC('AES-256-GCM-SIV')` — Encrypts data with AES-256 in GCM-SIV mode.
These codecs use a fixed nonce, so encryption is deterministic. This makes them compatible with deduplicating engines, in particular [ReplicatedMergeTree](../../../engines/table-engines/mergetree-family/replication.md). However, encryption has a drawback: if the same data block is encrypted twice, the resulting ciphertext will be identical, so an attacker with access to the disk can notice this equivalence (although he will not get access to the content).
@ -274,10 +274,10 @@ ENGINE = MergeTree()
**Example**
```sql
CREATE TABLE mytable
(
x String Codec(AES_128_GCM_SIV)
)
ENGINE = MergeTree ORDER BY x;
```
@ -287,10 +287,10 @@ ENGINE = MergeTree ORDER BY x;
**Example**
```sql
CREATE TABLE mytable
(
x String Codec(Delta, LZ4, AES_128_GCM_SIV)
)
ENGINE = MergeTree ORDER BY x;
```
@ -299,7 +299,7 @@ ENGINE = MergeTree ORDER BY x;
ClickHouse supports temporary tables with the following characteristics:
- Temporary tables disappear when the session ends, including if the connection is lost.
- A temporary table uses the Memory engine only.
- A temporary table uses the Memory table engine when no engine is specified, and it may use any table engine except Replicated engines and `KeeperMap`.
- The database cannot be specified for a temporary table. It is created outside of databases.
- It is impossible to create a temporary table with a distributed DDL query on all cluster servers (using `ON CLUSTER`): such a table exists only within the current session.
- If a temporary table has the same name as another table and a query references it without specifying the database, the temporary table is used.
@ -313,7 +313,7 @@ CREATE TEMPORARY TABLE [IF NOT EXISTS] table_name
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
)
) [ENGINE = engine]
```
In most cases, temporary tables are not created manually, but when using external data for a query, or for distributed `(GLOBAL) IN`. For more information, see the appropriate sections.

View File

@ -107,7 +107,8 @@ GRANT SELECT(x,y) ON db.table TO john WITH GRANT OPTION
- [CREATE](#grant-create)
- `CREATE DATABASE`
- `CREATE TABLE`
- `CREATE TEMPORARY TABLE`
- `CREATE ARBITRARY TEMPORARY TABLE`
- `CREATE TEMPORARY TABLE`
- `CREATE VIEW`
- `CREATE DICTIONARY`
- `CREATE FUNCTION`
@ -314,7 +315,8 @@ GRANT INSERT(x,y) ON db.table TO john
- `CREATE`. Level: `GROUP`
- `CREATE DATABASE`. Level: `DATABASE`
- `CREATE TABLE`. Level: `TABLE`
- `CREATE TEMPORARY TABLE`. Level: `GLOBAL`
- `CREATE ARBITRARY TEMPORARY TABLE`. Level: `GLOBAL`
- `CREATE TEMPORARY TABLE`. Level: `GLOBAL`
- `CREATE VIEW`. Level: `VIEW`
- `CREATE DICTIONARY`. Level: `DICTIONARY`

View File

@ -18,7 +18,7 @@ Group=clickhouse
Restart=always
RestartSec=30
# Since ClickHouse is systemd-aware, the default 1m30sec may not be enough
TimeoutStartSec=infinity
TimeoutStartSec=0
# %p is resolved to the systemd unit name
RuntimeDirectory=%p
ExecStart=/usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml --pid-file=%t/%p/%p.pid

View File

@ -400,10 +400,6 @@ endif ()
add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_BUNDLE})
if (USE_GDB_ADD_INDEX)
add_custom_command(TARGET clickhouse POST_BUILD COMMAND ${GDB_ADD_INDEX_EXE} clickhouse COMMENT "Adding .gdb-index to clickhouse" VERBATIM)
endif()
if (USE_BINARY_HASH)
add_custom_command(TARGET clickhouse POST_BUILD COMMAND ./clickhouse hash-binary > hash && ${OBJCOPY_PATH} --add-section .clickhouse.hash=hash clickhouse COMMENT "Adding section '.clickhouse.hash' to clickhouse binary" VERBATIM)
endif()

View File

@ -66,6 +66,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
using namespace DB;
namespace po = boost::program_options;
bool print_stacktrace = false;
try
{
po::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
@ -84,6 +85,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("level", po::value<int>(), "compression level for codecs specified via flags")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
("stacktrace", "print stacktrace of exception")
;
po::positional_options_description positional_desc;
@ -107,6 +109,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
bool use_deflate_qpl = options.count("deflate_qpl");
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
print_stacktrace = options.count("stacktrace");
unsigned block_size = options["block-size"].as<unsigned>();
std::vector<std::string> codecs;
if (options.count("codec"))
@ -188,11 +191,12 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
/// Compression
CompressedWriteBuffer to(*wb, codec, block_size);
copyData(*rb, to);
to.finalize();
}
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
std::cerr << getCurrentExceptionMessage(print_stacktrace) << '\n';
return getCurrentExceptionCode();
}

View File

@ -35,10 +35,6 @@ target_link_libraries(clickhouse-odbc-bridge PRIVATE
set_target_properties(clickhouse-odbc-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)
target_compile_options (clickhouse-odbc-bridge PRIVATE -Wno-reserved-id-macro -Wno-keyword-macro)
if (USE_GDB_ADD_INDEX)
add_custom_command(TARGET clickhouse-odbc-bridge POST_BUILD COMMAND ${GDB_ADD_INDEX_EXE} ../clickhouse-odbc-bridge COMMENT "Adding .gdb-index to clickhouse-odbc-bridge" VERBATIM)
endif()
if (SPLIT_DEBUG_SYMBOLS)
clickhouse_split_debug_symbols(TARGET clickhouse-odbc-bridge DESTINATION_DIR ${CMAKE_CURRENT_BINARY_DIR}/../${SPLITTED_DEBUG_SYMBOLS_DIR} BINARY_PATH ../clickhouse-odbc-bridge)
else()

View File

@ -15,6 +15,7 @@ enum class AccessType
/// node_type either specifies access type's level (GLOBAL/DATABASE/TABLE/DICTIONARY/VIEW/COLUMNS),
/// or specifies that the access type is a GROUP of other access types;
/// parent_group_name is the name of the group containing this access type (or NONE if there is no such group).
/// NOTE A parent group must be declared AFTER all its children.
#define APPLY_FOR_ACCESS_TYPES(M) \
M(SHOW_DATABASES, "", DATABASE, SHOW) /* allows to execute SHOW DATABASES, SHOW CREATE DATABASE, USE <database>;
implicitly enabled by any grant on the database */\
@ -86,8 +87,10 @@ enum class AccessType
M(CREATE_VIEW, "", VIEW, CREATE) /* allows to execute {CREATE|ATTACH} VIEW;
implicitly enabled by the grant CREATE_TABLE */\
M(CREATE_DICTIONARY, "", DICTIONARY, CREATE) /* allows to execute {CREATE|ATTACH} DICTIONARY */\
M(CREATE_TEMPORARY_TABLE, "", GLOBAL, CREATE) /* allows to create and manipulate temporary tables;
M(CREATE_TEMPORARY_TABLE, "", GLOBAL, CREATE_ARBITRARY_TEMPORARY_TABLE) /* allows to create and manipulate temporary tables;
implicitly enabled by the grant CREATE_TABLE on any table */ \
M(CREATE_ARBITRARY_TEMPORARY_TABLE, "", GLOBAL, CREATE) /* allows to create and manipulate temporary tables
with arbitrary table engine */\
M(CREATE_FUNCTION, "", GLOBAL, CREATE) /* allows to execute CREATE FUNCTION */ \
M(CREATE_NAMED_COLLECTION, "", GLOBAL, CREATE) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE, "", GROUP, ALL) /* allows to execute {CREATE|ATTACH} */ \

View File

@ -81,6 +81,11 @@ namespace
if ((level == 0) && (max_flags_with_children & create_table))
res |= create_temporary_table;
/// CREATE TABLE (on any database/table) => CREATE_ARBITRARY_TEMPORARY_TABLE (global)
static const AccessFlags create_arbitrary_temporary_table = AccessType::CREATE_ARBITRARY_TEMPORARY_TABLE;
if ((level == 0) && (max_flags_with_children & create_table))
res |= create_arbitrary_temporary_table;
/// ALTER_TTL => ALTER_MATERIALIZE_TTL
static const AccessFlags alter_ttl = AccessType::ALTER_TTL;
static const AccessFlags alter_materialize_ttl = AccessType::ALTER_MATERIALIZE_TTL;

View File

@ -5727,8 +5727,27 @@ void QueryAnalyzer::resolveInterpolateColumnsNodeList(QueryTreeNodePtr & interpo
{
auto & interpolate_node_typed = interpolate_node->as<InterpolateNode &>();
auto * column_to_interpolate = interpolate_node_typed.getExpression()->as<IdentifierNode>();
if (!column_to_interpolate)
throw Exception(ErrorCodes::LOGICAL_ERROR, "INTERPOLATE can work only for identifiers, but {} is found",
interpolate_node_typed.getExpression()->formatASTForErrorMessage());
auto column_to_interpolate_name = column_to_interpolate->getIdentifier().getFullName();
resolveExpressionNode(interpolate_node_typed.getExpression(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
resolveExpressionNode(interpolate_node_typed.getInterpolateExpression(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
bool is_column_constant = interpolate_node_typed.getExpression()->getNodeType() == QueryTreeNodeType::CONSTANT;
auto & interpolation_to_resolve = interpolate_node_typed.getInterpolateExpression();
IdentifierResolveScope interpolate_scope(interpolation_to_resolve, &scope /*parent_scope*/);
auto fake_column_node = std::make_shared<ColumnNode>(NameAndTypePair(column_to_interpolate_name, interpolate_node_typed.getExpression()->getResultType()), interpolate_node_typed.getExpression());
if (is_column_constant)
interpolate_scope.expression_argument_name_to_node.emplace(column_to_interpolate_name, fake_column_node);
resolveExpressionNode(interpolation_to_resolve, interpolate_scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
if (is_column_constant)
interpolation_to_resolve = interpolation_to_resolve->cloneAndReplace(fake_column_node, interpolate_node_typed.getExpression());
}
}

View File

@ -11,13 +11,21 @@
M(ReplicatedSend, "Number of data parts being sent to replicas") \
M(ReplicatedChecks, "Number of data parts checking for consistency") \
M(BackgroundMergesAndMutationsPoolTask, "Number of active merges and mutations in an associated background pool") \
M(BackgroundMergesAndMutationsPoolSize, "Limit on number of active merges and mutations in an associated background pool") \
M(BackgroundFetchesPoolTask, "Number of active fetches in an associated background pool") \
M(BackgroundFetchesPoolSize, "Limit on number of simultaneous fetches in an associated background pool") \
M(BackgroundCommonPoolTask, "Number of active tasks in an associated background pool") \
M(BackgroundCommonPoolSize, "Limit on number of tasks in an associated background pool") \
M(BackgroundMovePoolTask, "Number of active tasks in BackgroundProcessingPool for moves") \
M(BackgroundMovePoolSize, "Limit on number of tasks in BackgroundProcessingPool for moves") \
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundSchedulePoolSize, "Limit on number of tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \
M(BackgroundBufferFlushSchedulePoolSize, "Limit on number of tasks in BackgroundBufferFlushSchedulePool") \
M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that are done in the background.") \
M(BackgroundDistributedSchedulePoolSize, "Limit on number of tasks in BackgroundDistributedSchedulePool") \
M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \
M(BackgroundMessageBrokerSchedulePoolSize, "Limit on number of tasks in BackgroundProcessingPool for message streaming") \
M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \
M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \
M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \

View File

@ -41,6 +41,16 @@ ZooKeeperLock::~ZooKeeperLock()
}
}
bool ZooKeeperLock::isLocked() const
{
return locked;
}
const std::string & ZooKeeperLock::getLockPath() const
{
return lock_path;
}
void ZooKeeperLock::unlock()
{
if (!locked)

View File

@ -37,6 +37,8 @@ public:
void unlock();
bool tryLock();
bool isLocked() const;
const std::string & getLockPath() const;
private:
zkutil::ZooKeeperPtr zookeeper;

View File

@ -193,7 +193,8 @@ void registerCodecDelta(CompressionCodecFactory & factory)
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Delta);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 delta_bytes_size = 0;
/// Default bytes size is 1.
UInt8 delta_bytes_size = 1;
if (arguments && !arguments->children.empty())
{
@ -202,8 +203,8 @@ void registerCodecDelta(CompressionCodecFactory & factory)
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Delta codec argument must be integer");
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Delta codec argument must be unsigned integer");
size_t user_bytes_size = literal->value.safeGet<UInt64>();
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)

View File

@ -7,7 +7,7 @@
#include <Compression/CompressionFactory.h>
#include <base/unaligned.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/BitHelpers.h>
@ -31,7 +31,7 @@ namespace DB
/** DoubleDelta column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
* to support 64bit types. The drawback is 1 extra bit for 32-bit wide deltas: 5-bit prefix
* instead of 4-bit prefix.
*
* This codec is best used against monotonic integer sequences with constant (or almost constant)
@ -145,6 +145,8 @@ namespace ErrorCodes
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
namespace
@ -549,10 +551,28 @@ void registerCodecDoubleDelta(CompressionCodecFactory & factory)
factory.registerCompressionCodecWithType("DoubleDelta", method_code,
[&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
if (arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec DoubleDelta does not accept any arguments");
/// Default bytes size is 1.
UInt8 data_bytes_size = 1;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "DoubleDelta codec must have 1 parameter, given {}", arguments->children.size());
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "DoubleDelta codec argument must be unsigned integer");
size_t user_bytes_size = literal->value.safeGet<UInt64>();
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Argument value for DoubleDelta codec can be 1, 2, 4 or 8, given {}", user_bytes_size);
data_bytes_size = static_cast<UInt8>(user_bytes_size);
}
else if (column_type)
{
data_bytes_size = getDataBytesSize(column_type);
}
UInt8 data_bytes_size = column_type ? getDataBytesSize(column_type) : 0;
return std::make_shared<CompressionCodecDoubleDelta>(data_bytes_size);
});
}

View File

@ -109,28 +109,42 @@ void registerCodecFPC(CompressionCodecFactory & factory)
auto method_code = static_cast<UInt8>(CompressionMethodByte::FPC);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 float_width = 0;
/// Set default float width to 4.
UInt8 float_width = 4;
if (column_type != nullptr)
float_width = getFloatBytesSize(*column_type);
UInt8 level = CompressionCodecFPC::DEFAULT_COMPRESSION_LEVEL;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
if (arguments->children.size() > 2)
{
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE,
"FPC codec must have 1 parameter, given {}", arguments->children.size());
"FPC codec must have from 0 to 2 parameters, given {}", arguments->children.size());
}
const auto * literal = arguments->children.front()->as<ASTLiteral>();
if (!literal)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be integer");
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be unsigned integer");
level = literal->value.safeGet<UInt8>();
if (level < 1 || level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec level must be between {} and {}",
1, static_cast<int>(CompressionCodecFPC::MAX_COMPRESSION_LEVEL));
if (arguments->children.size() == 2)
{
literal = arguments->children[1]->as<ASTLiteral>();
if (!literal || !isInt64OrUInt64FieldType(literal->value.getType()))
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be unsigned integer");
size_t user_float_width = literal->value.safeGet<UInt64>();
if (user_float_width != 4 && user_float_width != 8)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Float size for FPC codec can be 4 or 8, given {}", user_float_width);
float_width = static_cast<UInt8>(user_float_width);
}
}
return std::make_shared<CompressionCodecFPC>(float_width, level);
};
factory.registerCompressionCodecWithType("FPC", method_code, codec_builder);

View File

@ -7,6 +7,7 @@
#include <Compression/CompressionFactory.h>
#include <base/unaligned.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTLiteral.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/BitHelpers.h>
@ -134,6 +135,8 @@ namespace ErrorCodes
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
namespace
@ -445,10 +448,28 @@ void registerCodecGorilla(CompressionCodecFactory & factory)
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Gorilla);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
if (arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec Gorilla does not accept any arguments");
/// Default bytes size is 1
UInt8 data_bytes_size = 1;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "Gorilla codec must have 1 parameter, given {}", arguments->children.size());
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Gorilla codec argument must be unsigned integer");
size_t user_bytes_size = literal->value.safeGet<UInt64>();
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Argument value for Gorilla codec can be 1, 2, 4 or 8, given {}", user_bytes_size);
data_bytes_size = static_cast<UInt8>(user_bytes_size);
}
else if (column_type)
{
data_bytes_size = getDataBytesSize(column_type);
}
UInt8 data_bytes_size = column_type ? getDataBytesSize(column_type) : 0;
return std::make_shared<CompressionCodecGorilla>(data_bytes_size);
};
factory.registerCompressionCodecWithType("Gorilla", method_code, codec_builder);

View File

@ -33,7 +33,8 @@ public:
Bit
};
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
// type_idx_ is required for compression, but not for decompression.
CompressionCodecT64(std::optional<TypeIndex> type_idx_, Variant variant_);
uint8_t getMethodByte() const override;
@ -53,7 +54,7 @@ protected:
bool isGenericCompression() const override { return false; }
private:
TypeIndex type_idx;
std::optional<TypeIndex> type_idx;
Variant variant;
};
@ -91,9 +92,12 @@ enum class MagicNumber : uint8_t
IPv4 = 21,
};
MagicNumber serializeTypeId(TypeIndex type_id)
MagicNumber serializeTypeId(std::optional<TypeIndex> type_id)
{
switch (type_id)
if (!type_id)
throw Exception(ErrorCodes::CANNOT_COMPRESS, "T64 codec doesn't support compression without information about column type");
switch (*type_id)
{
case TypeIndex::UInt8: return MagicNumber::UInt8;
case TypeIndex::UInt16: return MagicNumber::UInt16;
@ -115,7 +119,7 @@ MagicNumber serializeTypeId(TypeIndex type_id)
break;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type is not supported by T64 codec: {}", static_cast<UInt32>(type_id));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type is not supported by T64 codec: {}", static_cast<UInt32>(*type_id));
}
TypeIndex deserializeTypeId(uint8_t serialized_type_id)
@ -632,7 +636,7 @@ UInt32 CompressionCodecT64::doCompressData(const char * src, UInt32 src_size, ch
memcpy(dst, &cookie, 1);
dst += 1;
switch (baseType(type_idx))
switch (baseType(*type_idx))
{
case TypeIndex::Int8:
return 1 + compressData<Int8>(src, src_size, dst, variant);
@ -699,7 +703,7 @@ uint8_t CompressionCodecT64::getMethodByte() const
return codecId();
}
CompressionCodecT64::CompressionCodecT64(TypeIndex type_idx_, Variant variant_)
CompressionCodecT64::CompressionCodecT64(std::optional<TypeIndex> type_idx_, Variant variant_)
: type_idx(type_idx_)
, variant(variant_)
{
@ -712,7 +716,7 @@ CompressionCodecT64::CompressionCodecT64(TypeIndex type_idx_, Variant variant_)
void CompressionCodecT64::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
hash.update(type_idx);
hash.update(type_idx.value_or(TypeIndex::Nothing));
hash.update(variant);
}
@ -742,9 +746,14 @@ void registerCodecT64(CompressionCodecFactory & factory)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Wrong modification for T64: {}", name);
}
auto type_idx = typeIdx(type);
if (type && type_idx == TypeIndex::Nothing)
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "T64 codec is not supported for specified type {}", type->getName());
std::optional<TypeIndex> type_idx;
if (type)
{
type_idx = typeIdx(type);
if (type_idx == TypeIndex::Nothing)
throw Exception(
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "T64 codec is not supported for specified type {}", type->getName());
}
return std::make_shared<CompressionCodecT64>(type_idx, variant);
};

View File

@ -149,8 +149,9 @@ Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
}
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_)
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric size_metric_, const char *thread_name_)
: tasks_metric(tasks_metric_)
, size_metric(size_metric_, size_)
, thread_name(thread_name_)
{
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size_);
@ -177,6 +178,8 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count)
threads.resize(new_threads_count);
for (size_t i = old_threads_count; i < new_threads_count; ++i)
threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
size_metric.changeTo(new_threads_count);
}

View File

@ -54,7 +54,7 @@ public:
void increaseThreadsCount(size_t new_threads_count);
/// thread_name_ cannot be longer than 13 bytes (2 bytes are reserved for the "/D" suffix for delayExecutionThreadFunction())
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_);
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric size_metric_, const char *thread_name_);
~BackgroundSchedulePool();
private:
@ -91,6 +91,7 @@ private:
DelayedTasks delayed_tasks;
CurrentMetrics::Metric tasks_metric;
CurrentMetrics::Increment size_metric;
std::string thread_name;
};

View File

@ -50,7 +50,7 @@ class IColumn;
M(UInt64, max_download_buffer_size, 10*1024*1024, "The maximal size of buffer for parallel downloading (e.g. for URL engine) per each thread.", 0) \
M(UInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
M(UInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).", 0) \
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)", 0) \
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "The maximum number of bytes of a query string parsed by the SQL parser. Data in the VALUES clause of INSERT queries is processed by a separate stream parser (that consumes O(1) RAM) and not affected by this restriction.", 0) \
M(UInt64, interactive_delay, 100000, "The interval in microseconds to check if the request is cancelled, and to send progress info.", 0) \
M(Seconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.", 0) \
M(Milliseconds, connect_timeout_with_failover_ms, 50, "Connection timeout for selecting first healthy replica.", 0) \
@ -718,6 +718,7 @@ class IColumn;
M(Float, insert_keeper_fault_injection_probability, 0.0f, "Approximate probability of failure for a keeper request during insert. Valid value is in interval [0.0f, 1.0f]", 0) \
M(UInt64, insert_keeper_fault_injection_seed, 0, "0 - random seed, otherwise the setting value", 0) \
M(Bool, force_aggregation_in_order, false, "Force use of aggregation in order on remote nodes during distributed aggregation. PLEASE, NEVER CHANGE THIS SETTING VALUE MANUALLY!", IMPORTANT) \
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.

View File

@ -26,7 +26,12 @@ namespace ErrorCodes
DatabaseMemory::DatabaseMemory(const String & name_, ContextPtr context_)
: DatabaseWithOwnTablesBase(name_, "DatabaseMemory(" + name_ + ")", context_)
, data_path("data/" + escapeForFileName(database_name) + "/")
{}
{
/// The temporary database should not have any data at the moment of its creation.
/// In case of a sudden server shutdown, remove the database folder of the temporary database.
if (name_ == DatabaseCatalog::TEMPORARY_DATABASE)
removeDataPath(context_);
}
void DatabaseMemory::createTable(
ContextPtr /*context*/,
@ -71,8 +76,7 @@ void DatabaseMemory::dropTable(
if (table->storesDataOnDisk())
{
assert(getDatabaseName() != DatabaseCatalog::TEMPORARY_DATABASE);
fs::path table_data_dir{getTableDataPath(table_name)};
fs::path table_data_dir{fs::path{getContext()->getPath()} / getTableDataPath(table_name)};
if (fs::exists(table_data_dir))
fs::remove_all(table_data_dir);
}
@ -80,7 +84,6 @@ void DatabaseMemory::dropTable(
catch (...)
{
std::lock_guard lock{mutex};
assert(database_name != DatabaseCatalog::TEMPORARY_DATABASE);
attachTableUnlocked(table_name, table);
throw;
}
@ -129,10 +132,15 @@ UUID DatabaseMemory::tryGetTableUUID(const String & table_name) const
return UUIDHelpers::Nil;
}
void DatabaseMemory::removeDataPath(ContextPtr local_context)
{
std::filesystem::remove_all(local_context->getPath() + data_path);
}
void DatabaseMemory::drop(ContextPtr local_context)
{
/// Remove data on explicit DROP DATABASE
std::filesystem::remove_all(local_context->getPath() + data_path);
removeDataPath(local_context);
}
void DatabaseMemory::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)

View File

@ -53,6 +53,8 @@ public:
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context) const override;
private:
void removeDataPath(ContextPtr local_context);
const String data_path;
using NameToASTCreate = std::unordered_map<String, ASTPtr>;
NameToASTCreate create_queries TSA_GUARDED_BY(mutex);

View File

@ -83,4 +83,24 @@ DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name,
return conf;
}
ASTs convertDiskConfigurationToAST(const Poco::Util::AbstractConfiguration & configuration, const std::string & config_path)
{
ASTs result;
Poco::Util::AbstractConfiguration::Keys keys;
configuration.keys(config_path, keys);
for (const auto & key : keys)
{
result.push_back(
makeASTFunction(
"equals",
std::make_shared<ASTIdentifier>(key),
std::make_shared<ASTLiteral>(configuration.getString(config_path + "." + key))));
}
return result;
}
}

View File

@ -25,4 +25,12 @@ using DiskConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
*/
DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
/// The same as the function above, but returns XML::Document for easier modification of the resulting configuration.
[[ maybe_unused ]] Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
/*
* A reverse function.
*/
[[ maybe_unused ]] ASTs convertDiskConfigurationToAST(const Poco::Util::AbstractConfiguration & configuration, const std::string & config_path);
}

View File

@ -803,7 +803,7 @@ struct ConvertImpl<DataTypeEnum<FieldType>, DataTypeNumber<FieldType>, Name, Con
}
};
static ColumnUInt8::MutablePtr copyNullMap(ColumnPtr col)
static inline ColumnUInt8::MutablePtr copyNullMap(ColumnPtr col)
{
ColumnUInt8::MutablePtr null_map = nullptr;
if (const auto * col_null = checkAndGetColumn<ColumnNullable>(col.get()))

View File

@ -10,6 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
}
namespace
@ -91,6 +92,13 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<v
copyDataImpl(from, to, true, bytes, cancellation_hook, nullptr);
}
void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes)
{
copyDataImpl(from, to, false, max_bytes, nullptr, nullptr);
if (!from.eof())
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data, max readable size reached.");
}
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler)
{
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled, throttler);

View File

@ -27,6 +27,9 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atom
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook);
/// Copies at most `max_bytes` bytes from ReadBuffer to WriteBuffer. If there are more bytes, then throws an exception.
void copyDataMaxBytes(ReadBuffer & from, WriteBuffer & to, size_t max_bytes);
/// Same as above but also use throttler to limit maximum speed
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);
void copyDataWithThrottler(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled, ThrottlerPtr throttler);

View File

@ -129,13 +129,21 @@ namespace CurrentMetrics
{
extern const Metric ContextLockWait;
extern const Metric BackgroundMovePoolTask;
extern const Metric BackgroundMovePoolSize;
extern const Metric BackgroundSchedulePoolTask;
extern const Metric BackgroundSchedulePoolSize;
extern const Metric BackgroundBufferFlushSchedulePoolTask;
extern const Metric BackgroundBufferFlushSchedulePoolSize;
extern const Metric BackgroundDistributedSchedulePoolTask;
extern const Metric BackgroundDistributedSchedulePoolSize;
extern const Metric BackgroundMessageBrokerSchedulePoolTask;
extern const Metric BackgroundMessageBrokerSchedulePoolSize;
extern const Metric BackgroundMergesAndMutationsPoolTask;
extern const Metric BackgroundMergesAndMutationsPoolSize;
extern const Metric BackgroundFetchesPoolTask;
extern const Metric BackgroundFetchesPoolSize;
extern const Metric BackgroundCommonPoolTask;
extern const Metric BackgroundCommonPoolSize;
}
namespace DB
@ -2175,6 +2183,7 @@ BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
shared->buffer_flush_schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_buffer_flush_schedule_pool_size,
CurrentMetrics::BackgroundBufferFlushSchedulePoolTask,
CurrentMetrics::BackgroundBufferFlushSchedulePoolSize,
"BgBufSchPool");
}
@ -2226,6 +2235,7 @@ BackgroundSchedulePool & Context::getSchedulePool() const
shared->schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
CurrentMetrics::BackgroundSchedulePoolSize,
"BgSchPool");
}
@ -2246,6 +2256,7 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() const
shared->distributed_schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_distributed_schedule_pool_size,
CurrentMetrics::BackgroundDistributedSchedulePoolTask,
CurrentMetrics::BackgroundDistributedSchedulePoolSize,
"BgDistSchPool");
}
@ -2266,6 +2277,7 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
shared->message_broker_schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolSize,
"BgMBSchPool");
}
@ -3826,6 +3838,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
/*max_threads_count*/background_pool_size,
/*max_tasks_count*/background_pool_size * background_merges_mutations_concurrency_ratio,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize,
background_merges_mutations_scheduling_policy
);
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}, scheduling_policy={}",
@ -3836,7 +3849,8 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"Move",
background_move_pool_size,
background_move_pool_size,
CurrentMetrics::BackgroundMovePoolTask
CurrentMetrics::BackgroundMovePoolTask,
CurrentMetrics::BackgroundMovePoolSize
);
LOG_INFO(shared->log, "Initialized background executor for move operations with num_threads={}, num_tasks={}", background_move_pool_size, background_move_pool_size);
@ -3845,7 +3859,8 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"Fetch",
background_fetches_pool_size,
background_fetches_pool_size,
CurrentMetrics::BackgroundFetchesPoolTask
CurrentMetrics::BackgroundFetchesPoolTask,
CurrentMetrics::BackgroundFetchesPoolSize
);
LOG_INFO(shared->log, "Initialized background executor for fetches with num_threads={}, num_tasks={}", background_fetches_pool_size, background_fetches_pool_size);
@ -3854,7 +3869,8 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"Common",
background_common_pool_size,
background_common_pool_size,
CurrentMetrics::BackgroundCommonPoolTask
CurrentMetrics::BackgroundCommonPoolTask,
CurrentMetrics::BackgroundCommonPoolSize
);
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", background_common_pool_size, background_common_pool_size);

View File

@ -121,9 +121,16 @@ TemporaryTableHolder::~TemporaryTableHolder()
{
if (id != UUIDHelpers::Nil)
{
auto table = getTable();
table->flushAndShutdown();
temporary_tables->dropTable(getContext(), "_tmp_" + toString(id));
try
{
auto table = getTable();
table->flushAndShutdown();
temporary_tables->dropTable(getContext(), "_tmp_" + toString(id));
}
catch (...)
{
tryLogCurrentException("TemporaryTableHolder");
}
}
}
@ -140,7 +147,6 @@ StoragePtr TemporaryTableHolder::getTable() const
return table;
}
void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
{
drop_delay_sec = getContext()->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);

View File

@ -235,6 +235,21 @@ public:
void checkTableCanBeRemovedOrRenamed(const StorageID & table_id, bool check_referential_dependencies, bool check_loading_dependencies, bool is_drop_database = false) const;
struct TableMarkedAsDropped
{
StorageID table_id = StorageID::createEmpty();
StoragePtr table;
String metadata_path;
time_t drop_time{};
};
using TablesMarkedAsDropped = std::list<TableMarkedAsDropped>;
TablesMarkedAsDropped getTablesMarkedDropped()
{
std::lock_guard lock(tables_marked_dropped_mutex);
return tables_marked_dropped;
}
private:
// The global instance of database catalog. unique_ptr is to allow
// deferred initialization. Thought I'd use std::optional, but I can't
@ -263,15 +278,6 @@ private:
return uuid.toUnderType().items[0] >> (64 - bits_for_first_level);
}
struct TableMarkedAsDropped
{
StorageID table_id = StorageID::createEmpty();
StoragePtr table;
String metadata_path;
time_t drop_time{};
};
using TablesMarkedAsDropped = std::list<TableMarkedAsDropped>;
void dropTableDataTask();
void dropTableFinally(const TableMarkedAsDropped & table);

View File

@ -848,6 +848,23 @@ std::string ExpressionActions::dumpActions() const
return ss.str();
}
void ExpressionActions::describeActions(WriteBuffer & out, std::string_view prefix) const
{
bool first = true;
for (const auto & action : actions)
{
out << prefix << (first ? "Actions: " : " ");
out << action.toString() << '\n';
first = false;
}
out << prefix << "Positions:";
for (const auto & pos : result_positions)
out << ' ' << pos;
out << '\n';
}
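The new ExpressionActions::describeActions() centralises the text layout that EXPLAIN uses: the first action goes after an "Actions: " label, later actions are aligned under it, and a "Positions:" line follows. Roughly the same formatting in a standalone form, assuming a plain std::ostream instead of WriteBuffer:

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <string_view>
    #include <vector>

    // Print a labelled, aligned action list followed by result positions.
    void describe(std::ostream & out, std::string_view prefix,
                  const std::vector<std::string> & actions,
                  const std::vector<std::size_t> & positions)
    {
        bool first = true;
        for (const auto & action : actions)
        {
            out << prefix << (first ? "Actions: " : "         ");
            out << action << '\n';
            first = false;
        }
        out << prefix << "Positions:";
        for (std::size_t pos : positions)
            out << ' ' << pos;
        out << '\n';
    }

    int main()
    {
        describe(std::cout, "  ", {"FUNCTION plus(a, b) -> c", "ALIAS c -> sum"}, {0, 2});
    }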
JSONBuilder::ItemPtr ExpressionActions::toTree() const
{
auto inputs_array = std::make_unique<JSONBuilder::JSONArray>();

View File

@ -109,6 +109,9 @@ public:
const Block & getSampleBlock() const { return sample_block; }
std::string dumpActions() const;
void describeActions(WriteBuffer & out, std::string_view prefix) const;
JSONBuilder::ItemPtr toTree() const;
static NameAndTypePair getSmallestColumn(const NamesAndTypesList & columns);

View File

@ -107,39 +107,4 @@ void FillingRow::initFromDefaults(size_t from_pos)
row[i] = getFillDescription(i).fill_from;
}
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns,
const FillingRow & filling_row, const Block & interpolate_block)
{
for (size_t i = 0, size = filling_columns.size(); i < size; ++i)
{
if (filling_row[i].isNull())
{
filling_columns[i]->insertDefault();
}
else
{
filling_columns[i]->insert(filling_row[i]);
}
}
if (size_t size = interpolate_block.columns())
{
Columns columns = interpolate_block.getColumns();
for (size_t i = 0; i < size; ++i)
interpolate_columns[i]->insertFrom(*columns[i]->convertToFullColumnIfConst(), 0);
}
else
for (const auto & interpolate_column : interpolate_columns)
interpolate_column->insertDefault();
for (const auto & other_column : other_columns)
other_column->insertDefault();
}
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num)
{
for (size_t i = 0, size = source.size(); i < size; ++i)
dest[i]->insertFrom(*source[i], row_num);
}
}

View File

@ -39,8 +39,4 @@ private:
SortDescription sort_description;
};
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns,
const FillingRow & filling_row, const Block & interpolate_block);
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num);
}

View File

@ -495,7 +495,7 @@ size_t HashJoin::getTotalByteCount() const
if (!data)
return 0;
#ifdef NDEBUG
#ifndef NDEBUG
size_t debug_blocks_allocated_size = 0;
for (const auto & block : data->blocks)
debug_blocks_allocated_size += block.allocatedBytes();
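The fix flips an inverted guard: #ifdef NDEBUG enables code only in release builds, while a debug-only consistency check needs #ifndef NDEBUG. A short reminder of the semantics as a self-contained program:

    #include <cassert>
    #include <cstdio>

    int main()
    {
    #ifndef NDEBUG
        // Debug builds: NDEBUG is not defined, so the extra check is compiled in.
        std::puts("debug-only consistency check runs");
        assert(1 + 1 == 2);
    #else
        // Release builds define NDEBUG; the expensive check is compiled out.
        std::puts("release build: check skipped");
    #endif
    }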

View File

@ -940,23 +940,32 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
if (create.temporary)
{
if (create.storage && create.storage->engine && create.storage->engine->name != "Memory")
throw Exception(ErrorCodes::INCORRECT_QUERY, "Temporary tables can only be created with ENGINE = Memory, not {}",
create.storage->engine->name);
/// It's possible if some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not.
/// It makes sense when default_table_engine setting is used, but not for temporary tables.
/// For temporary tables we ignore this setting to allow CREATE TEMPORARY TABLE query without specifying ENGINE
/// even if setting is set to MergeTree or something like that (otherwise MergeTree will be substituted and query will fail).
if (create.storage && !create.storage->engine)
throw Exception(ErrorCodes::INCORRECT_QUERY, "Invalid storage definition for temporary table: must be either ENGINE = Memory or empty");
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Memory";
engine_ast->no_empty_args = true;
auto storage_ast = std::make_shared<ASTStorage>();
storage_ast->set(storage_ast->engine, engine_ast);
create.set(create.storage, storage_ast);
if (!create.cluster.empty())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Temporary tables cannot be created with ON CLUSTER clause");
if (create.storage)
{
if (create.storage->engine)
{
if (create.storage->engine->name.starts_with("Replicated") || create.storage->engine->name == "KeeperMap")
throw Exception(ErrorCodes::INCORRECT_QUERY, "Temporary tables cannot be created with Replicated or KeeperMap table engines");
}
else
throw Exception(ErrorCodes::INCORRECT_QUERY, "Invalid storage definition for temporary table");
}
else
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Memory";
engine_ast->no_empty_args = true;
auto storage_ast = std::make_shared<ASTStorage>();
storage_ast->set(storage_ast->engine, engine_ast);
create.set(create.storage, storage_ast);
}
return;
}
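The rewritten branch only substitutes ENGINE = Memory when no storage clause was given and rejects engines that need coordination. A rough standalone sketch of the same normalise-or-reject decision over plain strings (resolveTemporaryTableEngine is a hypothetical helper, not the real AST code):

    #include <optional>
    #include <stdexcept>
    #include <string>

    // Pick the engine for a temporary table: default to Memory when none is given,
    // reject engines that rely on replication/Keeper (illustrative rule set).
    std::string resolveTemporaryTableEngine(const std::optional<std::string> & requested)
    {
        if (!requested)
            return "Memory";

        if (requested->starts_with("Replicated") || *requested == "KeeperMap")
            throw std::invalid_argument("Temporary tables cannot use Replicated or KeeperMap engines");

        return *requested; // e.g. MergeTree is accepted
    }

    int main()
    {
        return resolveTemporaryTableEngine(std::nullopt) == "Memory" ? 0 : 1;
    }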
@ -1284,8 +1293,21 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
if (create.if_not_exists && getContext()->tryResolveStorageID({"", create.getTable()}, Context::ResolveExternal))
return false;
DatabasePtr database = DatabaseCatalog::instance().getDatabase(DatabaseCatalog::TEMPORARY_DATABASE);
String temporary_table_name = create.getTable();
auto temporary_table = TemporaryTableHolder(getContext(), properties.columns, properties.constraints, query_ptr);
auto creator = [&](const StorageID & table_id)
{
return StorageFactory::instance().get(create,
database->getTableDataPath(table_id.getTableName()),
getContext(),
getContext()->getGlobalContext(),
properties.columns,
properties.constraints,
false);
};
auto temporary_table = TemporaryTableHolder(getContext(), creator, query_ptr);
getContext()->getSessionContext()->addExternalTable(temporary_table_name, std::move(temporary_table));
return true;
}
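Instead of a fixed column list, TemporaryTableHolder now receives a creator callback and invokes it once the table id is known, so any storage (not just Memory) can back a temporary table. The same inversion of control in miniature, with made-up names and standard-library types only:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>

    struct Storage { std::string data_path; };
    using StoragePtr = std::shared_ptr<Storage>;

    // The holder only knows how to ask for a storage; the caller decides how to build it.
    struct Holder
    {
        explicit Holder(const std::function<StoragePtr(const std::string &)> & creator)
            : table(creator("_tmp_42")) {}
        StoragePtr table;
    };

    int main()
    {
        auto creator = [](const std::string & table_id)
        {
            return std::make_shared<Storage>(Storage{"store/" + table_id});
        };
        Holder holder(creator);
        std::cout << holder.table->data_path << '\n';
    }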
@ -1712,7 +1734,13 @@ AccessRightsElements InterpreterCreateQuery::getRequiredAccess() const
else
{
if (create.temporary)
required_access.emplace_back(AccessType::CREATE_TEMPORARY_TABLE);
{
/// Currently default table engine for temporary tables is Memory. default_table_engine does not affect temporary tables.
if (create.storage && create.storage->engine && create.storage->engine->name != "Memory")
required_access.emplace_back(AccessType::CREATE_ARBITRARY_TEMPORARY_TABLE);
else
required_access.emplace_back(AccessType::CREATE_TEMPORARY_TABLE);
}
else
{
if (create.replace_table)

View File

@ -282,11 +282,6 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
else if (kind == ASTDropQuery::Kind::Drop)
{
context_handle->removeExternalTable(table_name);
table->flushAndShutdown();
auto table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
/// Delete table data
table->drop();
table->is_dropped = true;
}
else if (kind == ASTDropQuery::Kind::Detach)
{

View File

@ -83,7 +83,10 @@ void ReplaceQueryParameterVisitor::visitQueryParameter(ASTPtr & ast)
IColumn & temp_column = *temp_column_ptr;
ReadBufferFromString read_buffer{value};
FormatSettings format_settings;
data_type->getDefaultSerialization()->deserializeTextEscaped(temp_column, read_buffer, format_settings);
if (ast_param.name == "_request_body")
data_type->getDefaultSerialization()->deserializeWholeText(temp_column, read_buffer, format_settings);
else
data_type->getDefaultSerialization()->deserializeTextEscaped(temp_column, read_buffer, format_settings);
if (!read_buffer.eof())
throw Exception(ErrorCodes::BAD_QUERY_PARAMETER,

View File

@ -2151,8 +2151,9 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_set("SET");
ParserKeyword s_recompress("RECOMPRESS");
ParserKeyword s_codec("CODEC");
ParserToken s_comma(TokenType::Comma);
ParserToken s_eq(TokenType::Equals);
ParserKeyword s_materialize("MATERIALIZE");
ParserKeyword s_remove("REMOVE");
ParserKeyword s_modify("MODIFY");
ParserIdentifier parser_identifier;
ParserStringLiteral parser_string_literal;
@ -2160,8 +2161,11 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserExpressionList parser_keys_list(false);
ParserCodec parser_codec;
ParserList parser_assignment_list(
std::make_unique<ParserAssignment>(), std::make_unique<ParserToken>(TokenType::Comma));
if (s_materialize.checkWithoutMoving(pos, expected) ||
s_remove.checkWithoutMoving(pos, expected) ||
s_modify.checkWithoutMoving(pos, expected))
return false;
ASTPtr ttl_expr;
if (!parser_exp.parse(pos, ttl_expr, expected))
@ -2219,6 +2223,9 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (s_set.ignore(pos))
{
ParserList parser_assignment_list(
std::make_unique<ParserAssignment>(), std::make_unique<ParserToken>(TokenType::Comma));
if (!parser_assignment_list.parse(pos, group_by_assignments, expected))
return false;
}
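checkWithoutMoving() is a lookahead: the TTL rule peeks at the next keyword and refuses without consuming it, leaving MATERIALIZE/REMOVE/MODIFY for the rule that actually owns them. A toy peek-before-commit parser showing the idea (not the real Parser classes):

    #include <cstddef>
    #include <string>
    #include <vector>

    struct Tokens
    {
        std::vector<std::string> items;
        std::size_t pos = 0;

        // Look at the next token without advancing, like checkWithoutMoving().
        bool peekIs(const std::string & kw) const { return pos < items.size() && items[pos] == kw; }
        // Consume the next token only if it matches, like ignore().
        bool ignore(const std::string & kw) { if (peekIs(kw)) { ++pos; return true; } return false; }
    };

    bool parseTTLElement(Tokens & tokens)
    {
        // These keywords belong to another grammar rule; bail out without moving the cursor.
        if (tokens.peekIs("MATERIALIZE") || tokens.peekIs("REMOVE") || tokens.peekIs("MODIFY"))
            return false;
        return tokens.ignore("TTL"); // simplified: the real rule parses an expression here
    }

    int main()
    {
        Tokens tokens{{"MODIFY", "TTL"}};
        return parseTTLElement(tokens) ? 1 : 0; // returns 0: MODIFY is left for another rule
    }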

View File

@ -38,7 +38,6 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys
.returns_single_stream = should_produce_results_in_order_of_bucket_number,
.preserves_number_of_streams = false,
.preserves_sorting = false,

View File

@ -14,7 +14,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,

View File

@ -40,7 +40,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -21,7 +21,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -14,7 +14,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
@ -32,9 +31,6 @@ CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_,
, final(final_)
, use_nulls(use_nulls_)
{
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
@ -89,9 +85,5 @@ void CubeStep::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(), generateOutputHeader(params.getHeader(input_streams.front().header, final), params.keys, use_nulls), getDataStreamTraits());
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
}

View File

@ -10,28 +10,13 @@
namespace DB
{
static bool checkColumnsAlreadyDistinct(const Names & columns, const NameSet & distinct_names)
{
if (distinct_names.empty())
return false;
/// Now we need to check that distinct_names is a subset of columns.
std::unordered_set<std::string_view> columns_set(columns.begin(), columns.end());
for (const auto & name : distinct_names)
if (!columns_set.contains(name))
return false;
return true;
}
static ITransformingStep::Traits getTraits(bool pre_distinct, bool already_distinct_columns)
static ITransformingStep::Traits getTraits(bool pre_distinct)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = already_distinct_columns, /// Will be calculated separately otherwise
.returns_single_stream = !pre_distinct && !already_distinct_columns,
.preserves_number_of_streams = pre_distinct || already_distinct_columns,
.returns_single_stream = !pre_distinct,
.preserves_number_of_streams = pre_distinct,
.preserves_sorting = true, /// Sorting is preserved indeed because of implementation.
},
{
@ -62,34 +47,23 @@ DistinctStep::DistinctStep(
: ITransformingStep(
input_stream_,
input_stream_.header,
getTraits(pre_distinct_, checkColumnsAlreadyDistinct(columns_, input_stream_.distinct_columns)))
getTraits(pre_distinct_))
, set_size_limits(set_size_limits_)
, limit_hint(limit_hint_)
, columns(columns_)
, pre_distinct(pre_distinct_)
, optimize_distinct_in_order(optimize_distinct_in_order_)
{
if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing
&& (!pre_distinct /// Main distinct
|| input_stream_.has_single_port)) /// pre_distinct for single port works as usual one
{
/// Build distinct set.
for (const auto & name : columns)
output_stream->distinct_columns.insert(name);
}
}
void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
const auto & input_stream = input_streams.back();
if (checkColumnsAlreadyDistinct(columns, input_stream.distinct_columns))
return;
if (!pre_distinct)
pipeline.resize(1);
if (optimize_distinct_in_order)
{
const auto & input_stream = input_streams.back();
const SortDescription distinct_sort_desc = getSortDescription(input_stream.sort_description, columns);
if (!distinct_sort_desc.empty())
{
@ -197,16 +171,7 @@ void DistinctStep::updateOutputStream()
output_stream = createOutputStream(
input_streams.front(),
input_streams.front().header,
getTraits(pre_distinct, checkColumnsAlreadyDistinct(columns, input_streams.front().distinct_columns)).data_stream_traits);
if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing
&& (!pre_distinct /// Main distinct
|| input_streams.front().has_single_port)) /// pre_distinct for single port works as usual one
{
/// Build distinct set.
for (const auto & name : columns)
output_stream->distinct_columns.insert(name);
}
getTraits(pre_distinct).data_stream_traits);
}
}

View File

@ -15,7 +15,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions, const
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = !actions->hasArrayJoin(),
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = actions->isSortingPreserved(header, sort_description),
@ -33,8 +32,6 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, const ActionsDA
getTraits(actions_dag_, input_stream_.header, input_stream_.sort_description))
, actions_dag(actions_dag_)
{
/// Some columns may be removed by expression.
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
@ -63,22 +60,9 @@ void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const Bu
void ExpressionStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
bool first = true;
String prefix(settings.offset, settings.indent_char);
auto expression = std::make_shared<ExpressionActions>(actions_dag);
for (const auto & action : expression->getActions())
{
settings.out << prefix << (first ? "Actions: "
: " ");
first = false;
settings.out << action.toString() << '\n';
}
settings.out << prefix << "Positions:";
for (const auto & pos : expression->getResultPositions())
settings.out << ' ' << pos;
settings.out << '\n';
expression->describeActions(settings.out, prefix);
}
void ExpressionStep::describeActions(JSONBuilder::JSONMap & map) const

View File

@ -9,7 +9,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -17,7 +17,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false, /// TODO: it seems to actually be true. Check it later.
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -23,7 +23,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & expression, con
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = !expression->hasArrayJoin(), /// I suppose it actually never happens
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = preserves_sorting,
@ -51,8 +50,6 @@ FilterStep::FilterStep(
, filter_column_name(std::move(filter_column_name_))
, remove_filter_column(remove_filter_column_)
{
/// TODO: it would be easier to remove all expressions from filter step. It should only filter by column name.
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
@ -82,27 +79,15 @@ void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
void FilterStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
String prefix(settings.offset, settings.indent_char);
settings.out << prefix << "Filter column: " << filter_column_name;
if (remove_filter_column)
settings.out << " (removed)";
settings.out << '\n';
bool first = true;
auto expression = std::make_shared<ExpressionActions>(actions_dag);
for (const auto & action : expression->getActions())
{
settings.out << prefix << (first ? "Actions: "
: " ");
first = false;
settings.out << action.toString() << '\n';
}
settings.out << prefix << "Positions:";
for (const auto & pos : expression->getResultPositions())
settings.out << ' ' << pos;
settings.out << '\n';
expression->describeActions(settings.out, prefix);
}
void FilterStep::describeActions(JSONBuilder::JSONMap & map) const

View File

@ -23,11 +23,6 @@ class DataStream
public:
Block header;
/// Tuples with those columns are distinct.
/// It doesn't mean that columns are distinct separately.
/// Removing any column from this list breaks this invariant.
NameSet distinct_columns = {};
/// QueryPipeline has single port. Totals or extremes ports are not counted.
bool has_single_port = false;
@ -51,8 +46,7 @@ public:
bool hasEqualPropertiesWith(const DataStream & other) const
{
return distinct_columns == other.distinct_columns
&& has_single_port == other.has_single_port
return has_single_port == other.has_single_port
&& sort_description == other.sort_description
&& (sort_description.empty() || sort_scope == other.sort_scope);
}

View File

@ -20,9 +20,6 @@ DataStream ITransformingStep::createOutputStream(
{
DataStream output_stream{.header = std::move(output_header)};
if (stream_traits.preserves_distinct_columns)
output_stream.distinct_columns = input_stream.distinct_columns;
output_stream.has_single_port = stream_traits.returns_single_stream
|| (input_stream.has_single_port && stream_traits.preserves_number_of_streams);
@ -50,21 +47,6 @@ QueryPipelineBuilderPtr ITransformingStep::updatePipeline(QueryPipelineBuilders
return std::move(pipelines.front());
}
void ITransformingStep::updateDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
for (const auto & column : distinct_columns)
{
if (!res_header.has(column))
{
distinct_columns.clear();
break;
}
}
}
void ITransformingStep::describePipeline(FormatSettings & settings) const
{
IQueryPlanStep::describePipeline(processors, settings);

View File

@ -18,11 +18,6 @@ public:
/// They are specified in constructor and cannot be changed.
struct DataStreamTraits
{
/// Keep distinct_columns unchanged.
/// Examples: true for LimitStep, false for ExpressionStep with ARRAY JOIN
/// If some columns may be removed from the result header, call updateDistinctColumns
bool preserves_distinct_columns;
/// True if pipeline has single output port after this step.
/// Examples: MergeSortingStep, AggregatingStep
bool returns_single_stream;
@ -69,8 +64,6 @@ public:
input_streams.emplace_back(std::move(input_stream));
updateOutputStream();
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void describePipeline(FormatSettings & settings) const override;
@ -83,9 +76,6 @@ public:
}
protected:
/// Clear distinct_columns if res_header doesn't contain all of them.
static void updateDistinctColumns(const Block & res_header, NameSet & distinct_columns);
/// Create output stream from header and traits.
static DataStream createOutputStream(
const DataStream & input_stream,

View File

@ -83,7 +83,6 @@ static ITransformingStep::Traits getStorageJoinTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,

View File

@ -12,7 +12,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = true,

View File

@ -12,7 +12,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -24,7 +24,6 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = should_produce_results_in_order_of_bucket_number,
.preserves_number_of_streams = false,
.preserves_sorting = false,
@ -62,10 +61,6 @@ MergingAggregatedStep::MergingAggregatedStep(
, should_produce_results_in_order_of_bucket_number(should_produce_results_in_order_of_bucket_number_)
, memory_bound_merging_of_aggregation_results_enabled(memory_bound_merging_of_aggregation_results_enabled_)
{
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
if (memoryBoundMergingWillBeUsed() && should_produce_results_in_order_of_bucket_number)
{
output_stream->sort_description = group_by_sort_description;
@ -157,10 +152,6 @@ void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const
void MergingAggregatedStep::updateOutputStream()
{
output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits());
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
bool MergingAggregatedStep::memoryBoundMergingWillBeUsed() const

View File

@ -27,6 +27,7 @@ public:
bool memory_bound_merging_of_aggregation_results_enabled_);
String getName() const override { return "MergingAggregated"; }
const Aggregator::Params & getParams() const { return params; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;

View File

@ -12,7 +12,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -1,5 +1,7 @@
#include <memory>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
@ -7,6 +9,71 @@
namespace DB::QueryPlanOptimizations
{
/// build actions DAG from stack of steps
static ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
{
if (dag_stack.empty())
return nullptr;
ActionsDAGPtr path_actions = dag_stack.back()->clone();
dag_stack.pop_back();
while (!dag_stack.empty())
{
ActionsDAGPtr clone = dag_stack.back()->clone();
dag_stack.pop_back();
path_actions->mergeInplace(std::move(*clone));
}
return path_actions;
}
static const ActionsDAG::Node * getOriginalNodeForOutputAlias(const ActionsDAGPtr & actions, const String & output_name)
{
/// find alias in output
const ActionsDAG::Node * output_alias = nullptr;
for (const auto * node : actions->getOutputs())
{
if (node->result_name == output_name)
{
output_alias = node;
break;
}
}
if (!output_alias)
return nullptr;
/// find original(non alias) node it refers to
const ActionsDAG::Node * node = output_alias;
while (node && node->type == ActionsDAG::ActionType::ALIAS)
{
chassert(!node->children.empty());
node = node->children.front();
}
if (node && node->type != ActionsDAG::ActionType::INPUT)
return nullptr;
return node;
}
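getOriginalNodeForOutputAlias() walks ALIAS edges back until it reaches (and only accepts) an INPUT node, so later checks can use the column names the storage really produces. The chain walk in isolation, over a hand-rolled node type rather than ActionsDAG:

    #include <string>
    #include <vector>

    struct Node
    {
        enum class Type { Input, Alias, Function } type;
        std::string result_name;
        std::vector<const Node *> children;
    };

    // Follow alias edges to the underlying node; return it only if it is an input.
    const Node * resolveAlias(const Node * node)
    {
        while (node && node->type == Node::Type::Alias)
            node = node->children.empty() ? nullptr : node->children.front();
        return (node && node->type == Node::Type::Input) ? node : nullptr;
    }

    int main()
    {
        Node input{Node::Type::Input, "x", {}};
        Node alias{Node::Type::Alias, "x_alias", {&input}};
        const Node * original = resolveAlias(&alias);
        return (original && original->result_name == "x") ? 0 : 1;
    }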
static std::set<std::string>
getOriginalDistinctColumns(const ColumnsWithTypeAndName & distinct_columns, std::vector<ActionsDAGPtr> & dag_stack)
{
auto actions = buildActionsForPlanPath(dag_stack);
std::set<std::string> original_distinct_columns;
for (const auto & column : distinct_columns)
{
/// const columns don't affect DISTINCT, so skip them
if (isColumnConst(*column.column))
continue;
const auto * input_node = getOriginalNodeForOutputAlias(actions, column.name);
if (!input_node)
break;
original_distinct_columns.insert(input_node->result_name);
}
return original_distinct_columns;
}
size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
{
/// check if it is preliminary distinct node
@ -22,8 +89,10 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
/// walk through the plan
/// (1) check if nodes below preliminary distinct preserve sorting
/// (2) gather transforming steps to update their sorting properties later
/// (3) gather actions DAG to find original names for columns in distinct step later
std::vector<ITransformingStep *> steps_to_update;
QueryPlan::Node * node = parent_node;
std::vector<ActionsDAGPtr> dag_stack;
while (!node->children.empty())
{
auto * step = dynamic_cast<ITransformingStep *>(node->step.get());
@ -36,6 +105,11 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
steps_to_update.push_back(step);
if (const auto * const expr = typeid_cast<const ExpressionStep *>(step); expr)
dag_stack.push_back(expr->getExpression());
else if (const auto * const filter = typeid_cast<const FilterStep *>(step); filter)
dag_stack.push_back(filter->getExpression());
node = node->children.front();
}
@ -50,28 +124,24 @@ size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node)
if (read_from_merge_tree->getOutputStream().sort_description.empty())
return 0;
/// find non-const columns in DISTINCT
/// get original names for DISTINCT columns
const ColumnsWithTypeAndName & distinct_columns = pre_distinct->getOutputStream().header.getColumnsWithTypeAndName();
std::set<std::string_view> non_const_columns;
for (const auto & column : distinct_columns)
{
if (!isColumnConst(*column.column))
non_const_columns.emplace(column.name);
}
auto original_distinct_columns = getOriginalDistinctColumns(distinct_columns, dag_stack);
const Names& sorting_key_columns = read_from_merge_tree->getStorageMetadata()->getSortingKeyColumns();
/// check if DISTINCT has the same columns as sorting key
const Names & sorting_key_columns = read_from_merge_tree->getStorageMetadata()->getSortingKeyColumns();
size_t number_of_sorted_distinct_columns = 0;
for (const auto & column_name : sorting_key_columns)
{
if (non_const_columns.end() == non_const_columns.find(column_name))
if (!original_distinct_columns.contains(column_name))
break;
++number_of_sorted_distinct_columns;
}
/// apply optimization only when distinct columns match or form prefix of sorting key
/// todo: check if reading in order optimization would be beneficial when sorting key is prefix of columns in DISTINCT
if (number_of_sorted_distinct_columns != non_const_columns.size())
if (number_of_sorted_distinct_columns != original_distinct_columns.size())
return 0;
/// check if another read in order optimization is already applied

View File

@ -11,6 +11,7 @@
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/QueryPlan/SortingStep.h>
@ -100,24 +101,29 @@ namespace
logDebug("aggregation_keys", aggregation_keys);
logDebug("aggregation_keys size", aggregation_keys.size());
logDebug("distinct_columns size", distinct_columns.size());
if (aggregation_keys.size() != distinct_columns.size())
return false;
/// compare columns of two DISTINCTs
std::set<std::string_view> original_distinct_columns;
for (const auto & column : distinct_columns)
{
logDebug("distinct column name", column);
const auto * alias_node = getOriginalNodeForOutputAlias(path_actions, String(column));
if (!alias_node)
{
logDebug("original name for alias is not found for", column);
return false;
logDebug("original name for alias is not found", column);
original_distinct_columns.insert(column);
}
logDebug("alias result name", alias_node->result_name);
if (std::find(cbegin(aggregation_keys), cend(aggregation_keys), alias_node->result_name) == aggregation_keys.cend())
else
{
logDebug("alias result name is not found in aggregation keys", alias_node->result_name);
logDebug("alias result name", alias_node->result_name);
original_distinct_columns.insert(alias_node->result_name);
}
}
/// if aggregation keys are part of distinct columns then rows are already distinct
for (const auto & key : aggregation_keys)
{
if (!original_distinct_columns.contains(key))
{
logDebug("aggregation key NOT found: {}", key);
return false;
}
}
@ -176,7 +182,7 @@ namespace
while (!node->children.empty())
{
const IQueryPlanStep * current_step = node->step.get();
if (typeid_cast<const AggregatingStep *>(current_step))
if (typeid_cast<const AggregatingStep *>(current_step) || typeid_cast<const MergingAggregatedStep *>(current_step))
{
aggregation_before_distinct = current_step;
break;
@ -208,6 +214,9 @@ namespace
if (const auto * aggregating_step = typeid_cast<const AggregatingStep *>(aggregation_before_distinct); aggregating_step)
return compareAggregationKeysWithDistinctColumns(aggregating_step->getParams().keys, distinct_columns, actions);
else if (const auto * merging_aggregated_step = typeid_cast<const MergingAggregatedStep *>(aggregation_before_distinct);
merging_aggregated_step)
return compareAggregationKeysWithDistinctColumns(merging_aggregated_step->getParams().keys, distinct_columns, actions);
}
return false;

View File

@ -340,57 +340,55 @@ Pipe ReadFromMergeTree::readFromPool(
/ max_block_size * max_block_size / fixed_index_granularity;
}
bool all_parts_are_remote = true;
bool all_parts_are_local = true;
for (const auto & part : parts_with_range)
{
const bool is_remote = part.data_part->isStoredOnRemoteDisk();
all_parts_are_local &= !is_remote;
all_parts_are_remote &= is_remote;
}
bool all_parts_are_remote = true;
bool all_parts_are_local = true;
for (const auto & part : parts_with_range)
{
const bool is_remote = part.data_part->isStoredOnRemoteDisk();
all_parts_are_local &= !is_remote;
all_parts_are_remote &= is_remote;
}
MergeTreeReadPoolPtr pool;
MergeTreeReadPoolPtr pool;
if ((all_parts_are_remote
&& settings.allow_prefetched_read_pool_for_remote_filesystem
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method))
|| (all_parts_are_local
&& settings.allow_prefetched_read_pool_for_local_filesystem
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.local_fs_method)))
{
pool = std::make_shared<MergeTreePrefetchedReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
actions_settings,
required_columns,
virt_column_names,
settings.preferred_block_size_bytes,
reader_settings,
context,
use_uncompressed_cache,
all_parts_are_remote,
*data.getSettings());
}
else
{
pool = std::make_shared<MergeTreeReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
actions_settings,
reader_settings,
required_columns,
virt_column_names,
context,
false);
}
if ((all_parts_are_remote && settings.allow_prefetched_read_pool_for_remote_filesystem
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method))
|| (all_parts_are_local && settings.allow_prefetched_read_pool_for_local_filesystem
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.local_fs_method)))
{
pool = std::make_shared<MergeTreePrefetchedReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
actions_settings,
required_columns,
virt_column_names,
settings.preferred_block_size_bytes,
reader_settings,
context,
use_uncompressed_cache,
all_parts_are_remote,
*data.getSettings());
}
else
{
pool = std::make_shared<MergeTreeReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
actions_settings,
reader_settings,
required_columns,
virt_column_names,
context,
false);
}
auto * logger = &Poco::Logger::get(data.getLogName() + " (SelectExecutor)");
LOG_DEBUG(logger, "Reading approx. {} rows with {} streams", total_rows, max_streams);
@ -1732,6 +1730,36 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
format_settings.out << prefix << "Parts: " << result.index_stats.back().num_parts_after << '\n';
format_settings.out << prefix << "Granules: " << result.index_stats.back().num_granules_after << '\n';
}
if (prewhere_info)
{
format_settings.out << prefix << "Prewhere info" << '\n';
format_settings.out << prefix << "Need filter: " << prewhere_info->need_filter << '\n';
prefix.push_back(format_settings.indent_char);
prefix.push_back(format_settings.indent_char);
if (prewhere_info->prewhere_actions)
{
format_settings.out << prefix << "Prewhere filter" << '\n';
format_settings.out << prefix << "Prewhere filter column: " << prewhere_info->prewhere_column_name;
if (prewhere_info->remove_prewhere_column)
format_settings.out << " (removed)";
format_settings.out << '\n';
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
expression->describeActions(format_settings.out, prefix);
}
if (prewhere_info->row_level_filter)
{
format_settings.out << prefix << "Row level filter" << '\n';
format_settings.out << prefix << "Row level filter column: " << prewhere_info->row_level_column_name << '\n';
auto expression = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter);
expression->describeActions(format_settings.out, prefix);
}
}
}
void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
@ -1743,6 +1771,35 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
map.add("Parts", result.index_stats.back().num_parts_after);
map.add("Granules", result.index_stats.back().num_granules_after);
}
if (prewhere_info)
{
std::unique_ptr<JSONBuilder::JSONMap> prewhere_info_map = std::make_unique<JSONBuilder::JSONMap>();
prewhere_info_map->add("Need filter", prewhere_info->need_filter);
if (prewhere_info->prewhere_actions)
{
std::unique_ptr<JSONBuilder::JSONMap> prewhere_filter_map = std::make_unique<JSONBuilder::JSONMap>();
prewhere_filter_map->add("Prewhere filter column", prewhere_info->prewhere_column_name);
prewhere_filter_map->add("Prewhere filter remove filter column", prewhere_info->remove_prewhere_column);
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
prewhere_filter_map->add("Prewhere filter expression", expression->toTree());
prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map));
}
if (prewhere_info->row_level_filter)
{
std::unique_ptr<JSONBuilder::JSONMap> row_level_filter_map = std::make_unique<JSONBuilder::JSONMap>();
row_level_filter_map->add("Row level filter column", prewhere_info->row_level_column_name);
auto expression = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter);
row_level_filter_map->add("Row level filter expression", expression->toTree());
prewhere_info_map->add("Row level filter", std::move(row_level_filter_map));
}
map.add("Prewhere info", std::move(prewhere_info_map));
}
}
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const

View File

@ -11,7 +11,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
@ -29,9 +28,6 @@ RollupStep::RollupStep(const DataStream & input_stream_, Aggregator::Params para
, final(final_)
, use_nulls(use_nulls_)
{
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number);
@ -54,10 +50,6 @@ void RollupStep::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(), appendGroupingSetColumn(params.getHeader(input_streams.front().header, final)), getDataStreamTraits());
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}

View File

@ -45,7 +45,6 @@ static ITransformingStep::Traits getTraits(size_t limit)
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,

View File

@ -14,7 +14,6 @@ static ITransformingStep::Traits getTraits(bool has_filter)
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = true,

View File

@ -15,7 +15,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -259,6 +259,114 @@ IProcessor::Status FillingTransform::prepare()
return ISimpleTransform::prepare();
}
void FillingTransform::interpolate(const MutableColumns & result_columns, Block & interpolate_block)
{
if (interpolate_description)
{
interpolate_block.clear();
if (!input_positions.empty())
{
/// populate calculation block with required columns with values from previous row
for (const auto & [col_pos, name_type] : input_positions)
{
MutableColumnPtr column = name_type.type->createColumn();
const auto * res_column = result_columns[col_pos].get();
size_t size = res_column->size();
if (size == 0) /// this is the first row in current chunk
{
/// take value from last row of previous chunk if exists, else use default
if (last_row.size() > col_pos && !last_row[col_pos]->empty())
column->insertFrom(*last_row[col_pos], 0);
else
column->insertDefault();
}
else /// take value from previous row of current chunk
column->insertFrom(*res_column, size - 1);
interpolate_block.insert({std::move(column), name_type.type, name_type.name});
}
interpolate_actions->execute(interpolate_block);
}
else /// all INTERPOLATE expressions are constants
{
size_t n = 1;
interpolate_actions->execute(interpolate_block, n);
}
}
}
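interpolate() seeds the calculation from the previous row of the current chunk, or, on the first row, from last_row saved at the end of the previous chunk. Ignoring columns and blocks, the chunk-boundary carry-over boils down to something like this:

    #include <cstddef>
    #include <iostream>
    #include <optional>
    #include <vector>

    // Process data in chunks, remembering the last value so the next chunk
    // can derive its first row from it (falling back to a default the very first time).
    void processChunks(const std::vector<std::vector<int>> & chunks)
    {
        std::optional<int> last_value;
        for (const auto & chunk : chunks)
        {
            for (std::size_t i = 0; i < chunk.size(); ++i)
            {
                int previous = (i == 0) ? last_value.value_or(0) : chunk[i - 1];
                std::cout << "row " << chunk[i] << " interpolates from " << previous << '\n';
            }
            if (!chunk.empty())
                last_value = chunk.back(); // carried across the chunk boundary
        }
    }

    int main()
    {
        processChunks({{1, 2, 3}, {10, 11}});
    }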
using MutableColumnRawPtrs = std::vector<IColumn*>;
static void insertFromFillingRow(const MutableColumnRawPtrs & filling_columns, const MutableColumnRawPtrs & interpolate_columns, const MutableColumnRawPtrs & other_columns,
const FillingRow & filling_row, const Block & interpolate_block)
{
for (size_t i = 0, size = filling_columns.size(); i < size; ++i)
{
if (filling_row[i].isNull())
filling_columns[i]->insertDefault();
else
filling_columns[i]->insert(filling_row[i]);
}
if (size_t size = interpolate_block.columns())
{
Columns columns = interpolate_block.getColumns();
for (size_t i = 0; i < size; ++i)
interpolate_columns[i]->insertFrom(*columns[i]->convertToFullColumnIfConst(), 0);
}
else
for (auto * interpolate_column : interpolate_columns)
interpolate_column->insertDefault();
for (auto * other_column : other_columns)
other_column->insertDefault();
}
static void copyRowFromColumns(const MutableColumnRawPtrs & dest, const Columns & source, size_t row_num)
{
for (size_t i = 0, size = source.size(); i < size; ++i)
dest[i]->insertFrom(*source[i], row_num);
}
static void initColumnsByPositions(
const Columns & input_columns,
Columns & input_columns_by_positions,
const MutableColumns & output_columns,
MutableColumnRawPtrs & output_columns_by_position,
const std::vector<size_t> & positions)
{
for (size_t pos : positions)
{
input_columns_by_positions.push_back(input_columns[pos]);
output_columns_by_position.push_back(output_columns[pos].get());
}
}
void FillingTransform::initColumns(
const Columns & input_columns,
Columns & input_fill_columns,
Columns & input_interpolate_columns,
Columns & input_other_columns,
MutableColumns & output_columns,
MutableColumnRawPtrs & output_fill_columns,
MutableColumnRawPtrs & output_interpolate_columns,
MutableColumnRawPtrs & output_other_columns)
{
Columns non_const_columns;
non_const_columns.reserve(input_columns.size());
for (const auto & column : input_columns)
non_const_columns.push_back(column->convertToFullColumnIfConst());
for (const auto & column : non_const_columns)
output_columns.push_back(column->cloneEmpty()->assumeMutable());
initColumnsByPositions(non_const_columns, input_fill_columns, output_columns, output_fill_columns, fill_column_positions);
initColumnsByPositions(
non_const_columns, input_interpolate_columns, output_columns, output_interpolate_columns, interpolate_column_positions);
initColumnsByPositions(non_const_columns, input_other_columns, output_columns, output_other_columns, other_column_positions);
}
void FillingTransform::transform(Chunk & chunk)
{
@ -268,97 +376,58 @@ void FillingTransform::transform(Chunk & chunk)
Columns old_fill_columns;
Columns old_interpolate_columns;
Columns old_other_columns;
MutableColumns res_fill_columns;
MutableColumns res_interpolate_columns;
MutableColumns res_other_columns;
std::vector<std::pair<MutableColumns *, size_t>> res_map;
res_map.resize(input.getHeader().columns());
auto init_columns_by_positions = [&res_map](const Columns & old_columns, Columns & new_columns,
MutableColumns & new_mutable_columns, const Positions & positions)
{
for (size_t pos : positions)
{
auto old_column = old_columns[pos]->convertToFullColumnIfConst();
new_columns.push_back(old_column);
res_map[pos] = {&new_mutable_columns, new_mutable_columns.size()};
new_mutable_columns.push_back(old_column->cloneEmpty()->assumeMutable());
}
};
MutableColumnRawPtrs res_fill_columns;
MutableColumnRawPtrs res_interpolate_columns;
MutableColumnRawPtrs res_other_columns;
MutableColumns result_columns;
Block interpolate_block;
auto interpolate = [&]()
{
if (interpolate_description)
{
interpolate_block.clear();
if (!input_positions.empty())
{
/// populate calculation block with required columns with values from previous row
for (const auto & [col_pos, name_type] : input_positions)
{
MutableColumnPtr column = name_type.type->createColumn();
auto [res_columns, pos] = res_map[col_pos];
size_t size = (*res_columns)[pos]->size();
if (size == 0) /// this is the first row in current chunk
{
/// take value from last row of previous chunk if exists, else use default
if (last_row.size() > col_pos && !last_row[col_pos]->empty())
column->insertFrom(*last_row[col_pos], 0);
else
column->insertDefault();
}
else /// take value from previous row of current chunk
column->insertFrom(*(*res_columns)[pos], size - 1);
interpolate_block.insert({std::move(column), name_type.type, name_type.name});
}
interpolate_actions->execute(interpolate_block);
}
else /// all INTERPOLATE expressions are constants
{
size_t n = 1;
interpolate_actions->execute(interpolate_block, n);
}
}
};
if (generate_suffix)
{
const auto & empty_columns = input.getHeader().getColumns();
init_columns_by_positions(empty_columns, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(empty_columns, old_interpolate_columns, res_interpolate_columns, interpolate_column_positions);
init_columns_by_positions(empty_columns, old_other_columns, res_other_columns, other_column_positions);
initColumns(
empty_columns,
old_fill_columns,
old_interpolate_columns,
old_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_other_columns);
if (first)
filling_row.initFromDefaults();
if (should_insert_first && filling_row < next_row)
{
interpolate();
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}
interpolate();
interpolate(result_columns, interpolate_block);
while (filling_row.next(next_row))
{
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
interpolate();
interpolate(result_columns, interpolate_block);
}
setResultColumns(chunk, res_fill_columns, res_interpolate_columns, res_other_columns);
size_t num_output_rows = result_columns[0]->size();
chunk.setColumns(std::move(result_columns), num_output_rows);
return;
}
size_t num_rows = chunk.getNumRows();
const size_t num_rows = chunk.getNumRows();
auto old_columns = chunk.detachColumns();
init_columns_by_positions(old_columns, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(old_columns, old_interpolate_columns, res_interpolate_columns, interpolate_column_positions);
init_columns_by_positions(old_columns, old_other_columns, res_other_columns, other_column_positions);
initColumns(
old_columns,
old_fill_columns,
old_interpolate_columns,
old_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_other_columns);
if (first)
{
@ -372,7 +441,7 @@ void FillingTransform::transform(Chunk & chunk)
filling_row.initFromDefaults(i);
if (less(fill_from, current_value, filling_row.getDirection(i)))
{
interpolate();
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}
break;
@ -386,7 +455,7 @@ void FillingTransform::transform(Chunk & chunk)
{
should_insert_first = next_row < filling_row;
for (size_t i = 0; i < filling_row.size(); ++i)
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
{
auto current_value = (*old_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to;
@ -401,15 +470,15 @@ void FillingTransform::transform(Chunk & chunk)
/// and probably we need to insert it to block.
if (should_insert_first && filling_row < next_row)
{
interpolate();
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}
interpolate();
interpolate(result_columns, interpolate_block);
while (filling_row.next(next_row))
{
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
interpolate();
interpolate(result_columns, interpolate_block);
}
copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
@ -417,55 +486,24 @@ void FillingTransform::transform(Chunk & chunk)
copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
}
saveLastRow(res_fill_columns, res_interpolate_columns, res_other_columns);
setResultColumns(chunk, res_fill_columns, res_interpolate_columns, res_other_columns);
saveLastRow(result_columns);
size_t num_output_rows = result_columns[0]->size();
chunk.setColumns(std::move(result_columns), num_output_rows);
}
void FillingTransform::setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns) const
{
MutableColumns result_columns(fill_columns.size() + interpolate_columns.size() + other_columns.size());
/// fill_columns always non-empty.
size_t num_rows = fill_columns[0]->size();
for (size_t i = 0, size = fill_columns.size(); i < size; ++i)
result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
for (size_t i = 0, size = interpolate_columns.size(); i < size; ++i)
result_columns[interpolate_column_positions[i]] = std::move(interpolate_columns[i]);
for (size_t i = 0, size = other_columns.size(); i < size; ++i)
result_columns[other_column_positions[i]] = std::move(other_columns[i]);
chunk.setColumns(std::move(result_columns), num_rows);
}
void FillingTransform::saveLastRow(const MutableColumns & fill_columns, const MutableColumns & interpolate_columns, const MutableColumns & other_columns)
void FillingTransform::saveLastRow(const MutableColumns & result_columns)
{
last_row.clear();
last_row.resize(fill_columns.size() + interpolate_columns.size() + other_columns.size());
size_t num_rows = fill_columns[0]->size();
const size_t num_rows = result_columns[0]->size();
if (num_rows == 0)
return;
for (size_t i = 0, size = fill_columns.size(); i < size; ++i)
for (const auto & result_column : result_columns)
{
auto column = fill_columns[i]->cloneEmpty();
column->insertFrom(*fill_columns[i], num_rows - 1);
last_row[fill_column_positions[i]] = std::move(column);
}
for (size_t i = 0, size = interpolate_columns.size(); i < size; ++i)
{
auto column = interpolate_columns[i]->cloneEmpty();
column->insertFrom(*interpolate_columns[i], num_rows - 1);
last_row[interpolate_column_positions[i]] = std::move(column);
}
for (size_t i = 0, size = other_columns.size(); i < size; ++i)
{
auto column = other_columns[i]->cloneEmpty();
column->insertFrom(*other_columns[i], num_rows - 1);
last_row[other_column_positions[i]] = std::move(column);
auto column = result_column->cloneEmpty();
column->insertFrom(*result_column, num_rows - 1);
last_row.push_back(std::move(column));
}
}
}

View File

@ -28,8 +28,19 @@ protected:
void transform(Chunk & Chunk) override;
private:
void setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns) const;
void saveLastRow(const MutableColumns & fill_columns, const MutableColumns & interpolate_columns, const MutableColumns & other_columns);
void saveLastRow(const MutableColumns & result_columns);
void interpolate(const MutableColumns& result_columns, Block & interpolate_block);
using MutableColumnRawPtrs = std::vector<IColumn *>;
void initColumns(
const Columns & input_columns,
Columns & input_fill_columns,
Columns & input_interpolate_columns,
Columns & input_other_columns,
MutableColumns & output_columns,
MutableColumnRawPtrs & output_fill_columns,
MutableColumnRawPtrs & output_interpolate_columns,
MutableColumnRawPtrs & output_other_columns);
const SortDescription sort_description; /// Contains only columns with WITH FILL.
const InterpolateDescriptionPtr interpolate_description; /// Contains INTERPOLATE columns

View File

@ -21,6 +21,7 @@
#include <Client/MultiplexedConnections.h>
#include <Client/HedgedConnections.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/StorageMemory.h>
namespace ProfileEvents
@ -602,6 +603,9 @@ void RemoteQueryExecutor::sendExternalTables()
for (const auto & table : external_tables)
{
StoragePtr cur = table.second;
/// Send only temporary tables with StorageMemory
if (!std::dynamic_pointer_cast<StorageMemory>(cur))
continue;
auto data = std::make_unique<ExternalTableData>();
data->table_name = table.first;
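The loop now ships only temporary tables backed by StorageMemory, using dynamic_pointer_cast as a runtime type filter over shared pointers. The same filtering pattern with a toy class hierarchy:

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <vector>

    struct IStorage { virtual ~IStorage() = default; };
    struct StorageMemory : IStorage {};
    struct StorageLog : IStorage {};

    int main()
    {
        std::vector<std::shared_ptr<IStorage>> tables;
        tables.push_back(std::make_shared<StorageMemory>());
        tables.push_back(std::make_shared<StorageLog>());

        std::size_t sent = 0;
        for (const auto & table : tables)
        {
            // Only in-memory tables are sent; anything else is skipped.
            if (!std::dynamic_pointer_cast<StorageMemory>(table))
                continue;
            ++sent;
        }
        std::cout << "sent " << sent << " external table(s)\n"; // prints 1
    }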

View File

@ -783,7 +783,6 @@ void HTTPHandler::processQuery(
/// they will be applied in ProcessList::insert() from executeQuery() itself.
const auto & query = getQuery(request, params, context);
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query);
in = has_external_data ? std::move(in_param) : std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
/// HTTP response compression is turned on only if the client signalled that they support it
/// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
@ -833,7 +832,8 @@ void HTTPHandler::processQuery(
});
}
customizeContext(request, context);
customizeContext(request, context, *in_post_maybe_compressed);
in = has_external_data ? std::move(in_param) : std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response, this] (const QueryResultDetails & details)
@ -1153,7 +1153,7 @@ bool PredefinedQueryHandler::customizeQueryParam(ContextMutablePtr context, cons
return false;
}
void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, ContextMutablePtr context)
void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, ContextMutablePtr context, ReadBuffer & body)
{
/// If in the configuration file, the handler's header is regex and contains named capture group
/// We will extract regex named capture groups as query parameters
@ -1187,6 +1187,15 @@ void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, Conte
const auto & header_value = request.get(header_name);
set_query_params(header_value.data(), header_value.data() + header_value.size(), regex);
}
if (unlikely(receive_params.contains("_request_body") && !context->getQueryParameters().contains("_request_body")))
{
WriteBufferFromOwnString value;
const auto & settings = context->getSettingsRef();
copyDataMaxBytes(body, value, settings.http_max_request_param_data_size);
context->setQueryParameter("_request_body", value.str());
}
}
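The request body is copied into the _request_body query parameter with an explicit cap (http_max_request_param_data_size), so an oversized body cannot be turned into a parameter. A standard-library sketch of copying from a stream with a hard byte limit (copyBounded is a made-up helper, not the ClickHouse copyDataMaxBytes API):

    #include <cstddef>
    #include <iostream>
    #include <istream>
    #include <sstream>
    #include <stdexcept>
    #include <string>

    // Copy at most max_bytes from `in` into a string; refuse anything longer.
    std::string copyBounded(std::istream & in, std::size_t max_bytes)
    {
        std::string out;
        char buf[1024];
        while (in.read(buf, sizeof(buf)) || in.gcount() > 0)
        {
            out.append(buf, static_cast<std::size_t>(in.gcount()));
            if (out.size() > max_bytes)
                throw std::length_error("request body exceeds the configured limit");
        }
        return out;
    }

    int main()
    {
        std::istringstream body("SELECT 1\tSELECT 2"); // tabs survive, unlike escaped deserialization
        std::cout << copyBounded(body, 1024) << '\n';
    }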
std::string PredefinedQueryHandler::getQuery(HTTPServerRequest & request, HTMLForm & params, ContextMutablePtr context)

View File

@ -36,7 +36,7 @@ public:
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
/// This method is called right before the query execution.
virtual void customizeContext(HTTPServerRequest & /* request */, ContextMutablePtr /* context */) {}
virtual void customizeContext(HTTPServerRequest & /* request */, ContextMutablePtr /* context */, ReadBuffer & /* body */) {}
virtual bool customizeQueryParam(ContextMutablePtr context, const std::string & key, const std::string & value) = 0;
@ -163,7 +163,7 @@ public:
, const CompiledRegexPtr & url_regex_, const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_
, const std::optional<std::string> & content_type_override_);
virtual void customizeContext(HTTPServerRequest & request, ContextMutablePtr context) override;
void customizeContext(HTTPServerRequest & request, ContextMutablePtr context, ReadBuffer & body) override;
std::string getQuery(HTTPServerRequest & request, HTMLForm & params, ContextMutablePtr context) override;

View File

@ -218,9 +218,10 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);
if (!zero_copy_lock)
if (!zero_copy_lock || !zero_copy_lock->isLocked())
{
LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name);
storage.watchZeroCopyLock(entry.new_part_name, disk);
/// Don't check for missing part -- it's missing because the other replica has not
/// finished the merge yet.
return PrepareResult{

View File

@ -59,6 +59,7 @@ void MergeTreeBackgroundExecutor<Queue>::increaseThreadsAndMaxTasksCount(size_t
for (size_t number = threads_count; number < new_threads_count; ++number)
pool.scheduleOrThrowOnError([this] { threadFunction(); });
max_tasks_metric.changeTo(2 * new_max_tasks_count); // pending + active
max_tasks_count.store(new_max_tasks_count, std::memory_order_relaxed);
threads_count = new_threads_count;
}

View File

@ -13,6 +13,7 @@
#include <boost/circular_buffer.hpp>
#include <boost/noncopyable.hpp>
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
@ -247,11 +248,13 @@ public:
String name_,
size_t threads_count_,
size_t max_tasks_count_,
CurrentMetrics::Metric metric_)
CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_)
: name(name_)
, threads_count(threads_count_)
, max_tasks_count(max_tasks_count_)
, metric(metric_)
, max_tasks_metric(max_tasks_metric_, 2 * max_tasks_count) // active + pending
{
if (max_tasks_count == 0)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero");
@ -272,9 +275,10 @@ public:
size_t threads_count_,
size_t max_tasks_count_,
CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_,
std::string_view policy)
requires requires(Queue queue) { queue.updatePolicy(policy); } // Because we use explicit template instantiation
: MergeTreeBackgroundExecutor(name_, threads_count_, max_tasks_count_, metric_)
: MergeTreeBackgroundExecutor(name_, threads_count_, max_tasks_count_, metric_, max_tasks_metric_)
{
pending.updatePolicy(policy);
}
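The executor now also owns a CurrentMetrics::Increment initialised to 2 * max_tasks_count (active plus pending slots), so the task limit itself shows up as a metric and can be adjusted when the pool grows. A toy RAII counter with the same construct/changeTo/destruct behaviour (not the real CurrentMetrics):

    #include <atomic>
    #include <cstdio>

    std::atomic<long> background_pool_size_metric{0};

    // RAII increment: add `amount` while the owner is alive, take it back on destruction.
    struct MetricIncrement
    {
        explicit MetricIncrement(long amount_) : amount(amount_) { background_pool_size_metric += amount; }
        ~MetricIncrement() { background_pool_size_metric -= amount; }

        void changeTo(long new_amount)
        {
            background_pool_size_metric += new_amount - amount;
            amount = new_amount;
        }

        long amount;
    };

    int main()
    {
        {
            MetricIncrement max_tasks(2 * 8);   // active + pending slots
            max_tasks.changeTo(2 * 16);         // pool grew: reflect the new limit
            std::printf("metric = %ld\n", background_pool_size_metric.load()); // 32
        }
        std::printf("metric = %ld\n", background_pool_size_metric.load());     // 0
    }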
@ -311,6 +315,7 @@ private:
size_t threads_count TSA_GUARDED_BY(mutex) = 0;
std::atomic<size_t> max_tasks_count = 0;
CurrentMetrics::Metric metric;
CurrentMetrics::Increment max_tasks_metric;
void routine(TaskRuntimeDataPtr item);

View File

@ -7496,7 +7496,7 @@ MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts,
if (moving_tagger->parts_to_move.empty())
return MovePartsOutcome::NothingToMove;
return moveParts(moving_tagger);
return moveParts(moving_tagger, true);
}
MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove()
@ -7551,7 +7551,7 @@ MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::checkPartsForMove(co
return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this);
}
MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger)
MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy)
{
LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());
@ -7600,21 +7600,41 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
auto disk = moving_part.reserved_space->getDisk();
if (supportsReplication() && disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
{
/// If we acquired the lock, then let's try to move. After one
/// replica actually moves the part from disk to some
/// zero-copy storage, other replicas will just fetch
/// the metainformation.
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
/// This loop is not endless: if shutdown is called, the connection fails, or the replica becomes readonly,
/// we will return true from waitZeroCopyLock and createZeroCopyLock will return nullopt.
while (true)
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
}
else
{
/// Move will be retried but with backoff.
LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
continue;
/// If we acquired the lock, then let's try to move. After one
/// replica actually moves the part from disk to some
/// zero-copy storage, other replicas will just fetch
/// the metainformation.
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
{
if (lock->isLocked())
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
break;
}
else if (wait_for_move_if_zero_copy)
{
LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
/// Wait and checks not only for timeout but also for shutdown and so on.
while (!waitZeroCopyLockToDisappear(*lock, 3000))
{
LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
}
}
else
break;
}
else
{
/// Move will be retried but with backoff.
LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
break;
}
}
}
else /// Ordinary move as it should be

View File

@ -1456,7 +1456,7 @@ private:
using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;
/// Move selected parts to corresponding disks
MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger);
MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy=false);
/// Select parts for move and disks for them. Used in background moving processes.
CurrentlyMovingPartsTaggerPtr selectPartsForMove();
@ -1511,6 +1511,7 @@ private:
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
virtual bool waitZeroCopyLockToDisappear(const ZeroCopyLock &, size_t) { return false; }
/// Remove parts from disk calling part->remove(). Can do it in parallel in case of big set of parts and enabled settings.
/// If we fail to remove some part and throw_on_error equal to `true` will throw an exception on the first failed part.

View File

@ -46,6 +46,10 @@ void MergeTreeDataPartChecksum::checkEqual(const MergeTreeDataPartChecksum & rhs
void MergeTreeDataPartChecksum::checkSize(const IDataPartStorage & storage, const String & name) const
{
/// Skip inverted index files; these have a default MergeTreeDataPartChecksum with file_size == 0
if (name.ends_with(".gin_dict") || name.ends_with(".gin_post") || name.ends_with(".gin_seg") || name.ends_with(".gin_sid"))
return;
if (!storage.exists(name))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "{} doesn't exist", fs::path(storage.getRelativePath()) / name);

View File

@ -127,9 +127,11 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);
if (!zero_copy_lock)
if (!zero_copy_lock || !zero_copy_lock->isLocked())
{
storage.watchZeroCopyLock(entry.new_part_name, disk);
LOG_DEBUG(log, "Mutation of part {} started by some other replica, will wait it and mutated merged part", entry.new_part_name);
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = false,

View File

@ -30,7 +30,9 @@ public:
UInt64 getPriority() override { return priority; }
private:
ReplicatedMergeMutateTaskBase::PrepareResult prepare() override;
bool finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log) override;
bool executeInnerTask() override

View File

@ -1370,6 +1370,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
constexpr auto fmt_string = "Not executing merge/mutation for the part {}, waiting for {} to execute it and will fetch after.";
out_postpone_reason = fmt::format(fmt_string, entry.new_part_name, replica_to_execute_merge);
LOG_TEST(log, fmt_string, entry.new_part_name, replica_to_execute_merge);
return false;
}
}

View File

@ -14,6 +14,7 @@ struct ZeroCopyLock
{
ZeroCopyLock(const zkutil::ZooKeeperPtr & zookeeper, const std::string & lock_path, const std::string & lock_message);
bool isLocked() const { return lock->isLocked(); }
/// Actual lock
std::unique_ptr<zkutil::ZooKeeperLock> lock;
};

View File

@ -15,6 +15,7 @@ using namespace DB;
namespace CurrentMetrics
{
extern const Metric BackgroundMergesAndMutationsPoolTask;
extern const Metric BackgroundMergesAndMutationsPoolSize;
}
std::random_device device;
@ -102,7 +103,8 @@ TEST(Executor, Simple)
"GTest",
1, // threads
100, // max_tasks
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
);
String schedule; // mutex is not required because we have a single worker
@ -144,7 +146,8 @@ TEST(Executor, RemoveTasks)
"GTest",
tasks_kinds,
tasks_kinds * batch,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
);
for (size_t i = 0; i < batch; ++i)
@ -184,7 +187,8 @@ TEST(Executor, RemoveTasksStress)
"GTest",
tasks_kinds,
tasks_kinds * batch * (schedulers_count + removers_count),
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
);
std::barrier barrier(schedulers_count + removers_count);
@ -234,7 +238,8 @@ TEST(Executor, UpdatePolicy)
"GTest",
1, // threads
100, // max_tasks
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
);
String schedule; // mutex is not required because we have a single worker

View File

@ -8555,7 +8555,6 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
return best_replica;
}
Strings StorageReplicatedMergeTree::getZeroCopyPartPath(
const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old)
@ -8575,18 +8574,65 @@ Strings StorageReplicatedMergeTree::getZeroCopyPartPath(
return res;
}
bool StorageReplicatedMergeTree::checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica)
void StorageReplicatedMergeTree::watchZeroCopyLock(const String & part_name, const DiskPtr & disk)
{
auto path = getZeroCopyPartPath(part_name, disk);
if (path)
{
/// FIXME
auto zookeeper = getZooKeeper();
auto lock_path = fs::path(*path) / "part_exclusive_lock";
if (getZooKeeper()->tryGet(lock_path, lock_replica))
LOG_TEST(log, "Adding zero-copy lock on {}", lock_path);
/// Looks ugly, but we cannot touch any storage fields inside the watch callback
/// because it could lead to use-after-free (the storage is dropped and the watch is triggered)
std::shared_ptr<std::atomic<bool>> flag = std::make_shared<std::atomic<bool>>(true);
std::string replica;
bool exists = zookeeper->tryGetWatch(lock_path, replica, nullptr, [flag] (const Coordination::WatchResponse &)
{
return true;
*flag = false;
});
if (exists)
{
std::lock_guard lock(existing_zero_copy_locks_mutex);
existing_zero_copy_locks[lock_path] = ZeroCopyLockDescription{replica, flag};
}
}
}
bool StorageReplicatedMergeTree::checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica)
{
auto path = getZeroCopyPartPath(part_name, disk);
std::lock_guard lock(existing_zero_copy_locks_mutex);
/// Clean up abandoned locks during each check. The set of locks is small, so this is quite a fast loop.
/// Also, it's hard to remove locks properly because we can execute the replication queue
/// in arbitrary order and some parts can be replaced by covering parts without merges.
for (auto it = existing_zero_copy_locks.begin(); it != existing_zero_copy_locks.end();)
{
if (*it->second.exists)
++it;
else
{
LOG_TEST(log, "Removing zero-copy lock on {}", it->first);
it = existing_zero_copy_locks.erase(it);
}
}
if (path)
{
auto lock_path = fs::path(*path) / "part_exclusive_lock";
if (auto it = existing_zero_copy_locks.find(lock_path); it != existing_zero_copy_locks.end())
{
lock_replica = it->second.replica;
if (*it->second.exists)
{
LOG_TEST(log, "Zero-copy lock on path {} exists", it->first);
return true;
}
}
LOG_TEST(log, "Zero-copy lock on path {} doesn't exist", lock_path);
}
return false;
}
@ -8599,11 +8645,37 @@ std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri
return getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), part_name, zookeeper_path)[0];
}
bool StorageReplicatedMergeTree::waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait)
{
if (lock.isLocked())
return true;
if (partial_shutdown_called.load(std::memory_order_relaxed))
return true;
auto lock_path = lock.lock->getLockPath();
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return true;
Stopwatch time_waiting;
const auto & stop_waiting = [&]()
{
bool timeout_exceeded = milliseconds_to_wait < time_waiting.elapsedMilliseconds();
return partial_shutdown_called.load(std::memory_order_relaxed) || is_readonly.load(std::memory_order_relaxed) || timeout_exceeded;
};
return zookeeper->waitForDisappear(lock_path, stop_waiting);
}
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
{
if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt;
if (partial_shutdown_called.load(std::memory_order_relaxed) || is_readonly.load(std::memory_order_relaxed))
return std::nullopt;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return std::nullopt;
@ -8616,10 +8688,8 @@ std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusi
/// Create actual lock
ZeroCopyLock lock(zookeeper, zc_zookeeper_path, replica_name);
if (lock.lock->tryLock())
return lock;
else
return std::nullopt;
lock.lock->tryLock();
return lock;
}
String StorageReplicatedMergeTree::findReplicaHavingPart(

View File

@ -482,6 +482,16 @@ private:
std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks;
std::mutex existing_zero_copy_locks_mutex;
struct ZeroCopyLockDescription
{
std::string replica;
std::shared_ptr<std::atomic<bool>> exists;
};
std::unordered_map<String, ZeroCopyLockDescription> existing_zero_copy_locks;
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
template <class Func>
@ -859,13 +869,19 @@ private:
void createTableSharedID() const;
bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica);
void watchZeroCopyLock(const String & part_name, const DiskPtr & disk);
std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);
/// Create an ephemeral lock in zookeeper for a part and disk which support zero-copy replication.
/// If somebody is already holding the lock -- return std::nullopt.
/// If there is no connection to zookeeper, or on shutdown or readonly -- return std::nullopt.
/// If somebody is already holding the lock -- return an unlocked ZeroCopyLock object (not std::nullopt).
std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;
/// Wait for the ephemeral lock to disappear. Return true on table shutdown/readonly/timeout exceeded, etc.,
/// or if the node actually disappeared.
bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;
void startupImpl(bool from_attach_thread);
};

View File

@ -0,0 +1,64 @@
#include <Storages/System/StorageSystemMarkedDroppedTables.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeUUID.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include "base/types.h"
namespace DB
{
NamesAndTypesList StorageSystemMarkedDroppedTables::getNamesAndTypes()
{
NamesAndTypesList names_and_types{
{"index", std::make_shared<DataTypeUInt32>()},
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"uuid", std::make_shared<DataTypeUUID>()},
{"engine", std::make_shared<DataTypeString>()},
{"metadata_dropped_path", std::make_shared<DataTypeString>()},
{"table_dropped_time", std::make_shared<DataTypeDateTime>()},
};
return names_and_types;
}
void StorageSystemMarkedDroppedTables::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
{
auto tables_mark_dropped = DatabaseCatalog::instance().getTablesMarkedDropped();
size_t index = 0;
auto & column_index = assert_cast<ColumnUInt32 &>(*res_columns[index++]);
auto & column_database = assert_cast<ColumnString &>(*res_columns[index++]);
auto & column_table = assert_cast<ColumnString &>(*res_columns[index++]);
auto & column_uuid = assert_cast<ColumnUUID &>(*res_columns[index++]).getData();
auto & column_engine = assert_cast<ColumnString &>(*res_columns[index++]);
auto & column_metadata_dropped_path = assert_cast<ColumnString &>(*res_columns[index++]);
auto & column_table_dropped_time = assert_cast<ColumnUInt32 &>(*res_columns[index++]);
auto add_row = [&](UInt32 idx, const DatabaseCatalog::TableMarkedAsDropped & table_mark_dropped)
{
column_index.insertValue(idx);
column_database.insertData(table_mark_dropped.table_id.getDatabaseName().data(), table_mark_dropped.table_id.getDatabaseName().size());
column_table.insertData(table_mark_dropped.table_id.getTableName().data(), table_mark_dropped.table_id.getTableName().size());
column_uuid.push_back(table_mark_dropped.table_id.uuid.toUnderType());
if (table_mark_dropped.table)
column_engine.insertData(table_mark_dropped.table->getName().data(), table_mark_dropped.table->getName().size());
else
column_engine.insertData({}, 0);
column_metadata_dropped_path.insertData(table_mark_dropped.metadata_path.data(), table_mark_dropped.metadata_path.size());
column_table_dropped_time.insertValue(static_cast<UInt32>(table_mark_dropped.drop_time));
};
UInt32 idx = 0;
for (const auto & table_mark_dropped : tables_mark_dropped)
add_row(idx++, table_mark_dropped);
}
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class StorageSystemMarkedDroppedTables final : public IStorageSystemOneBlock<StorageSystemMarkedDroppedTables>
{
public:
std::string getName() const override { return "SystemMarkedDroppedTables"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
};
}

View File

@ -79,6 +79,7 @@
#include <Storages/System/StorageSystemRemoteDataPaths.h>
#include <Storages/System/StorageSystemCertificates.h>
#include <Storages/System/StorageSystemSchemaInferenceCache.h>
#include <Storages/System/StorageSystemMarkedDroppedTables.h>
#ifdef OS_LINUX
#include <Storages/System/StorageSystemStackTrace.h>
@ -140,6 +141,7 @@ void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database)
attach<StorageSystemTimeZones>(context, system_database, "time_zones");
attach<StorageSystemBackups>(context, system_database, "backups");
attach<StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache");
attach<StorageSystemMarkedDroppedTables>(context, system_database, "marked_dropped_tables");
#ifdef OS_LINUX
attach<StorageSystemStackTrace>(context, system_database, "stack_trace");
#endif
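For reference, a minimal usage sketch (not part of the commit) of the table attached above as system.marked_dropped_tables; the host and port are assumptions (a local server with the default HTTP interface on localhost:8123), and only columns defined in this commit are used.
# Hypothetical example, not part of the commit: read the new system table over HTTP.
import requests

query = "SELECT database, table, uuid, engine FROM system.marked_dropped_tables FORMAT TSVWithNames"
resp = requests.get("http://localhost:8123/", params={"query": query})
resp.raise_for_status()
print(resp.text)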

View File

@ -437,7 +437,7 @@ class FailureReason(enum.Enum):
SERVER_DIED = "server died"
EXIT_CODE = "return code: "
STDERR = "having stderror: "
EXCEPTION = "having having exception in stdout: "
EXCEPTION = "having exception in stdout: "
RESULT_DIFF = "result differs with reference: "
TOO_LONG = "Test runs too long (> 60s). Make it faster."
INTERNAL_QUERY_FAIL = "Internal query (CREATE/DROP DATABASE) failed:"

View File

@ -147,6 +147,18 @@ def test_predefined_query_handler():
assert b"max_final_threads\t1\nmax_threads\t1\n" == res2.content
assert "application/generic+one" == res2.headers["content-type"]
cluster.instance.query(
"CREATE TABLE test_table (id UInt32, data String) Engine=TinyLog"
)
res3 = cluster.instance.http_request(
"test_predefined_handler_post_body?id=100",
method="POST",
data="TEST".encode("utf8"),
)
assert res3.status_code == 200
assert cluster.instance.query("SELECT * FROM test_table") == "100\tTEST\n"
cluster.instance.query("DROP TABLE test_table")
def test_fixed_static_handler():
with contextlib.closing(

View File

@ -21,5 +21,13 @@
<content_type>application/generic+one</content_type>
</handler>
</rule>
<rule>
<methods>POST</methods>
<url>/test_predefined_handler_post_body</url>
<handler>
<type>predefined_query_handler</type>
<query>INSERT INTO test_table(id, data) SELECT {id:UInt32}, {_request_body:String}</query>
</handler>
</rule>
</http_handlers>
</clickhouse>
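For reference, a minimal sketch (not part of the commit) of how this new rule can be exercised directly over the HTTP interface; the host and port are assumptions (default localhost:8123). The {id:UInt32} parameter is taken from the URL query string and {_request_body:String} is bound to the raw POST body.
# Hypothetical example, not part of the commit: call the predefined handler with a POST body.
import requests

resp = requests.post(
    "http://localhost:8123/test_predefined_handler_post_body?id=100",
    data=b"TEST",
)
resp.raise_for_status()
# The row (100, 'TEST') should now be visible in test_table.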

View File

@ -1,4 +1,8 @@
<clickhouse>
<logger>
<level>test</level>
</logger>
<storage_configuration>
<disks>
<s3>
@ -21,6 +25,13 @@
</main>
</volumes>
</s3>
<s3_only>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3_only>
</policies>
</storage_configuration>

Some files were not shown because too many files have changed in this diff