Merge branch 'master' of github.com:ClickHouse/ClickHouse into better-stress-upgrade-checks

This commit is contained in:
avogar 2024-03-18 15:27:58 +00:00
commit aadc610739
316 changed files with 7907 additions and 7478 deletions

View File

@ -43,8 +43,7 @@ jobs:
runs-on: [self-hosted, '${{inputs.runner_type}}']
steps:
- name: Check out repository code
# WIP: temporary try commit with limited parallelization of checkout
uses: ClickHouse/checkout@0be3f7b3098bae494d3ef5d29d2e0676fb606232
uses: ClickHouse/checkout@v1
with:
clear-repository: true
ref: ${{ fromJson(inputs.data).git_ref }}

View File

@ -110,11 +110,6 @@ endif()
# - sanitize.cmake
add_library(global-libs INTERFACE)
# We don't want to instrument everything with fuzzer, but only specific targets (see below),
# also, since we build our own llvm, we specifically don't want to instrument
# libFuzzer library itself - it would result in infinite recursion
#include (cmake/fuzzer.cmake)
include (cmake/sanitize.cmake)
option(ENABLE_COLORED_BUILD "Enable colors in compiler output" ON)
@ -554,7 +549,9 @@ if (ENABLE_RUST)
endif()
endif()
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO" AND NOT SANITIZE AND NOT SANITIZE_COVERAGE AND OS_LINUX AND (ARCH_AMD64 OR ARCH_AARCH64))
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
AND NOT SANITIZE AND NOT SANITIZE_COVERAGE AND NOT ENABLE_FUZZING
AND OS_LINUX AND (ARCH_AMD64 OR ARCH_AARCH64))
set(CHECK_LARGE_OBJECT_SIZES_DEFAULT ON)
else ()
set(CHECK_LARGE_OBJECT_SIZES_DEFAULT OFF)
@ -577,10 +574,7 @@ if (FUZZER)
if (NOT(target_type STREQUAL "INTERFACE_LIBRARY" OR target_type STREQUAL "UTILITY"))
target_compile_options(${target} PRIVATE "-fsanitize=fuzzer-no-link")
endif()
# clickhouse fuzzer isn't working correctly
# initial PR https://github.com/ClickHouse/ClickHouse/pull/27526
#if (target MATCHES ".+_fuzzer" OR target STREQUAL "clickhouse")
if (target_type STREQUAL "EXECUTABLE" AND target MATCHES ".+_fuzzer")
if (target_type STREQUAL "EXECUTABLE" AND (target MATCHES ".+_fuzzer" OR target STREQUAL "clickhouse"))
message(STATUS "${target} instrumented with fuzzer")
target_link_libraries(${target} PUBLIC ch_contrib::fuzzer)
# Add to fuzzers bundle

View File

@ -50,9 +50,6 @@ std::optional<uint64_t> getCgroupsV2MemoryLimit()
}
/** Returns the size of physical memory (RAM) in bytes.
* Returns 0 on unsupported platform
*/
uint64_t getMemoryAmountOrZero()
{
int64_t num_pages = sysconf(_SC_PHYS_PAGES);

View File

@ -2,11 +2,10 @@
#include <cstdint>
/** Returns the size of physical memory (RAM) in bytes.
* Returns 0 on unsupported platform or if it cannot determine the size of physical memory.
*/
/// Returns the size in bytes of physical memory (RAM) available to the process. The value can
/// be smaller than the total RAM available to the system due to cgroups settings.
/// Returns 0 on unsupported platform or if it cannot determine the size of physical memory.
uint64_t getMemoryAmountOrZero();
/** Throws exception if it cannot determine the size of physical memory.
*/
/// Throws exception if it cannot determine the size of physical memory.
uint64_t getMemoryAmount();

View File

@ -1,17 +0,0 @@
# see ./CMakeLists.txt for variable declaration
if (FUZZER)
if (FUZZER STREQUAL "libfuzzer")
# NOTE: Eldar Zaitov decided to name it "libfuzzer" instead of "fuzzer" to keep in mind other possible fuzzer backends.
# NOTE: no-link means that all the targets are built with instrumentation for the fuzzer, but only some of them
# (tests) have an entry point for the fuzzer, and this is not checked.
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=fuzzer-no-link -DFUZZER=1")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=fuzzer-no-link -DFUZZER=1")
# NOTE: oss-fuzz can change LIB_FUZZING_ENGINE variable
if (NOT LIB_FUZZING_ENGINE)
set (LIB_FUZZING_ENGINE "-fsanitize=fuzzer")
endif ()
else ()
message (FATAL_ERROR "Unknown fuzzer type: ${FUZZER}")
endif ()
endif()

View File

@ -6,6 +6,11 @@ sidebar_label: JDBC
# JDBC
:::note
clickhouse-jdbc-bridge contains experimental code and is no longer supported. It may contain reliability issues and security vulnerabilities. Use it at your own risk.
ClickHouse recommends using the built-in table functions, which provide a better alternative for ad-hoc querying scenarios (Postgres, MySQL, MongoDB, etc.).
:::
Allows ClickHouse to connect to external databases via [JDBC](https://en.wikipedia.org/wiki/Java_Database_Connectivity).
To implement the JDBC connection, ClickHouse uses the separate program [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) that should run as a daemon.
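As a sketch of the recommended built-in alternative, an external PostgreSQL table can be queried directly with the built-in `postgresql` table function; the host, credentials, and names below are hypothetical:
``` sql
SELECT *
FROM postgresql('localhost:5432', 'external_db', 'external_table', 'user', 'password')
LIMIT 10;
```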

View File

@ -21,3 +21,79 @@ When restarting a server, data disappears from the table and the table becomes e
Normally, using this table engine is not justified. However, it can be used for tests, and for tasks where maximum speed is required on a relatively small number of rows (up to approximately 100,000,000).
The Memory engine is used by the system for temporary tables with external query data (see the section “External data for processing a query”), and for implementing `GLOBAL IN` (see the section “IN operators”).
Upper and lower bounds can be specified to limit Memory engine table size, effectively allowing it to act as a circular buffer (see [Engine Parameters](#engine-parameters)).
## Engine Parameters {#engine-parameters}
- `min_bytes_to_keep` — Minimum bytes to keep when the memory table is size-capped.
- Default value: `0`
- Requires `max_bytes_to_keep`
- `max_bytes_to_keep` — Maximum bytes to keep within the memory table, where the oldest rows are deleted on each insertion (i.e., a circular buffer). The byte total can exceed the stated limit if the oldest batch of rows to remove falls under the `min_bytes_to_keep` limit when adding a large block.
- Default value: `0`
- `min_rows_to_keep` — Minimum rows to keep when the memory table is size-capped.
- Default value: `0`
- Requires `max_rows_to_keep`
- `max_rows_to_keep` — Maximum rows to keep within the memory table, where the oldest rows are deleted on each insertion (i.e., a circular buffer). The row count can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
- Default value: `0`
## Usage {#usage}
**Initialize settings**
``` sql
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
```
**Note:** Both `bytes` and `rows` capping parameters can be set at the same time; however, the lower bounds of `max` and `min` will be adhered to.
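For instance, a minimal sketch setting both caps at once (the values are illustrative):
``` sql
CREATE TABLE memory (i UInt32) ENGINE = Memory
SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000,
         min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
```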
## Examples {#examples}
``` sql
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
/* 1. testing that the oldest block isn't deleted due to the min threshold */
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 8'192 bytes
/* 2. adding block that doesn't get deleted */
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 1'024 bytes
/* 3. testing that the oldest block is deleted - 9'216 bytes / 1'100 rows remain */
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 8'192 bytes
/* 4. checking a very large block overrides all */
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 65'536 bytes
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
```
``` text
┌─total_bytes─┬─total_rows─┐
│ 65536 │ 10000 │
└─────────────┴────────────┘
```
Also, for rows:
``` sql
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 4000, max_rows_to_keep = 10000;
/* 1. testing that the oldest block isn't deleted due to the min threshold */
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 1'600 rows
/* 2. adding block that doesn't get deleted */
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 100 rows
/* 3. adding another block - 2'700 rows total, still under max_rows_to_keep */
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 1'000 rows
/* 4. checking a very large block overrides all */
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 10'000 rows
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
```
``` text
┌─total_bytes─┬─total_rows─┐
│ 65536 │ 10000 │
└─────────────┴────────────┘
```

View File

@ -55,7 +55,7 @@ CREATE TABLE criteo_log (
) ENGINE = Log;
```
Download the data:
Insert the data:
``` bash
$ for i in {00..23}; do echo $i; zcat datasets/criteo/day_${i#0}.gz | sed -r 's/^/2000-01-'${i/00/24}'\t/' | clickhouse-client --host=example-perftest01j --query="INSERT INTO criteo_log FORMAT TabSeparated"; done

View File

@ -95,9 +95,11 @@ which is equal to
## Substituting Configuration {#substitution}
The config can also define “substitutions”. If an element has the `incl` attribute, the corresponding substitution from the file will be used as the value. By default, the path to the file with substitutions is `/etc/metrika.xml`. This can be changed in the [include_from](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-include_from) element in the server config. The substitution values are specified in `/clickhouse/substitution_name` elements in this file. If a substitution specified in `incl` does not exist, it is recorded in the log. To prevent ClickHouse from logging missing substitutions, specify the `optional="true"` attribute (for example, settings for [macros](../operations/server-configuration-parameters/settings.md#macros)).
The config can define substitutions. There are two types of substitutions:
If you want to replace an entire element with a substitution use `include` as the element name.
- If an element has the `incl` attribute, the corresponding substitution from the file will be used as the value. By default, the path to the file with substitutions is `/etc/metrika.xml`. This can be changed in the [include_from](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-include_from) element in the server config. The substitution values are specified in `/clickhouse/substitution_name` elements in this file. If a substitution specified in `incl` does not exist, it is recorded in the log. To prevent ClickHouse from logging missing substitutions, specify the `optional="true"` attribute (for example, settings for [macros](../operations/server-configuration-parameters/settings.md#macros)).
- If you want to replace an entire element with a substitution, use `include` as the element name. Substitutions can also be performed from ZooKeeper by specifying the attribute `from_zk = "/path/to/node"`. In this case, the element value is replaced with the contents of the ZooKeeper node at `/path/to/node`. This also works if you store an entire XML subtree as a ZooKeeper node; it will be fully inserted into the source element.
XML substitution example:
@ -114,7 +116,7 @@ XML substitution example:
</clickhouse>
```
Substitutions can also be performed from ZooKeeper. To do this, specify the attribute `from_zk = "/path/to/node"`. The element value is replaced with the contents of the node at `/path/to/node` in ZooKeeper. You can also put an entire XML subtree on the ZooKeeper node, and it will be fully inserted into the source element.
If you want to merge the substituting content with the existing configuration instead of appending, you can use the attribute `merge="true"`, for example: `<include from_zk="/some_path" merge="true">`. In this case, the existing configuration is merged with the content from the substitution, and existing configuration settings are replaced with values from the substitution.
## Encrypting and Hiding Configuration {#encryption}

View File

@ -933,9 +933,9 @@ Hard limit is configured via system tools
## database_atomic_delay_before_drop_table_sec {#database_atomic_delay_before_drop_table_sec}
Sets the delay before remove table data in seconds. If the query has `SYNC` modifier, this setting is ignored.
The delay in seconds before table data is dropped. If the `DROP TABLE` query has a `SYNC` modifier, this setting is ignored.
Default value: `480` (8 minute).
Default value: `480` (8 minutes).
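For example, a sketch of how the `SYNC` modifier interacts with the delay (the table name is hypothetical):
``` sql
DROP TABLE IF EXISTS my_table;       -- data is kept for the configured delay and the table can still be undropped
DROP TABLE IF EXISTS my_table SYNC;  -- data is removed immediately; this setting is ignored
```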
## database_catalog_unused_dir_hide_timeout_sec {#database_catalog_unused_dir_hide_timeout_sec}

View File

@ -4337,6 +4337,18 @@ Possible values:
Default value: `0`.
## function_locate_has_mysql_compatible_argument_order {#function-locate-has-mysql-compatible-argument-order}
Controls the order of arguments in function [locate](../../sql-reference/functions/string-search-functions.md#locate).
Possible values:
- 0 — Function `locate` accepts arguments `(haystack, needle[, start_pos])`.
- 1 — Function `locate` accepts arguments `(needle, haystack[, start_pos])` (MySQL-compatible behavior).
Default value: `1`.
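A sketch of what the two modes mean in practice (the literals are illustrative):
``` sql
-- 1 (default): MySQL-compatible order, needle first
SELECT locate('ab', 'abcabc');  -- returns 1

-- 0: legacy order, haystack first, like position()
SET function_locate_has_mysql_compatible_argument_order = 0;
SELECT locate('abcabc', 'ab');  -- returns 1
```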
## date_time_overflow_behavior {#date_time_overflow_behavior}
Defines the behavior when [Date](../../sql-reference/data-types/date.md), [Date32](../../sql-reference/data-types/date32.md), [DateTime](../../sql-reference/data-types/datetime.md), [DateTime64](../../sql-reference/data-types/datetime64.md) or integers are converted into Date, Date32, DateTime or DateTime64 but the value cannot be represented in the result type.

View File

@ -30,7 +30,6 @@ position(haystack, needle[, start_pos])
Alias:
- `position(needle IN haystack)`
- `locate(haystack, needle[, start_pos])`.
**Arguments**
@ -49,7 +48,7 @@ If substring `needle` is empty, these rules apply:
- if `start_pos >= 1` and `start_pos <= length(haystack) + 1`: return `start_pos`
- otherwise: return `0`
The same rules also apply to functions `positionCaseInsensitive`, `positionUTF8` and `positionCaseInsensitiveUTF8`
The same rules also apply to functions `locate`, `positionCaseInsensitive`, `positionUTF8` and `positionCaseInsensitiveUTF8`.
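For instance, a quick sketch of the empty-needle rules (assuming the argument order `(haystack, needle[, start_pos])`):
``` sql
SELECT position('abc', '');     -- no start_pos, returns 1
SELECT position('abc', '', 2);  -- start_pos within [1, length + 1], returns 2
SELECT position('abc', '', 5);  -- start_pos out of range, returns 0
```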
Type: `Integer`.
@ -114,6 +113,21 @@ SELECT
└─────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┘
```
## locate
Like [position](#position) but with arguments `haystack` and `needle` switched.
The behavior of this function depends on the ClickHouse version:
- in versions < v24.3, `locate` was an alias of function `position` and accepted arguments `(haystack, needle[, start_pos])`.
- in versions >= v24.3, `locate` is an individual function (for better compatibility with MySQL) and accepts arguments `(needle, haystack[, start_pos])`. The previous behavior can be restored using setting [function_locate_has_mysql_compatible_argument_order = false](../../operations/settings/settings.md#function-locate-has-mysql-compatible-argument-order).
**Syntax**
``` sql
locate(needle, haystack[, start_pos])
```
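For example, a small sketch with the MySQL-compatible argument order (the default in versions >= v24.3):
``` sql
SELECT locate('ab', 'abcabc');     -- first match, returns 1
SELECT locate('ab', 'abcabc', 2);  -- search from position 2, returns 4
```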
## positionCaseInsensitive
Like [position](#position) but searches case-insensitively.

View File

@ -13,13 +13,6 @@ a system table called `system.dropped_tables`.
If you have a materialized view without a `TO` clause associated with the dropped table, then you will also have to UNDROP the inner table of that view.
:::note
UNDROP TABLE is experimental. To use it add this setting:
```sql
set allow_experimental_undrop_table_query = 1;
```
:::
:::tip
Also see [DROP TABLE](/docs/en/sql-reference/statements/drop.md)
:::
@ -32,60 +25,53 @@ UNDROP TABLE [db.]name [UUID '<uuid>'] [ON CLUSTER cluster]
**Example**
``` sql
set allow_experimental_undrop_table_query = 1;
```
```sql
CREATE TABLE undropMe
CREATE TABLE tab
(
`id` UInt8
)
ENGINE = MergeTree
ORDER BY id
```
ORDER BY id;
DROP TABLE tab;
```sql
DROP TABLE undropMe
```
```sql
SELECT *
FROM system.dropped_tables
FORMAT Vertical
FORMAT Vertical;
```
```response
Row 1:
──────
index: 0
database: default
table: undropMe
table: tab
uuid: aa696a1a-1d70-4e60-a841-4c80827706cc
engine: MergeTree
metadata_dropped_path: /var/lib/clickhouse/metadata_dropped/default.undropMe.aa696a1a-1d70-4e60-a841-4c80827706cc.sql
metadata_dropped_path: /var/lib/clickhouse/metadata_dropped/default.tab.aa696a1a-1d70-4e60-a841-4c80827706cc.sql
table_dropped_time: 2023-04-05 14:12:12
1 row in set. Elapsed: 0.001 sec.
```
```sql
UNDROP TABLE undropMe
```
```response
Ok.
```
```sql
UNDROP TABLE tab;
SELECT *
FROM system.dropped_tables
FORMAT Vertical
```
FORMAT Vertical;
```response
Ok.
0 rows in set. Elapsed: 0.001 sec.
```
```sql
DESCRIBE TABLE undropMe
FORMAT Vertical
DESCRIBE TABLE tab
FORMAT Vertical;
```
```response
Row 1:
──────

View File

@ -6,6 +6,11 @@ sidebar_label: jdbc
# jdbc
:::note
clickhouse-jdbc-bridge contains experimental code and is no longer supported. It may contain reliability issues and security vulnerabilities. Use it at your own risk.
ClickHouse recommends using the built-in table functions, which provide a better alternative for ad-hoc querying scenarios (Postgres, MySQL, MongoDB, etc.).
:::
`jdbc(datasource, schema, table)` - returns a table that is connected via a JDBC driver.
This table function requires the separate [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) program to be running.
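A sketch, assuming a running clickhouse-jdbc-bridge with a MySQL datasource configured; the connection string, schema, and table are hypothetical:
``` sql
SELECT *
FROM jdbc('jdbc:mysql://localhost:3306/?user=root&password=root', 'my_schema', 'my_table');
```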

View File

@ -143,7 +143,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
ParserCodec codec_parser;
std::string codecs_line = boost::algorithm::join(codecs, ",");
auto ast = parseQuery(codec_parser, "(" + codecs_line + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
auto ast = parseQuery(codec_parser, "(" + codecs_line + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
codec = CompressionCodecFactory::instance().get(ast, nullptr);
}
else

View File

@ -234,7 +234,7 @@ int mainEntryClickHouseFormat(int argc, char ** argv)
size_t approx_query_length = multiple ? find_first_symbols<';'>(pos, end) - pos : end - pos;
ASTPtr res = parseQueryAndMovePosition(
parser, pos, end, "query", multiple, cmd_settings.max_query_size, cmd_settings.max_parser_depth);
parser, pos, end, "query", multiple, cmd_settings.max_query_size, cmd_settings.max_parser_depth, cmd_settings.max_parser_backtracks);
std::unique_ptr<ReadBuffer> insert_query_payload = nullptr;
/// If the query is INSERT ... VALUES, then we will try to parse the data.

View File

@ -44,7 +44,7 @@ String KeeperClient::executeFourLetterCommand(const String & command)
std::vector<String> KeeperClient::getCompletions(const String & prefix) const
{
Tokens tokens(prefix.data(), prefix.data() + prefix.size(), 0, false);
IParser::Pos pos(tokens, 0);
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
if (pos->type != TokenType::BareWord)
return registered_commands_and_four_letter_words;
@ -278,6 +278,7 @@ bool KeeperClient::processQueryText(const String & text)
/* allow_multi_statements = */ true,
/* max_query_size = */ 0,
/* max_parser_depth = */ 0,
/* max_parser_backtracks = */ 0,
/* skip_insignificant = */ false);
if (!res)

View File

@ -10,6 +10,7 @@
#include <IO/UseSSL.h>
#include <Core/ServerUUID.h>
#include <Common/logger_useful.h>
#include <Common/CgroupsMemoryUsageObserver.h>
#include <Common/ErrorHandlers.h>
#include <Common/assertProcessUserMatchesDataOwner.h>
#include <Common/makeSocketAddress.h>
@ -623,6 +624,25 @@ try
buildLoggers(config(), logger());
main_config_reloader->start();
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
try
{
auto wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15);
if (wait_time != 0)
{
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
/// Not calling cgroups_memory_usage_observer->setLimits() here (as for the normal ClickHouse server) because Keeper controls
/// its memory usage by other means (via setting 'max_memory_usage_soft_limit').
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
cgroups_memory_usage_observer->startThread();
}
}
catch (Exception &)
{
tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
}
LOG_INFO(log, "Ready for connections.");
waitForTerminationRequest();

View File

@ -11,6 +11,7 @@ set (CLICKHOUSE_LIBRARY_BRIDGE_SOURCES
LibraryBridgeHandlers.cpp
SharedLibrary.cpp
library-bridge.cpp
createFunctionBaseCast.cpp
)
clickhouse_add_executable(clickhouse-library-bridge ${CLICKHOUSE_LIBRARY_BRIDGE_SOURCES})

View File

@ -1,6 +1,6 @@
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/Context_fwd.h>
#include <Bridge/IBridge.h>
#include "LibraryBridgeHandlerFactory.h"

View File

@ -0,0 +1,23 @@
#include <memory>
#include <Functions/CastOverloadResolver.h>
#include <Core/ColumnsWithTypeAndName.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
class IFunctionBase;
using FunctionBasePtr = std::shared_ptr<const IFunctionBase>;
FunctionBasePtr createFunctionBaseCast(
ContextPtr, const char *, const ColumnsWithTypeAndName &, const DataTypePtr &, std::optional<CastDiagnostic>, CastType)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Type conversions are not implemented for Library Bridge");
}
}

View File

@ -1000,12 +1000,6 @@ extern "C" int LLVMFuzzerInitialize(int * pargc, char *** pargv)
{
std::vector<char *> argv(*pargv, *pargv + (*pargc + 1));
if (!isClickhouseApp("local", argv))
{
std::cerr << "\033[31m" << "ClickHouse compiled in fuzzing mode, only clickhouse local is available." << "\033[0m" << std::endl;
exit(1);
}
/// As a user you can add flags to clickhouse binary in fuzzing mode as follows
/// clickhouse local <set of clickhouse-local specific flag> -- <set of libfuzzer flags>
@ -1013,13 +1007,16 @@ extern "C" int LLVMFuzzerInitialize(int * pargc, char *** pargv)
auto it = argv.begin() + 1;
for (; *it; ++it)
{
if (strcmp(*it, "--") == 0)
{
++it;
break;
}
}
while (*it)
{
if (strncmp(*it, "--", 2) != 0)
{
*(p++) = *it;
@ -1027,6 +1024,7 @@ extern "C" int LLVMFuzzerInitialize(int * pargc, char *** pargv)
}
else
++it;
}
*pargc = static_cast<int>(p - &(*pargv)[0]);
*p = nullptr;

View File

@ -68,7 +68,6 @@ namespace
using MainFunc = int (*)(int, char**);
#if !defined(FUZZING_MODE)
/// Add an item here to register new application
std::pair<std::string_view, MainFunc> clickhouse_applications[] =
{
@ -105,13 +104,6 @@ std::pair<std::string_view, MainFunc> clickhouse_applications[] =
{"restart", mainEntryClickHouseRestart},
};
/// Add an item here to register a new short name
std::pair<std::string_view, std::string_view> clickhouse_short_names[] =
{
{"chl", "local"},
{"chc", "client"},
};
int printHelp(int, char **)
{
std::cerr << "Use one of the following commands:" << std::endl;
@ -121,6 +113,13 @@ int printHelp(int, char **)
}
#endif
/// Add an item here to register a new short name
std::pair<std::string_view, std::string_view> clickhouse_short_names[] =
{
{"chl", "local"},
{"chc", "client"},
};
enum class InstructionFail
{

View File

@ -13,6 +13,7 @@ set (CLICKHOUSE_ODBC_BRIDGE_SOURCES
getIdentifierQuote.cpp
odbc-bridge.cpp
validateODBCConnectionString.cpp
createFunctionBaseCast.cpp
)
clickhouse_add_executable(clickhouse-odbc-bridge ${CLICKHOUSE_ODBC_BRIDGE_SOURCES})

View File

@ -0,0 +1,23 @@
#include <memory>
#include <Functions/CastOverloadResolver.h>
#include <Core/ColumnsWithTypeAndName.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
class IFunctionBase;
using FunctionBasePtr = std::shared_ptr<const IFunctionBase>;
FunctionBasePtr createFunctionBaseCast(
ContextPtr, const char *, const ColumnsWithTypeAndName &, const DataTypePtr &, std::optional<CastDiagnostic>, CastType)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Type conversions are not implemented for ODBC Bridge");
}
}

View File

@ -733,8 +733,6 @@ try
LOG_INFO(log, "Available CPU instruction sets: {}", cpu_info);
#endif
sanityChecks(*this);
// Initialize global thread pool. Do it before we fetch configs from zookeeper
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
// ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
@ -904,6 +902,7 @@ try
config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH));
config().removeConfiguration(old_configuration.get());
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
global_context->setConfig(loaded_config.configuration);
}
Settings::checkNoSettingNamesAtTopLevel(config(), config_path);
@ -911,6 +910,9 @@ try
/// We need to reload server settings because config could be updated via zookeeper.
server_settings.loadSettingsFromConfig(config());
/// NOTE: Do sanity checks after we loaded all possible substitutions (for the configuration) from ZK
sanityChecks(*this);
#if defined(OS_LINUX)
std::string executable_path = getExecutablePath();
@ -1294,7 +1296,7 @@ try
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
try
{
UInt64 wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
auto wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
if (wait_time != 0)
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
}
@ -1360,7 +1362,7 @@ try
{
double hard_limit_ratio = new_server_settings.cgroup_memory_watcher_hard_limit_ratio;
double soft_limit_ratio = new_server_settings.cgroup_memory_watcher_soft_limit_ratio;
cgroups_memory_usage_observer->setLimits(
cgroups_memory_usage_observer->setMemoryUsageLimits(
static_cast<uint64_t>(max_server_memory_usage * hard_limit_ratio),
static_cast<uint64_t>(max_server_memory_usage * soft_limit_ratio));
}
@ -1718,6 +1720,12 @@ try
throw;
}
if (cgroups_memory_usage_observer)
{
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
cgroups_memory_usage_observer->startThread();
}
/// Reload config in SYSTEM RELOAD CONFIG query.
global_context->setConfigReloadCallback([&]()
{

View File

@ -62,7 +62,7 @@ AccessEntityPtr deserializeAccessEntityImpl(const String & definition)
const char * end = begin + definition.size();
while (pos < end)
{
queries.emplace_back(parseQueryAndMovePosition(parser, pos, end, "", true, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH));
queries.emplace_back(parseQueryAndMovePosition(parser, pos, end, "", true, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS));
while (isWhitespaceASCII(*pos) || *pos == ';')
++pos;
}

View File

@ -86,7 +86,7 @@ void RowPolicyCache::PolicyInfo::setPolicy(const RowPolicyPtr & policy_)
try
{
ParserExpression parser;
parsed_filters[filter_type_i] = parseQuery(parser, filter, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
parsed_filters[filter_type_i] = parseQuery(parser, filter, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
}
catch (...)
{

View File

@ -66,7 +66,7 @@ namespace
String error_message;
const char * pos = string_query.data();
auto ast = tryParseQuery(parser, pos, pos + string_query.size(), error_message, false, "", false, 0, 0);
auto ast = tryParseQuery(parser, pos, pos + string_query.size(), error_message, false, "", false, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
if (!ast)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to parse grant query. Error: {}", error_message);

View File

@ -27,6 +27,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
auto initialize = [&]() mutable
{
if (context)
return true;
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();

View File

@ -81,7 +81,8 @@ void getAggregateFunctionNameAndParametersArray(
ParserExpressionList params_parser(false);
ASTPtr args_ast = parseQuery(params_parser,
parameters_str.data(), parameters_str.data() + parameters_str.size(),
"parameters of aggregate function in " + error_context, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
"parameters of aggregate function in " + error_context,
0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
if (args_ast->children.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect list of parameters to aggregate function {}",

View File

@ -25,7 +25,7 @@ String BackupInfo::toString() const
BackupInfo BackupInfo::fromString(const String & str)
{
ParserIdentifierWithOptionalParameters parser;
ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
return fromAST(*ast);
}

View File

@ -101,10 +101,12 @@ RestorerFromBackup::RestorerFromBackup(
RestorerFromBackup::~RestorerFromBackup()
{
if (!futures.empty())
/// If an exception occurs, we can end up in the destructor with some tasks still unfinished.
/// We have to wait until they finish.
if (getNumFutures() > 0)
{
LOG_ERROR(log, "RestorerFromBackup must not be destroyed while {} tasks are still running", futures.size());
chassert(false && "RestorerFromBackup must not be destroyed while some tasks are still running");
LOG_INFO(log, "Waiting for {} tasks to finish", getNumFutures());
waitFutures();
}
}
@ -422,7 +424,7 @@ void RestorerFromBackup::findTableInBackupImpl(const QualifiedTableName & table_
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();
ParserCreateQuery create_parser;
ASTPtr create_table_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
ASTPtr create_table_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
applyCustomStoragePolicy(create_table_query);
renameDatabaseAndTableNameInCreateQuery(create_table_query, renaming_map, context->getGlobalContext());
String create_table_query_str = serializeAST(*create_table_query);
@ -532,7 +534,7 @@ void RestorerFromBackup::findDatabaseInBackupImpl(const String & database_name_i
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();
ParserCreateQuery create_parser;
ASTPtr create_database_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
ASTPtr create_database_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
renameDatabaseAndTableNameInCreateQuery(create_database_query, renaming_map, context->getGlobalContext());
String create_database_query_str = serializeAST(*create_database_query);

View File

@ -348,7 +348,7 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_mu
if (dialect == Dialect::kusto)
parser = std::make_unique<ParserKQLStatement>(end, global_context->getSettings().allow_settings_after_format_in_insert);
else if (dialect == Dialect::prql)
parser = std::make_unique<ParserPRQLQuery>(max_length, settings.max_parser_depth);
parser = std::make_unique<ParserPRQLQuery>(max_length, settings.max_parser_depth, settings.max_parser_backtracks);
else
parser = std::make_unique<ParserQuery>(end, global_context->getSettings().allow_settings_after_format_in_insert);
@ -356,9 +356,9 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_mu
{
String message;
if (dialect == Dialect::kusto)
res = tryParseKQLQuery(*parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth);
res = tryParseKQLQuery(*parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth, settings.max_parser_backtracks, true);
else
res = tryParseQuery(*parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth);
res = tryParseQuery(*parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth, settings.max_parser_backtracks, true);
if (!res)
{
@ -369,9 +369,9 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_mu
else
{
if (dialect == Dialect::kusto)
res = parseKQLQueryAndMovePosition(*parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth);
res = parseKQLQueryAndMovePosition(*parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth, settings.max_parser_backtracks);
else
res = parseQueryAndMovePosition(*parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth);
res = parseQueryAndMovePosition(*parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth, settings.max_parser_backtracks);
}
if (is_interactive)
@ -388,12 +388,12 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_mu
/// Consumes trailing semicolons and tries to consume the same-line trailing comment.
void ClientBase::adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth)
void ClientBase::adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth, uint32_t max_parser_backtracks)
{
// We have to skip the trailing semicolon that might be left
// after VALUES parsing or just after a normal semicolon-terminated query.
Tokens after_query_tokens(this_query_end, all_queries_end);
IParser::Pos after_query_iterator(after_query_tokens, max_parser_depth);
IParser::Pos after_query_iterator(after_query_tokens, max_parser_depth, max_parser_backtracks);
while (after_query_iterator.isValid() && after_query_iterator->type == TokenType::Semicolon)
{
this_query_end = after_query_iterator->end;
@ -2049,6 +2049,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
return MultiQueryProcessingStage::QUERIES_END;
unsigned max_parser_depth = static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth);
unsigned max_parser_backtracks = static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks);
// If there are only comments left until the end of file, we just
// stop. The parser can't handle this situation because it always
@ -2059,7 +2060,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
// and it makes more sense to treat them as such.
{
Tokens tokens(this_query_begin, all_queries_end);
IParser::Pos token_iterator(tokens, max_parser_depth);
IParser::Pos token_iterator(tokens, max_parser_depth, max_parser_backtracks);
if (!token_iterator.isValid())
return MultiQueryProcessingStage::QUERIES_END;
}
@ -2080,7 +2081,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
if (ignore_error)
{
Tokens tokens(this_query_begin, all_queries_end);
IParser::Pos token_iterator(tokens, max_parser_depth);
IParser::Pos token_iterator(tokens, max_parser_depth, max_parser_backtracks);
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
++token_iterator;
this_query_begin = token_iterator->end;
@ -2120,7 +2121,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
// after we have processed the query. But even this guess is
// beneficial so that we see proper trailing comments in "echo" and
// server log.
adjustQueryEnd(this_query_end, all_queries_end, max_parser_depth);
adjustQueryEnd(this_query_end, all_queries_end, max_parser_depth, max_parser_backtracks);
return MultiQueryProcessingStage::EXECUTE_QUERY;
}
@ -2316,7 +2317,8 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
this_query_end = insert_ast->end;
adjustQueryEnd(
this_query_end, all_queries_end,
static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth));
static_cast<unsigned>(global_context->getSettingsRef().max_parser_depth),
static_cast<unsigned>(global_context->getSettingsRef().max_parser_backtracks));
}
// Report error.

View File

@ -94,7 +94,7 @@ protected:
void processParsedSingleQuery(const String & full_query, const String & query_to_execute,
ASTPtr parsed_query, std::optional<bool> echo_query_ = {}, bool report_error = false);
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth);
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth, uint32_t max_parser_backtracks);
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
static void setupSignalHandler();

View File

@ -569,7 +569,8 @@ void QueryFuzzer::fuzzColumnDeclaration(ASTColumnDeclaration & column)
auto data_type = fuzzDataType(DataTypeFactory::instance().get(column.type));
ParserDataType parser;
column.type = parseQuery(parser, data_type->getName(), DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH);
column.type = parseQuery(parser, data_type->getName(),
DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
}
}
@ -821,7 +822,8 @@ static ASTPtr tryParseInsertQuery(const String & full_query)
ParserInsertQuery parser(end, false);
String message;
return tryParseQuery(parser, pos, end, message, false, "", false, DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH);
return tryParseQuery(parser, pos, end, message, false, "", false,
DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
}
ASTs QueryFuzzer::getInsertQueriesForFuzzedTables(const String & full_query)

View File

@ -39,7 +39,7 @@ void logAboutProgress(LoggerPtr log, size_t processed, size_t total, AtomicStopw
{
if (total && (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)))
{
LOG_INFO(log, "Processed: {}%", static_cast<Int64>(processed * 1000.0 / total) * 0.1);
LOG_INFO(log, "Processed: {:.1f}%", static_cast<double>(processed) * 100.0 / total);
watch.restart();
}
}

View File

@ -9,6 +9,7 @@
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <base/cgroupsv2.h>
#include <base/getMemoryAmount.h>
#include <base/sleep.h>
#include <filesystem>
@ -36,7 +37,7 @@ namespace ErrorCodes
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
: log(getLogger("CgroupsMemoryUsageObserver"))
, wait_time(wait_time_)
, file(log)
, memory_usage_file(log)
{
LOG_INFO(log, "Initialized cgroups memory limit observer, wait time is {} sec", wait_time.count());
}
@ -46,13 +47,13 @@ CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()
stopThread();
}
void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_limit_)
void CgroupsMemoryUsageObserver::setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_)
{
std::lock_guard<std::mutex> limit_lock(limit_mutex);
if (hard_limit_ == hard_limit && soft_limit_ == soft_limit)
return;
stopThread();
hard_limit = hard_limit_;
soft_limit = soft_limit_;
@ -83,10 +84,10 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
#endif
/// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them.
uint64_t current_usage = readMemoryUsage();
MemoryTracker::setRSS(current_usage, 0);
uint64_t memory_usage = memory_usage_file.readMemoryUsage();
MemoryTracker::setRSS(memory_usage, 0);
LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(current_usage));
LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(memory_usage));
}
else
{
@ -94,14 +95,13 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l
}
};
startThread();
LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_));
}
uint64_t CgroupsMemoryUsageObserver::readMemoryUsage() const
void CgroupsMemoryUsageObserver::setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_)
{
return file.readMemoryUsage();
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
on_memory_amount_available_changed = on_memory_amount_available_changed_;
}
namespace
@ -163,7 +163,7 @@ std::pair<std::string, CgroupsMemoryUsageObserver::CgroupsVersion> getCgroupsFil
}
CgroupsMemoryUsageObserver::File::File(LoggerPtr log_)
CgroupsMemoryUsageObserver::MemoryUsageFile::MemoryUsageFile(LoggerPtr log_)
: log(log_)
{
std::tie(file_name, version) = getCgroupsFileName();
@ -177,7 +177,7 @@ CgroupsMemoryUsageObserver::File::File(LoggerPtr log_)
file_name, "Cannot open file '{}'", file_name);
}
CgroupsMemoryUsageObserver::File::~File()
CgroupsMemoryUsageObserver::MemoryUsageFile::~MemoryUsageFile()
{
assert(fd != -1);
if (::close(fd) != 0)
@ -195,7 +195,7 @@ CgroupsMemoryUsageObserver::File::~File()
}
}
uint64_t CgroupsMemoryUsageObserver::File::readMemoryUsage() const
uint64_t CgroupsMemoryUsageObserver::MemoryUsageFile::readMemoryUsage() const
{
/// File reads are probably not thread-safe, take the lock just to be sure
std::lock_guard lock(mutex);
@ -278,6 +278,9 @@ void CgroupsMemoryUsageObserver::runThread()
{
setThreadName("CgrpMemUsgObsr");
last_available_memory_amount = getMemoryAmount();
LOG_INFO(log, "Memory amount initially available to the process is {}", ReadableSize(last_available_memory_amount));
std::unique_lock lock(thread_mutex);
while (true)
{
@ -286,8 +289,42 @@ void CgroupsMemoryUsageObserver::runThread()
try
{
uint64_t memory_usage = file.readMemoryUsage();
processMemoryUsage(memory_usage);
uint64_t available_memory_amount = getMemoryAmount();
if (available_memory_amount != last_available_memory_amount)
{
LOG_INFO(log, "Memory amount available to the process changed from {} to {}", ReadableSize(last_available_memory_amount), ReadableSize(available_memory_amount));
last_available_memory_amount = available_memory_amount;
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
on_memory_amount_available_changed();
}
std::lock_guard<std::mutex> limit_lock(limit_mutex);
if (soft_limit > 0 && hard_limit > 0)
{
uint64_t memory_usage = memory_usage_file.readMemoryUsage();
if (memory_usage > hard_limit)
{
if (last_memory_usage <= hard_limit)
on_hard_limit(true);
}
else
{
if (last_memory_usage > hard_limit)
on_hard_limit(false);
}
if (memory_usage > soft_limit)
{
if (last_memory_usage <= soft_limit)
on_soft_limit(true);
}
else
{
if (last_memory_usage > soft_limit)
on_soft_limit(false);
}
last_memory_usage = memory_usage;
}
}
catch (...)
{
@ -296,33 +333,6 @@ void CgroupsMemoryUsageObserver::runThread()
}
}
void CgroupsMemoryUsageObserver::processMemoryUsage(uint64_t current_usage)
{
if (current_usage > hard_limit)
{
if (last_usage <= hard_limit)
on_hard_limit(true);
}
else
{
if (last_usage > hard_limit)
on_hard_limit(false);
}
if (current_usage > soft_limit)
{
if (last_usage <= soft_limit)
on_soft_limit(true);
}
else
{
if (last_usage > soft_limit)
on_soft_limit(false);
}
last_usage = current_usage;
}
}
#endif

View File

@ -2,57 +2,71 @@
#include <Common/ThreadPool.h>
#include <atomic>
#include <chrono>
#include <mutex>
namespace DB
{
/// Periodically reads the current memory usage from Linux cgroups.
/// You can specify soft or hard memory limits:
/// - When the soft memory limit is hit, drop jemalloc cache.
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
/// Does two things:
/// 1. Periodically reads the memory usage of the process from Linux cgroups.
/// You can specify soft or hard memory limits:
/// - When the soft memory limit is hit, drop jemalloc cache.
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
/// The goal of this is to prevent the process from hitting the maximum allowed memory limit, at which there is a good
/// chance that the Linux OOM killer terminates it. All of this is done because internal memory tracking in
/// ClickHouse can unfortunately under-estimate the actually used memory.
/// 2. Periodically reads the maximum memory available to the process (which can change due to cgroups settings).
/// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server
/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit'
/// (Keeper) from which various other internal limits are calculated, including the soft and hard limits for (1.).
/// The goal of this is to provide elasticity when the container is scaled-up/scaled-down. The mechanism (polling
/// cgroups) is quite implicit; unfortunately, there is currently no better way to communicate memory threshold changes
/// to the database.
#if defined(OS_LINUX)
class CgroupsMemoryUsageObserver
{
public:
using OnMemoryLimitFn = std::function<void(bool)>;
using OnMemoryAmountAvailableChangedFn = std::function<void()>;
enum class CgroupsVersion
{
V1,
V2
};
explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_);
~CgroupsMemoryUsageObserver();
void setLimits(uint64_t hard_limit_, uint64_t soft_limit_);
void setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_);
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_);
size_t getHardLimit() const { return hard_limit; }
size_t getSoftLimit() const { return soft_limit; }
uint64_t readMemoryUsage() const;
void startThread();
private:
LoggerPtr log;
std::atomic<size_t> hard_limit = 0;
std::atomic<size_t> soft_limit = 0;
const std::chrono::seconds wait_time;
using CallbackFn = std::function<void(bool)>;
CallbackFn on_hard_limit;
CallbackFn on_soft_limit;
std::mutex limit_mutex;
size_t hard_limit TSA_GUARDED_BY(limit_mutex) = 0;
size_t soft_limit TSA_GUARDED_BY(limit_mutex) = 0;
OnMemoryLimitFn on_hard_limit TSA_GUARDED_BY(limit_mutex);
OnMemoryLimitFn on_soft_limit TSA_GUARDED_BY(limit_mutex);
uint64_t last_usage = 0;
std::mutex memory_amount_available_changed_mutex;
OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed TSA_GUARDED_BY(memory_amount_available_changed_mutex);
uint64_t last_memory_usage = 0; /// how much memory does the process use
uint64_t last_available_memory_amount; /// how much memory can the process use
/// Represents the cgroup virtual file that shows the memory consumption of the process's cgroup.
struct File
struct MemoryUsageFile
{
public:
explicit File(LoggerPtr log_);
~File();
explicit MemoryUsageFile(LoggerPtr log_);
~MemoryUsageFile();
uint64_t readMemoryUsage() const;
private:
LoggerPtr log;
@ -62,13 +76,11 @@ private:
std::string file_name;
};
File file;
MemoryUsageFile memory_usage_file;
void startThread();
void stopThread();
void runThread();
void processMemoryUsage(uint64_t usage);
std::mutex thread_mutex;
std::condition_variable cond;
@ -79,13 +91,13 @@ private:
#else
class CgroupsMemoryUsageObserver
{
using OnMemoryAmountAvailableChangedFn = std::function<void()>;
public:
explicit CgroupsMemoryUsageObserver(std::chrono::seconds) {}
void setLimits(uint64_t, uint64_t) {}
size_t readMemoryUsage() { return 0; }
size_t getHardLimit() { return 0; }
size_t getSoftLimit() { return 0; }
void setMemoryUsageLimits(uint64_t, uint64_t) {}
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn) {}
void startThread() {}
};
#endif

View File

@ -427,6 +427,8 @@ void ConfigProcessor::doIncludesRecursive(
/// Replace the original contents, not add to it.
bool replace = attributes->getNamedItem("replace");
/// Merge with the original contents
bool merge = attributes->getNamedItem("merge");
bool included_something = false;
@ -450,7 +452,6 @@ void ConfigProcessor::doIncludesRecursive(
}
else
{
/// Replace the whole node not just contents.
if (node->nodeName() == "include")
{
const NodeListPtr children = node_to_include->childNodes();
@ -458,8 +459,18 @@ void ConfigProcessor::doIncludesRecursive(
for (Node * child = children->item(0); child; child = next_child)
{
next_child = child->nextSibling();
NodePtr new_node = config->importNode(child, true);
node->parentNode()->insertBefore(new_node, node);
/// Recursively replace existing nodes in merge mode
if (merge)
{
NodePtr new_node = config->importNode(child->parentNode(), true);
mergeRecursive(config, node->parentNode(), new_node);
}
else /// Append to existing node by default
{
NodePtr new_node = config->importNode(child, true);
node->parentNode()->insertBefore(new_node, node);
}
}
node->parentNode()->removeChild(node);
@ -777,9 +788,9 @@ ConfigProcessor::LoadedConfig ConfigProcessor::loadConfig(bool allow_zk_includes
}
ConfigProcessor::LoadedConfig ConfigProcessor::loadConfigWithZooKeeperIncludes(
zkutil::ZooKeeperNodeCache & zk_node_cache,
const zkutil::EventPtr & zk_changed_event,
bool fallback_to_preprocessed)
zkutil::ZooKeeperNodeCache & zk_node_cache,
const zkutil::EventPtr & zk_changed_event,
bool fallback_to_preprocessed)
{
XMLDocumentPtr config_xml;
bool has_zk_includes;

View File

@ -584,10 +584,6 @@
M(703, INVALID_IDENTIFIER) \
M(704, QUERY_CACHE_USED_WITH_NONDETERMINISTIC_FUNCTIONS) \
M(705, TABLE_NOT_EMPTY) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
\
M(706, LIBSSH_ERROR) \
M(707, GCP_ERROR) \
M(708, ILLEGAL_STATISTIC) \
@ -599,6 +595,10 @@
M(715, CANNOT_DETECT_FORMAT) \
M(716, CANNOT_FORGET_PARTITION) \
M(717, EXPERIMENTAL_FEATURE_ERROR) \
M(718, TOO_SLOW_PARSING) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -25,6 +25,18 @@ inline bool isFinite(T x)
return true;
}
template <typename T>
bool canConvertTo(Float64 x)
{
if constexpr (std::is_floating_point_v<T>)
return true;
if (!isFinite(x))
return false;
if (x > Float64(std::numeric_limits<T>::max()) || x < Float64(std::numeric_limits<T>::lowest()))
return false;
return true;
}
template <typename T>
T NaNOrZero()

View File

@ -302,7 +302,7 @@ private:
readStringUntilEOF(query, in);
ParserCreateNamedCollectionQuery parser;
auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth);
auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth, settings.max_parser_backtracks);
const auto & create_query = ast->as<const ASTCreateNamedCollectionQuery &>();
return create_query;
}

View File

@ -10,9 +10,3 @@ ContextHolder & getMutableContext()
static ContextHolder holder;
return holder;
}
void destroyContext()
{
auto & holder = getMutableContext();
return holder.destroy();
}

View File

@ -28,5 +28,3 @@ struct ContextHolder
const ContextHolder & getContext();
ContextHolder & getMutableContext();
void destroyContext();

View File

@ -1,5 +1,2 @@
clickhouse_add_executable (compressed_buffer compressed_buffer.cpp)
target_link_libraries (compressed_buffer PRIVATE dbms)
clickhouse_add_executable (cached_compressed_read_buffer cached_compressed_read_buffer.cpp)
target_link_libraries (cached_compressed_read_buffer PRIVATE dbms)
target_link_libraries (compressed_buffer PRIVATE clickhouse_common_io clickhouse_compression)

View File

@ -1,79 +0,0 @@
#include <iostream>
#include <iomanip>
#include <limits>
#include <Compression/CompressionFactory.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/copyData.h>
#include <Common/Stopwatch.h>
int main(int argc, char ** argv)
{
using namespace DB;
if (argc < 2)
{
std::cerr << "Usage: program path\n";
return 1;
}
try
{
UncompressedCache cache("SLRU", 1024, 0.5);
std::string path = argv[1];
std::cerr << std::fixed << std::setprecision(3);
size_t hits = 0;
size_t misses = 0;
{
Stopwatch watch;
CachedCompressedReadBuffer in(
path,
[&]()
{
return createReadBufferFromFileBase(path, {});
},
&cache
);
WriteBufferFromFile out("/dev/null");
copyData(in, out);
std::cerr << "Elapsed: " << watch.elapsedSeconds() << std::endl;
}
cache.getStats(hits, misses);
std::cerr << "Hits: " << hits << ", misses: " << misses << std::endl;
{
Stopwatch watch;
CachedCompressedReadBuffer in(
path,
[&]()
{
return createReadBufferFromFileBase(path, {});
},
&cache
);
WriteBufferFromFile out("/dev/null");
copyData(in, out);
std::cerr << "Elapsed: " << watch.elapsedSeconds() << std::endl;
}
cache.getStats(hits, misses);
std::cerr << "Hits: " << hits << ", misses: " << misses << std::endl;
}
catch (const Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}
return 0;
}

View File

@ -1,7 +1,4 @@
#include <string>
#include <iostream>
#include <fstream>
#include <iomanip>
#include <Common/Stopwatch.h>

View File

@ -442,7 +442,7 @@ CompressionCodecPtr makeCodec(const std::string & codec_string, const DataTypePt
{
const std::string codec_statement = "(" + codec_string + ")";
Tokens tokens(codec_statement.begin().base(), codec_statement.end().base());
IParser::Pos token_iterator(tokens, 0);
IParser::Pos token_iterator(tokens, 0, 0);
Expected expected;
ASTPtr codec_ast;

View File

@ -41,13 +41,13 @@ BaseSettingsHelpers::Flags BaseSettingsHelpers::readFlags(ReadBuffer & in)
void BaseSettingsHelpers::throwSettingNotFound(std::string_view name)
{
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting {}", String{name});
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting '{}'", String{name});
}
void BaseSettingsHelpers::warningSettingNotFound(std::string_view name)
{
LOG_WARNING(getLogger("Settings"), "Unknown setting {}, skipping", name);
LOG_WARNING(getLogger("Settings"), "Unknown setting '{}', skipping", name);
}
}

View File

@ -63,6 +63,8 @@ static constexpr auto DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC = 120;
/// Default limit on recursion depth of recursive descent parser.
static constexpr auto DBMS_DEFAULT_MAX_PARSER_DEPTH = 1000;
/// Default limit on the amount of backtracking of recursive descent parser.
static constexpr auto DBMS_DEFAULT_MAX_PARSER_BACKTRACKS = 1000000;
/// Default limit on query size.
static constexpr auto DBMS_DEFAULT_MAX_QUERY_SIZE = 262144;

View File

@ -175,6 +175,7 @@ class IColumn;
M(Bool, enable_positional_arguments, true, "Enable positional arguments in ORDER BY, GROUP BY and LIMIT BY", 0) \
M(Bool, enable_extended_results_for_datetime_functions, false, "Enable date functions like toLastDayOfMonth return Date32 results (instead of Date results) for Date32/DateTime64 arguments.", 0) \
M(Bool, allow_nonconst_timezone_arguments, false, "Allow non-const timezone arguments in certain time-related functions like toTimeZone(), fromUnixTimestamp*(), snowflakeToDateTime*()", 0) \
M(Bool, function_locate_has_mysql_compatible_argument_order, true, "Function locate() has arguments (needle, haystack[, start_pos]) like in MySQL instead of (haystack, needle[, start_pos]) like function position()", 0) \
\
M(Bool, group_by_use_nulls, false, "Treat columns mentioned in ROLLUP, CUBE or GROUPING SETS as Nullable", 0) \
\
@ -607,6 +608,7 @@ class IColumn;
M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \
M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descent parser).", 0) \
M(UInt64, max_parser_backtracks, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, "Maximum parser backtracking (how many times it tries different alternatives in the recursive descent parsing process).", 0) \
M(Bool, allow_settings_after_format_in_insert, false, "Allow SETTINGS after FORMAT, but note, that this is not always safe (note: this is a compatibility setting).", 0) \
M(Seconds, periodic_live_view_refresh, 60, "Interval after which periodically refreshed live view is forced to refresh.", 0) \
M(Bool, transform_null_in, false, "If enabled, NULL values will be matched with 'IN' operator as if they are considered equal.", 0) \
@ -839,6 +841,9 @@ class IColumn;
M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \
M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \
M(Bool, optimize_sorting_by_input_stream_properties, true, "Optimize sorting by sorting properties of input stream", 0) \
M(UInt64, keeper_max_retries, 10, "Max retries for general keeper operations", 0) \
M(UInt64, keeper_retry_initial_backoff_ms, 100, "Initial backoff timeout for general keeper operations", 0) \
M(UInt64, keeper_retry_max_backoff_ms, 5000, "Max backoff timeout for general keeper operations", 0) \
M(UInt64, insert_keeper_max_retries, 20, "Max retries for keeper operations during insert", 0) \
M(UInt64, insert_keeper_retry_initial_backoff_ms, 100, "Initial backoff timeout for keeper operations during insert", 0) \
M(UInt64, insert_keeper_retry_max_backoff_ms, 10000, "Max backoff timeout for keeper operations during insert", 0) \

View File

@ -94,7 +94,12 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"},
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."},
{"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"},
{"function_locate_has_mysql_compatible_argument_order", false, true, "Increase compatibility with MySQL's locate function."},
{"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for sapce reservation in filesystem cache"},
{"max_parser_backtracks", 0, 1000000, "Limiting the complexity of parsing"},
{"keeper_max_retries", 10, 10, "Max retries for general keeper operations"},
{"keeper_retry_initial_backoff_ms", 100, 100, "Initial backoff timeout for general keeper operations"},
{"keeper_retry_max_backoff_ms", 5000, 5000, "Max backoff timeout for general keeper operations"},
}},
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
{"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},

View File

@ -6,6 +6,3 @@ target_link_libraries (field PRIVATE dbms)
clickhouse_add_executable (string_ref_hash string_ref_hash.cpp)
target_link_libraries (string_ref_hash PRIVATE clickhouse_common_io)
clickhouse_add_executable (mysql_protocol mysql_protocol.cpp)
target_link_libraries (mysql_protocol PRIVATE dbms)

View File

@ -1,390 +0,0 @@
#include <string>
#include <Core/MySQL/Authentication.h>
#include <Core/MySQL/MySQLClient.h>
#include <Core/MySQL/PacketsConnection.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromOStream.h>
#include <boost/program_options.hpp>
int main(int argc, char ** argv)
{
using namespace DB;
using namespace MySQLProtocol;
using namespace MySQLProtocol::Generic;
using namespace MySQLProtocol::Authentication;
using namespace MySQLProtocol::ConnectionPhase;
using namespace MySQLProtocol::ProtocolText;
uint8_t server_sequence_id = 1;
uint8_t client_sequence_id = 1;
String user = "default";
String password = "123";
String database;
UInt8 charset_utf8 = 33;
UInt32 max_packet_size = MAX_PACKET_LENGTH;
String mysql_native_password = "mysql_native_password";
UInt32 server_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH
| CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF;
UInt32 client_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_PLUGIN_AUTH | CLIENT_SECURE_CONNECTION;
/// Handshake packet
{
/// 1. Greeting:
/// 1.1 Server writes greeting to client
std::string s0;
WriteBufferFromString out0(s0);
Handshake server_handshake(
server_capability_flags, -1, "ClickHouse", "mysql_native_password", "aaaaaaaaaaaaaaaaaaaaa", CharacterSet::utf8_general_ci);
server_handshake.writePayload(out0, server_sequence_id);
/// 1.2 Client reads the greeting
ReadBufferFromString in0(s0);
Handshake client_handshake;
client_handshake.readPayload(in0, client_sequence_id);
/// Check packet
ASSERT(server_handshake.capability_flags == client_handshake.capability_flags)
ASSERT(server_handshake.status_flags == client_handshake.status_flags)
ASSERT(server_handshake.server_version == client_handshake.server_version)
ASSERT(server_handshake.protocol_version == client_handshake.protocol_version)
ASSERT(server_handshake.auth_plugin_data.substr(0, 20) == client_handshake.auth_plugin_data)
ASSERT(server_handshake.auth_plugin_name == client_handshake.auth_plugin_name)
/// 2. Greeting Response:
std::string s1;
WriteBufferFromString out1(s1);
/// 2.1 Client writes to server
Native41 native41(password, client_handshake.auth_plugin_data);
String auth_plugin_data = native41.getAuthPluginData();
HandshakeResponse client_handshake_response(
client_capability_flags, max_packet_size, charset_utf8, user, database, auth_plugin_data, mysql_native_password);
client_handshake_response.writePayload(out1, client_sequence_id);
/// 2.2 Server reads the response
ReadBufferFromString in1(s1);
HandshakeResponse server_handshake_response;
server_handshake_response.readPayload(in1, server_sequence_id);
/// Check
ASSERT(server_handshake_response.capability_flags == client_handshake_response.capability_flags)
ASSERT(server_handshake_response.character_set == client_handshake_response.character_set)
ASSERT(server_handshake_response.username == client_handshake_response.username)
ASSERT(server_handshake_response.database == client_handshake_response.database)
ASSERT(server_handshake_response.auth_response == client_handshake_response.auth_response)
ASSERT(server_handshake_response.auth_plugin_name == client_handshake_response.auth_plugin_name)
}
/// OK Packet
{
// 1. Server writes packet
std::string s0;
WriteBufferFromString out0(s0);
OKPacket server(0x00, server_capability_flags, 0, 0, 0, "", "");
server.writePayload(out0, server_sequence_id);
// 2. Client reads packet
ReadBufferFromString in0(s0);
ResponsePacket client(server_capability_flags);
client.readPayload(in0, client_sequence_id);
// Check
ASSERT(client.getType() == PACKET_OK)
ASSERT(client.ok.header == server.header)
ASSERT(client.ok.status_flags == server.status_flags)
ASSERT(client.ok.capabilities == server.capabilities)
}
/// ERR Packet
{
// 1. Server writes packet
std::string s0;
WriteBufferFromString out0(s0);
ERRPacket server(123, "12345", "This is the error message");
server.writePayload(out0, server_sequence_id);
// 2. Client reads packet
ReadBufferFromString in0(s0);
ResponsePacket client(server_capability_flags);
client.readPayload(in0, client_sequence_id);
// Check
ASSERT(client.getType() == PACKET_ERR)
ASSERT(client.err.header == server.header)
ASSERT(client.err.error_code == server.error_code)
ASSERT(client.err.sql_state == server.sql_state)
ASSERT(client.err.error_message == server.error_message)
}
/// EOF Packet
{
// 1. Server writes packet
std::string s0;
WriteBufferFromString out0(s0);
EOFPacket server(1, 1);
server.writePayload(out0, server_sequence_id);
// 2. Client reads packet
ReadBufferFromString in0(s0);
ResponsePacket client(server_capability_flags);
client.readPayload(in0, client_sequence_id);
// Check
ASSERT(client.getType() == PACKET_EOF)
ASSERT(client.eof.header == server.header)
ASSERT(client.eof.warnings == server.warnings)
ASSERT(client.eof.status_flags == server.status_flags)
}
/// ColumnDefinition Packet
{
// 1. Server writes packet
std::string s0;
WriteBufferFromString out0(s0);
ColumnDefinition server("schema", "tbl", "org_tbl", "name", "org_name", 33, 0x00, MYSQL_TYPE_STRING, 0x00, 0x00);
server.writePayload(out0, server_sequence_id);
// 2. Client reads packet
ReadBufferFromString in0(s0);
ColumnDefinition client;
client.readPayload(in0, client_sequence_id);
// Check
ASSERT(client.column_type == server.column_type)
ASSERT(client.column_length == server.column_length)
ASSERT(client.next_length == server.next_length)
ASSERT(client.character_set == server.character_set)
ASSERT(client.decimals == server.decimals)
ASSERT(client.name == server.name)
ASSERT(client.org_name == server.org_name)
ASSERT(client.table == server.table)
ASSERT(client.org_table == server.org_table)
ASSERT(client.schema == server.schema)
}
/// GTID sets tests.
{
struct Testcase
{
String name;
String sets;
String want;
};
Testcase cases[] = {
{"gtid-sets-without-whitespace",
"2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812,9f58c169-d121-11e7-835b-ac162db9c048:1-56060985:56060987-56061175:56061177-"
"56061224:56061226-75201528:75201530-75201755:75201757-75201983:75201985-75407550:75407552-75407604:75407606-75407661:"
"75407663-87889848:87889850-87889935:87889937-87890042:87890044-88391955:88391957-88392125:88392127-88392245:88392247-"
"88755771:88755773-88755826:88755828-88755921:88755923-100279047:100279049-100279126:100279128-100279247:100279249-121672430:"
"121672432-121672503:121672505-121672524:121672526-122946019:122946021-122946291:122946293-122946469:122946471-134313284:"
"134313286-134313415:134313417-134313648:134313650-136492728:136492730-136492784:136492786-136492904:136492906-145582402:"
"145582404-145582439:145582441-145582463:145582465-147455222:147455224-147455262:147455264-147455277:147455279-149319049:"
"149319051-149319261:149319263-150635915,a6d83ff6-bfcf-11e7-8c93-246e96158550:1-126618302",
"2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812,9f58c169-d121-11e7-835b-ac162db9c048:1-56060985:56060987-56061175:56061177-"
"56061224:56061226-75201528:75201530-75201755:75201757-75201983:75201985-75407550:75407552-75407604:75407606-75407661:"
"75407663-87889848:87889850-87889935:87889937-87890042:87890044-88391955:88391957-88392125:88392127-88392245:88392247-"
"88755771:88755773-88755826:88755828-88755921:88755923-100279047:100279049-100279126:100279128-100279247:100279249-121672430:"
"121672432-121672503:121672505-121672524:121672526-122946019:122946021-122946291:122946293-122946469:122946471-134313284:"
"134313286-134313415:134313417-134313648:134313650-136492728:136492730-136492784:136492786-136492904:136492906-145582402:"
"145582404-145582439:145582441-145582463:145582465-147455222:147455224-147455262:147455264-147455277:147455279-149319049:"
"149319051-149319261:149319263-150635915,a6d83ff6-bfcf-11e7-8c93-246e96158550:1-126618302"},
{"gtid-sets-with-whitespace",
"2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812, 9f58c169-d121-11e7-835b-ac162db9c048:1-56060985:56060987-56061175:56061177",
"2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812,9f58c169-d121-11e7-835b-ac162db9c048:1-56060985:56060987-56061175:56061177"},
{"gtid-sets-single", "2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812", "2c5adab4-d64a-11e5-82df-ac162d72dac0:1-247743812"}};
for (auto & tc : cases)
{
GTIDSets gtid_sets;
gtid_sets.parse(tc.sets);
String want = tc.want;
String got = gtid_sets.toString();
ASSERT(want == got)
}
}
{
struct Testcase
{
String name;
String gtid_sets;
String gtid_str;
String want;
};
Testcase cases[] = {
{"merge",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:4-7",
"10662d71-9d91-11ea-bbc2-0242ac110003:3",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-7"},
{"merge-front",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:5-7",
"10662d71-9d91-11ea-bbc2-0242ac110003:3",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-3:5-7"},
{"extend-interval",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:6-7",
"10662d71-9d91-11ea-bbc2-0242ac110003:4",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:4:6-7"},
{"extend-interval",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:4:7-9",
"10662d71-9d91-11ea-bbc2-0242ac110003:5",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-2:4-5:7-9"},
{"extend-interval",
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7",
"10662d71-9d91-11ea-bbc2-0242ac110003:4",
"10662d71-9d91-11ea-bbc2-0242ac110003:4:6-7"},
{"extend-interval",
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7",
"10662d71-9d91-11ea-bbc2-0242ac110003:9",
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7:9"},
{"extend-interval",
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7",
"20662d71-9d91-11ea-bbc2-0242ac110003:9",
"10662d71-9d91-11ea-bbc2-0242ac110003:6-7,20662d71-9d91-11ea-bbc2-0242ac110003:9"},
{"shrink-sequence",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-3:4-5:7",
"10662d71-9d91-11ea-bbc2-0242ac110003:6",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-7"},
{"shrink-sequence",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-3:4-5:10",
"10662d71-9d91-11ea-bbc2-0242ac110003:8",
"10662d71-9d91-11ea-bbc2-0242ac110003:1-5:8:10"
}
};
for (auto & tc : cases)
{
GTIDSets gtid_sets;
gtid_sets.parse(tc.gtid_sets);
ASSERT(tc.gtid_sets == gtid_sets.toString())
GTIDSets gtid_sets1;
gtid_sets1.parse(tc.gtid_str);
GTID gtid;
gtid.uuid = gtid_sets1.sets[0].uuid;
gtid.seq_no = gtid_sets1.sets[0].intervals[0].start;
gtid_sets.update(gtid);
String want = tc.want;
String got = gtid_sets.toString();
ASSERT(want == got)
}
}
{
/// mysql_protocol --host=172.17.0.3 --user=root --password=123 --db=sbtest
try
{
boost::program_options::options_description desc("Allowed options");
desc.add_options()("host", boost::program_options::value<std::string>()->required(), "master host")(
"port", boost::program_options::value<std::int32_t>()->default_value(3306), "master port")(
"user", boost::program_options::value<std::string>()->default_value("root"), "master user")(
"password", boost::program_options::value<std::string>()->required(), "master password")(
"gtid", boost::program_options::value<std::string>()->default_value(""), "executed GTID sets")(
"db", boost::program_options::value<std::string>()->required(), "replicate do db")(
"binlog_checksum", boost::program_options::value<std::string>()->default_value("CRC32"), "master binlog_checksum");
boost::program_options::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
if (argc == 0)
{
return 1;
}
auto host = options.at("host").as<String>();
auto port = options.at("port").as<Int32>();
auto master_user = options.at("user").as<String>();
auto master_password = options.at("password").as<String>();
auto gtid_sets = options.at("gtid").as<String>();
auto replicate_db = options.at("db").as<String>();
auto binlog_checksum = options.at("binlog_checksum").as<String>();
std::cerr << "Master Host: " << host << ", Port: " << port << ", User: " << master_user << ", Password: " << master_password
<< ", Replicate DB: " << replicate_db << ", GTID: " << gtid_sets << std::endl;
UInt32 slave_id = 9004;
MySQLClient slave(host, port, master_user, master_password);
/// Connect to the master.
slave.connect();
slave.startBinlogDumpGTID(slave_id, replicate_db, {}, gtid_sets, binlog_checksum);
WriteBufferFromOStream cerr(std::cerr);
/// Read binlog events one by one.
while (true)
{
auto event = slave.readOneBinlogEvent();
switch (event->type())
{
case MYSQL_QUERY_EVENT: {
auto binlog_event = std::static_pointer_cast<QueryEvent>(event);
binlog_event->dump(cerr);
Position pos = slave.getPosition();
pos.dump(cerr);
break;
}
case MYSQL_WRITE_ROWS_EVENT: {
auto binlog_event = std::static_pointer_cast<WriteRowsEvent>(event);
binlog_event->dump(cerr);
Position pos = slave.getPosition();
pos.dump(cerr);
break;
}
case MYSQL_UPDATE_ROWS_EVENT: {
auto binlog_event = std::static_pointer_cast<UpdateRowsEvent>(event);
binlog_event->dump(cerr);
Position pos = slave.getPosition();
pos.dump(cerr);
break;
}
case MYSQL_DELETE_ROWS_EVENT: {
auto binlog_event = std::static_pointer_cast<DeleteRowsEvent>(event);
binlog_event->dump(cerr);
Position pos = slave.getPosition();
pos.dump(cerr);
break;
}
default:
if (event->header.type != MySQLReplication::EventType::HEARTBEAT_EVENT)
{
event->dump(cerr);
}
break;
}
}
}
catch (const Exception & ex)
{
std::cerr << "Error: " << ex.message() << std::endl;
return 1;
}
}
}

View File

@ -56,13 +56,14 @@ DataTypePtr DataTypeFactory::getImpl(const String & full_name) const
{
String out_err;
const char * start = full_name.data();
ast = tryParseQuery(parser, start, start + full_name.size(), out_err, false, "data type", false, DBMS_DEFAULT_MAX_QUERY_SIZE, data_type_max_parse_depth);
ast = tryParseQuery(parser, start, start + full_name.size(), out_err, false, "data type", false,
DBMS_DEFAULT_MAX_QUERY_SIZE, data_type_max_parse_depth, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
if (!ast)
return nullptr;
}
else
{
ast = parseQuery(parser, full_name.data(), full_name.data() + full_name.size(), "data type", false, data_type_max_parse_depth);
ast = parseQuery(parser, full_name.data(), full_name.data() + full_name.size(), "data type", false, data_type_max_parse_depth, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
}
return getImpl<nullptr_on_error>(ast);

View File

@ -533,6 +533,8 @@ class DataTypeDateTime;
class DataTypeDateTime64;
template <is_decimal T> constexpr bool IsDataTypeDecimal<DataTypeDecimal<T>> = true;
/// TODO: this is garbage, remove it.
template <> inline constexpr bool IsDataTypeDecimal<DataTypeDateTime64> = true;
template <typename T> constexpr bool IsDataTypeNumber<DataTypeNumber<T>> = true;

View File

@ -5,13 +5,11 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/NestedUtils.h>
#include <Common/JSONParsers/SimdJSONParser.h>
#include <Common/JSONParsers/RapidJSONParser.h>
#include <Common/HashTable/HashSet.h>
#include <Columns/ColumnObject.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnString.h>
#include <Functions/FunctionsConversion.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -29,6 +27,7 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
extern const int CANNOT_READ_ALL_DATA;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int CANNOT_PARSE_TEXT;
extern const int EXPERIMENTAL_FEATURE_ERROR;
}
@ -344,7 +343,20 @@ void SerializationObject<Parser>::deserializeBinaryBulkFromString(
state.nested_serialization->deserializeBinaryBulkWithMultipleStreams(
column_string, limit, settings, state.nested_state, cache);
ConvertImplGenericFromString<ColumnString>::executeImpl(*column_string, column_object, *this, column_string->size());
size_t input_rows_count = column_string->size();
column_object.reserve(input_rows_count);
FormatSettings format_settings;
for (size_t i = 0; i < input_rows_count; ++i)
{
const auto & val = column_string->getDataAt(i);
ReadBufferFromMemory read_buffer(val.data, val.size);
deserializeWholeText(column_object, read_buffer, format_settings);
if (!read_buffer.eof())
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT,
"Cannot parse string to column Object. Expected eof");
}
}
template <typename Parser>

View File

@ -24,6 +24,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
auto initialize = [&]() mutable
{
if (context)
return true;
shared_context = Context::createShared();
context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();

View File

@ -444,8 +444,9 @@ namespace
ParserSelectWithUnionQuery parser;
String description = fmt::format("Query for ClickHouse dictionary {}", data.table_name);
String fixed_query = removeWhereConditionPlaceholder(query);
const Settings & settings = data.context->getSettingsRef();
ASTPtr select = parseQuery(parser, fixed_query, description,
data.context->getSettingsRef().max_query_size, data.context->getSettingsRef().max_parser_depth);
settings.max_query_size, settings.max_parser_depth, settings.max_parser_backtracks);
DDLDependencyVisitor::Visitor visitor{data};
visitor.visit(select);

View File

@ -115,7 +115,7 @@ ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const String & table_name, Co
const char * pos = query.data();
std::string error_message;
auto ast = tryParseQuery(parser, pos, pos + query.size(), error_message,
/* hilite = */ false, "", /* allow_multi_statements = */ false, 0, settings.max_parser_depth);
/* hilite = */ false, "", /* allow_multi_statements = */ false, 0, settings.max_parser_depth, settings.max_parser_backtracks, true);
if (!ast && throw_on_error)
throw Exception::createDeprecated(error_message, ErrorCodes::SYNTAX_ERROR);
@ -134,7 +134,7 @@ ASTPtr DatabaseDictionary::getCreateDatabaseQuery() const
}
auto settings = getContext()->getSettingsRef();
ParserCreateQuery parser;
return parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
return parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
}
void DatabaseDictionary::shutdown()

View File

@ -187,7 +187,7 @@ ASTPtr DatabaseFilesystem::getCreateDatabaseQuery() const
const String query = fmt::format("CREATE DATABASE {} ENGINE = Filesystem('{}')", backQuoteIfNeed(getDatabaseName()), path);
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
if (const auto database_comment = getDatabaseComment(); !database_comment.empty())
{

View File

@ -183,7 +183,7 @@ ASTPtr DatabaseHDFS::getCreateDatabaseQuery() const
ParserCreateQuery parser;
const String query = fmt::format("CREATE DATABASE {} ENGINE = HDFS('{}')", backQuoteIfNeed(getDatabaseName()), source);
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
if (const auto database_comment = getDatabaseComment(); !database_comment.empty())
{

View File

@ -526,7 +526,7 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const
/// If database.sql doesn't exist, then engine is Ordinary
String query = "CREATE DATABASE " + backQuoteIfNeed(getDatabaseName()) + " ENGINE = Ordinary";
ParserCreateQuery parser;
ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
}
if (const auto database_comment = getDatabaseComment(); !database_comment.empty())
@ -707,7 +707,7 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata(
const char * pos = query.data();
std::string error_message;
auto ast = tryParseQuery(parser, pos, pos + query.size(), error_message, /* hilite = */ false,
"in file " + metadata_file_path, /* allow_multi_statements = */ false, 0, settings.max_parser_depth);
"in file " + metadata_file_path, /* allow_multi_statements = */ false, 0, settings.max_parser_depth, settings.max_parser_backtracks, true);
if (!ast && throw_on_error)
throw Exception::createDeprecated(error_message, ErrorCodes::SYNTAX_ERROR);
@ -765,12 +765,14 @@ ASTPtr DatabaseOnDisk::getCreateQueryFromStorage(const String & table_name, cons
auto ast_storage = std::make_shared<ASTStorage>();
ast_storage->set(ast_storage->engine, ast_engine);
unsigned max_parser_depth = static_cast<unsigned>(getContext()->getSettingsRef().max_parser_depth);
auto create_table_query = DB::getCreateQueryFromStorage(storage,
ast_storage,
false,
max_parser_depth,
throw_on_error);
const Settings & settings = getContext()->getSettingsRef();
auto create_table_query = DB::getCreateQueryFromStorage(
storage,
ast_storage,
false,
static_cast<unsigned>(settings.max_parser_depth),
static_cast<unsigned>(settings.max_parser_backtracks),
throw_on_error);
create_table_query->set(create_table_query->as<ASTCreateQuery>()->comment,
std::make_shared<ASTLiteral>("SYSTEM TABLE is built on the fly."));

View File

@ -440,10 +440,22 @@ void DatabaseOrdinary::stopLoading()
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
{
auto result = DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
std::scoped_lock lock(mutex);
typeid_cast<DatabaseTablesSnapshotIterator &>(*result).setLoadTasks(startup_table);
return result;
// Wait for every table (matching the filter) to be loaded and started up before we make the snapshot.
// This is important because otherwise a table might be:
// - not attached and thus will be missed in the snapshot;
// - not started, which is not good for DDL operations.
LoadTaskPtrs tasks_to_wait;
{
std::lock_guard lock(mutex);
if (!filter_by_table_name)
tasks_to_wait.reserve(startup_table.size());
for (const auto & [table_name, task] : startup_table)
if (!filter_by_table_name || filter_by_table_name(table_name))
tasks_to_wait.emplace_back(task);
}
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), tasks_to_wait);
return DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
}
void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
@ -469,7 +481,7 @@ void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & ta
statement.data() + statement.size(),
"in file " + table_metadata_path,
0,
local_context->getSettingsRef().max_parser_depth);
local_context->getSettingsRef().max_parser_depth, local_context->getSettingsRef().max_parser_backtracks);
applyMetadataChangesToCreateQuery(ast, metadata);

View File

@ -812,7 +812,8 @@ static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context
ParserCreateQuery parser;
auto size = context->getSettingsRef().max_query_size;
auto depth = context->getSettingsRef().max_parser_depth;
ASTPtr query = parseQuery(parser, metadata, size, depth);
auto backtracks = context->getSettingsRef().max_parser_backtracks;
ASTPtr query = parseQuery(parser, metadata, size, depth, backtracks);
const ASTCreateQuery & create = query->as<const ASTCreateQuery &>();
if (!create.storage || !create.storage->engine)
return UUIDHelpers::Nil;
@ -1234,7 +1235,7 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
{
ParserCreateQuery parser;
String description = "in ZooKeeper " + zookeeper_path + "/metadata/" + node_name;
auto ast = parseQuery(parser, query, description, 0, getContext()->getSettingsRef().max_parser_depth);
auto ast = parseQuery(parser, query, description, 0, getContext()->getSettingsRef().max_parser_depth, getContext()->getSettingsRef().max_parser_backtracks);
auto & create = ast->as<ASTCreateQuery &>();
if (create.uuid == UUIDHelpers::Nil || create.getTable() != TABLE_WITH_UUID_NAME_PLACEHOLDER || create.database)
@ -1559,7 +1560,7 @@ DatabaseReplicated::getTablesForBackup(const FilterByNameFunction & filter, cons
for (const auto & [table_name, metadata] : snapshot)
{
ParserCreateQuery parser;
auto create_table_query = parseQuery(parser, metadata, 0, getContext()->getSettingsRef().max_parser_depth);
auto create_table_query = parseQuery(parser, metadata, 0, getContext()->getSettingsRef().max_parser_depth, getContext()->getSettingsRef().max_parser_backtracks);
auto & create = create_table_query->as<ASTCreateQuery &>();
create.attach = false;

View File

@ -191,7 +191,7 @@ ASTPtr DatabaseS3::getCreateDatabaseQuery() const
creation_args += fmt::format(", '{}', '{}'", config.access_key_id.value(), config.secret_access_key.value());
const String query = fmt::format("CREATE DATABASE {} ENGINE = S3({})", backQuoteIfNeed(getDatabaseName()), creation_args);
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth, settings.max_parser_backtracks);
if (const auto database_comment = getDatabaseComment(); !database_comment.empty())
{

View File

@ -108,7 +108,8 @@ void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemo
}
ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_storage, bool only_ordinary, uint32_t max_parser_depth, bool throw_on_error)
ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_storage, bool only_ordinary,
uint32_t max_parser_depth, uint32_t max_parser_backtracks, bool throw_on_error)
{
auto table_id = storage->getStorageID();
auto metadata_ptr = storage->getInMemoryMetadataPtr();
@ -148,7 +149,7 @@ ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_
Expected expected;
expected.max_parsed_pos = string_end;
Tokens tokens(type_name.c_str(), string_end);
IParser::Pos pos(tokens, max_parser_depth);
IParser::Pos pos(tokens, max_parser_depth, max_parser_backtracks);
ParserDataType parser;
if (!parser.parse(pos, ast_type, expected))
{

View File

@ -13,7 +13,8 @@ namespace DB
{
void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemoryMetadata & metadata);
ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_storage, bool only_ordinary, uint32_t max_parser_depth, bool throw_on_error);
ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & ast_storage, bool only_ordinary,
uint32_t max_parser_depth, uint32_t max_parser_backtracks, bool throw_on_error);
/// Cleans a CREATE QUERY from temporary flags like "IF NOT EXISTS", "OR REPLACE", "AS SELECT" (for non-views), etc.
void cleanupObjectDefinitionFromTemporaryFlags(ASTCreateQuery & query);

View File

@ -77,17 +77,12 @@ private:
Tables tables;
Tables::iterator it;
// Tasks to wait before returning a table
using Tasks = std::unordered_map<String, LoadTaskPtr>;
Tasks tasks;
protected:
DatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && other) noexcept
: IDatabaseTablesIterator(std::move(other.database_name))
{
size_t idx = std::distance(other.tables.begin(), other.it);
std::swap(tables, other.tables);
std::swap(tasks, other.tasks);
other.it = other.tables.end();
it = tables.begin();
std::advance(it, idx);
@ -110,17 +105,7 @@ public:
const String & name() const override { return it->first; }
const StoragePtr & table() const override
{
if (auto task = tasks.find(it->first); task != tasks.end())
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task->second);
return it->second;
}
void setLoadTasks(const Tasks & tasks_)
{
tasks = tasks_;
}
const StoragePtr & table() const override { return it->second; }
};
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;

View File

@ -174,12 +174,14 @@ ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, Context
ast_storage->settings = nullptr;
}
unsigned max_parser_depth = static_cast<unsigned>(getContext()->getSettingsRef().max_parser_depth);
auto create_table_query = DB::getCreateQueryFromStorage(storage,
table_storage_define,
true,
max_parser_depth,
throw_on_error);
const Settings & settings = getContext()->getSettingsRef();
auto create_table_query = DB::getCreateQueryFromStorage(
storage,
table_storage_define,
true,
static_cast<unsigned>(settings.max_parser_depth),
static_cast<unsigned>(settings.max_parser_backtracks),
throw_on_error);
return create_table_query;
}

View File

@ -61,7 +61,7 @@ static bool tryReadCharset(
bool tryConvertStringLiterals(String & query)
{
Tokens tokens(query.data(), query.data() + query.size());
IParser::Pos pos(tokens, 0);
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
Expected expected;
String rewritten_query;
rewritten_query.reserve(query.size());

View File

@ -10,7 +10,7 @@ StorageID tryParseTableIDFromDDL(const String & query, const String & default_da
{
bool is_ddl = false;
Tokens tokens(query.data(), query.data() + query.size());
IParser::Pos pos(tokens, 0);
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
Expected expected;
if (ParserKeyword("CREATE TEMPORARY TABLE").ignore(pos, expected) || ParserKeyword("CREATE TABLE").ignore(pos, expected))
{

View File

@ -37,7 +37,7 @@ static void quoteLiteral(
bool tryQuoteUnrecognizedTokens(String & query)
{
Tokens tokens(query.data(), query.data() + query.size());
IParser::Pos pos(tokens, 0);
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
Expected expected;
String rewritten_query;
const char * copy_from = query.data();

View File

@ -194,10 +194,10 @@ ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, Contex
/// Add table_name to engine arguments
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 1, std::make_shared<ASTLiteral>(table_id.table_name));
unsigned max_parser_depth = static_cast<unsigned>(getContext()->getSettingsRef().max_parser_depth);
const Settings & settings = getContext()->getSettingsRef();
auto create_table_query = DB::getCreateQueryFromStorage(storage, table_storage_define, true,
max_parser_depth,
throw_on_error);
static_cast<uint32_t>(settings.max_parser_depth), static_cast<uint32_t>(settings.max_parser_backtracks), throw_on_error);
return create_table_query;
}

View File

@ -9,7 +9,7 @@
#include <Common/ProfilingScopedRWLock.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryPipelineExecutor.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

View File

@ -0,0 +1,42 @@
#include <Dictionaries/DictionaryPipelineExecutor.h>
#include <Core/Block.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
DictionaryPipelineExecutor::DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async)
: async_executor(async ? std::make_unique<PullingAsyncPipelineExecutor>(pipeline_) : nullptr)
, executor(async ? nullptr : std::make_unique<PullingPipelineExecutor>(pipeline_))
{
}
bool DictionaryPipelineExecutor::pull(Block & block)
{
if (async_executor)
{
while (true)
{
bool has_data = async_executor->pull(block);
if (has_data && !block)
continue;
return has_data;
}
}
else if (executor)
return executor->pull(block);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "DictionaryPipelineExecutor is not initialized");
}
DictionaryPipelineExecutor::~DictionaryPipelineExecutor() = default;
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <memory>
namespace DB
{
class Block;
class QueryPipeline;
class PullingAsyncPipelineExecutor;
class PullingPipelineExecutor;
/// Wrapper for `Pulling(Async)PipelineExecutor` to dynamically dispatch calls to the right executor
class DictionaryPipelineExecutor
{
public:
DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async);
bool pull(Block & block);
~DictionaryPipelineExecutor();
private:
std::unique_ptr<PullingAsyncPipelineExecutor> async_executor;
std::unique_ptr<PullingPipelineExecutor> executor;
};
}
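A hedged usage sketch of the wrapper: pull blocks from a dictionary source pipeline without caring which executor backs it (source, the async flag, and processBlock are illustrative, not from this commit):

DB::QueryPipeline pipeline(source->loadAll());   /// assumption: source is an IDictionarySource
DB::DictionaryPipelineExecutor executor(pipeline, /* async = */ use_async_executor);
DB::Block block;
while (executor.pull(block))
    processBlock(block);   /// hypothetical consumer; empty blocks are already filtered in async mode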

View File

@ -9,15 +9,11 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/SettingsChanges.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
@ -135,29 +131,4 @@ String TransformWithAdditionalColumns::getName() const
return "TransformWithAdditionalColumns";
}
DictionaryPipelineExecutor::DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async)
: async_executor(async ? std::make_unique<PullingAsyncPipelineExecutor>(pipeline_) : nullptr)
, executor(async ? nullptr : std::make_unique<PullingPipelineExecutor>(pipeline_))
{}
bool DictionaryPipelineExecutor::pull(Block & block)
{
if (async_executor)
{
while (true)
{
bool has_data = async_executor->pull(block);
if (has_data && !block)
continue;
return has_data;
}
}
else if (executor)
return executor->pull(block);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "DictionaryPipelineExecutor is not initialized");
}
DictionaryPipelineExecutor::~DictionaryPipelineExecutor() = default;
}

View File

@ -16,10 +16,6 @@ namespace DB
struct DictionaryStructure;
class SettingsChanges;
class PullingPipelineExecutor;
class PullingAsyncPipelineExecutor;
class QueryPipeline;
/// For simple key
Block blockForIds(
@ -55,17 +51,4 @@ private:
size_t current_range_index = 0;
};
/// Wrapper for `Pulling(Async)PipelineExecutor` to dynamically dispatch calls to the right executor
class DictionaryPipelineExecutor
{
public:
DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async);
bool pull(Block & block);
~DictionaryPipelineExecutor();
private:
std::unique_ptr<PullingAsyncPipelineExecutor> async_executor;
std::unique_ptr<PullingPipelineExecutor> executor;
};
}

View File

@ -41,6 +41,33 @@ enum class AttributeUnderlyingType : TypeIndexUnderlying
#undef map_item
#define CALL_FOR_ALL_DICTIONARY_ATTRIBUTE_TYPES(M) \
M(UInt8) \
M(UInt16) \
M(UInt32) \
M(UInt64) \
M(UInt128) \
M(UInt256) \
M(Int8) \
M(Int16) \
M(Int32) \
M(Int64) \
M(Int128) \
M(Int256) \
M(Decimal32) \
M(Decimal64) \
M(Decimal128) \
M(Decimal256) \
M(DateTime64) \
M(Float32) \
M(Float64) \
M(UUID) \
M(IPv4) \
M(IPv6) \
M(String) \
M(Array)
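CALL_FOR_ALL_DICTIONARY_ATTRIBUTE_TYPES is an X-macro: each consumer supplies its own M, and the list expands once per type. A minimal illustrative consumer (not from this commit) that counts the entries at compile time:

#define COUNT_ONE(T) + 1
static constexpr size_t dictionary_attribute_type_count = 0 CALL_FOR_ALL_DICTIONARY_ATTRIBUTE_TYPES(COUNT_ONE);
#undef COUNT_ONE
static_assert(dictionary_attribute_type_count == 24);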
/// Min and max lifetimes for a dictionary or its entry
using DictionaryLifetime = ExternalLoadableLifetime;

View File

@ -15,7 +15,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryPipelineExecutor.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>

View File

@ -10,6 +10,7 @@
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryPipelineExecutor.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>

View File

@ -6,7 +6,7 @@
#include <Dictionaries/DictionaryHelpers.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryPipelineExecutor.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Dictionaries/HashedDictionaryCollectionType.h>
#include <Dictionaries/HashedDictionaryCollectionTraits.h>

View File

@ -19,6 +19,7 @@
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryPipelineExecutor.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Functions/FunctionHelpers.h>

View File

@ -1,6 +1,5 @@
#include "PolygonDictionary.h"
#include <numeric>
#include <cmath>
#include <base/sort.h>
@ -15,7 +14,7 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryPipelineExecutor.h>
namespace DB

View File

@ -0,0 +1,225 @@
#include <Dictionaries/RangeHashedDictionary.h>
namespace DB
{
template <DictionaryKeyType dictionary_key_type>
ColumnPtr RangeHashedDictionary<dictionary_key_type>::getColumn(
const std::string & attribute_name,
const DataTypePtr & attribute_type,
const Columns & key_columns,
const DataTypes & key_types,
DefaultOrFilter default_or_filter) const
{
bool is_short_circuit = std::holds_alternative<RefFilter>(default_or_filter);
assert(is_short_circuit || std::holds_alternative<RefDefault>(default_or_filter));
if (dictionary_key_type == DictionaryKeyType::Complex)
{
auto key_types_copy = key_types;
key_types_copy.pop_back();
dict_struct.validateKeyTypes(key_types_copy);
}
ColumnPtr result;
const auto & dictionary_attribute = dict_struct.getAttribute(attribute_name, attribute_type);
const size_t attribute_index = dict_struct.attribute_name_to_index.find(attribute_name)->second;
const auto & attribute = attributes[attribute_index];
/// Cast range column to storage type
Columns modified_key_columns = key_columns;
const ColumnPtr & range_storage_column = key_columns.back();
ColumnWithTypeAndName column_to_cast = {range_storage_column->convertToFullColumnIfConst(), key_types.back(), ""};
modified_key_columns.back() = castColumnAccurate(column_to_cast, dict_struct.range_min->type);
size_t keys_size = key_columns.front()->size();
bool is_attribute_nullable = attribute.is_value_nullable.has_value();
ColumnUInt8::MutablePtr col_null_map_to;
ColumnUInt8::Container * vec_null_map_to = nullptr;
if (is_attribute_nullable)
{
col_null_map_to = ColumnUInt8::create(keys_size, false);
vec_null_map_to = &col_null_map_to->getData();
}
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;
auto column = ColumnProvider::getColumn(dictionary_attribute, keys_size);
if (is_short_circuit)
{
IColumn::Filter & default_mask = std::get<RefFilter>(default_or_filter).get();
size_t keys_found = 0;
if constexpr (std::is_same_v<ValueType, Array>)
{
auto * out = column.get();
keys_found = getItemsShortCircuitImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t, const Array & value, bool)
{
out->insert(value);
},
default_mask);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
auto * out = column.get();
if (is_attribute_nullable)
keys_found = getItemsShortCircuitImpl<ValueType, true>(
attribute,
modified_key_columns,
[&](size_t row, StringRef value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out->insertData(value.data, value.size);
},
default_mask);
else
keys_found = getItemsShortCircuitImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t, StringRef value, bool)
{
out->insertData(value.data, value.size);
},
default_mask);
}
else
{
auto & out = column->getData();
if (is_attribute_nullable)
keys_found = getItemsShortCircuitImpl<ValueType, true>(
attribute,
modified_key_columns,
[&](size_t row, const auto value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out[row] = value;
},
default_mask);
else
keys_found = getItemsShortCircuitImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t row, const auto value, bool)
{
out[row] = value;
},
default_mask);
out.resize(keys_found);
}
if (is_attribute_nullable)
vec_null_map_to->resize(keys_found);
}
else
{
const ColumnPtr & default_values_column = std::get<RefDefault>(default_or_filter).get();
DictionaryDefaultValueExtractor<AttributeType> default_value_extractor(
dictionary_attribute.null_value, default_values_column);
if constexpr (std::is_same_v<ValueType, Array>)
{
auto * out = column.get();
getItemsImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t, const Array & value, bool)
{
out->insert(value);
},
default_value_extractor);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
auto * out = column.get();
if (is_attribute_nullable)
getItemsImpl<ValueType, true>(
attribute,
modified_key_columns,
[&](size_t row, StringRef value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out->insertData(value.data, value.size);
},
default_value_extractor);
else
getItemsImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t, StringRef value, bool)
{
out->insertData(value.data, value.size);
},
default_value_extractor);
}
else
{
auto & out = column->getData();
if (is_attribute_nullable)
getItemsImpl<ValueType, true>(
attribute,
modified_key_columns,
[&](size_t row, const auto value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out[row] = value;
},
default_value_extractor);
else
getItemsImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t row, const auto value, bool)
{
out[row] = value;
},
default_value_extractor);
}
}
result = std::move(column);
};
callOnDictionaryAttributeType(attribute.type, type_call);
if (is_attribute_nullable)
result = ColumnNullable::create(result, std::move(col_null_map_to));
return result;
}
template
ColumnPtr RangeHashedDictionary<DictionaryKeyType::Simple>::getColumn(
const std::string & attribute_name,
const DataTypePtr & attribute_type,
const Columns & key_columns,
const DataTypes & key_types,
DefaultOrFilter default_or_filter) const;
template
ColumnPtr RangeHashedDictionary<DictionaryKeyType::Complex>::getColumn(
const std::string & attribute_name,
const DataTypePtr & attribute_type,
const Columns & key_columns,
const DataTypes & key_types,
DefaultOrFilter default_or_filter) const;
}
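Splitting getColumn into this translation unit and ending with explicit instantiation definitions is a compile-time optimization: the heavy template body is compiled once here rather than in every file that includes the header. The usual counterpart (a sketch, assuming the header keeps only the declaration) is an explicit instantiation declaration that suppresses implicit instantiation elsewhere:

/// In the header, next to the member declaration:
extern template ColumnPtr RangeHashedDictionary<DictionaryKeyType::Simple>::getColumn(
    const std::string &, const DataTypePtr &, const Columns &, const DataTypes &, DefaultOrFilter) const;
extern template ColumnPtr RangeHashedDictionary<DictionaryKeyType::Complex>::getColumn(
    const std::string &, const DataTypePtr &, const Columns &, const DataTypes &, DefaultOrFilter) const;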

View File

@ -15,6 +15,8 @@
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryHelpers.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionaryPipelineExecutor.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
@ -29,11 +31,6 @@
#include <Functions/FunctionHelpers.h>
#include <Interpreters/castColumn.h>
#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
namespace DB
{
@ -46,7 +43,6 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
}
enum class RangeHashedDictionaryLookupStrategy : uint8_t
{
min,
@ -238,18 +234,21 @@ private:
static Attribute createAttribute(const DictionaryAttribute & dictionary_attribute);
template <typename AttributeType, bool is_nullable, typename ValueSetter, typename DefaultValueExtractor>
template <typename ValueType>
using ValueSetterFunc = std::function<void(size_t, const ValueType &, bool)>;
template <typename ValueType, bool is_nullable, typename DefaultValueExtractor>
void getItemsImpl(
const Attribute & attribute,
const Columns & key_columns,
ValueSetter && set_value,
ValueSetterFunc<ValueType> && set_value,
DefaultValueExtractor & default_value_extractor) const;
template <typename AttributeType, bool is_nullable, typename ValueSetter>
template <typename ValueType, bool is_nullable>
size_t getItemsShortCircuitImpl(
const Attribute & attribute,
const Columns & key_columns,
ValueSetter && set_value,
ValueSetterFunc<ValueType> && set_value,
IColumn::Filter & default_mask) const;
ColumnPtr getColumnInternal(
@ -341,209 +340,6 @@ RangeHashedDictionary<dictionary_key_type>::RangeHashedDictionary(
calculateBytesAllocated();
}
template <DictionaryKeyType dictionary_key_type>
ColumnPtr RangeHashedDictionary<dictionary_key_type>::getColumn(
const std::string & attribute_name,
const DataTypePtr & attribute_type,
const Columns & key_columns,
const DataTypes & key_types,
DefaultOrFilter default_or_filter) const
{
bool is_short_circuit = std::holds_alternative<RefFilter>(default_or_filter);
assert(is_short_circuit || std::holds_alternative<RefDefault>(default_or_filter));
if (dictionary_key_type == DictionaryKeyType::Complex)
{
auto key_types_copy = key_types;
key_types_copy.pop_back();
dict_struct.validateKeyTypes(key_types_copy);
}
ColumnPtr result;
const auto & dictionary_attribute = dict_struct.getAttribute(attribute_name, attribute_type);
const size_t attribute_index = dict_struct.attribute_name_to_index.find(attribute_name)->second;
const auto & attribute = attributes[attribute_index];
/// Cast range column to storage type
Columns modified_key_columns = key_columns;
const ColumnPtr & range_storage_column = key_columns.back();
ColumnWithTypeAndName column_to_cast = {range_storage_column->convertToFullColumnIfConst(), key_types.back(), ""};
modified_key_columns.back() = castColumnAccurate(column_to_cast, dict_struct.range_min->type);
size_t keys_size = key_columns.front()->size();
bool is_attribute_nullable = attribute.is_value_nullable.has_value();
ColumnUInt8::MutablePtr col_null_map_to;
ColumnUInt8::Container * vec_null_map_to = nullptr;
if (is_attribute_nullable)
{
col_null_map_to = ColumnUInt8::create(keys_size, false);
vec_null_map_to = &col_null_map_to->getData();
}
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;
auto column = ColumnProvider::getColumn(dictionary_attribute, keys_size);
if (is_short_circuit)
{
IColumn::Filter & default_mask = std::get<RefFilter>(default_or_filter).get();
size_t keys_found = 0;
if constexpr (std::is_same_v<ValueType, Array>)
{
auto * out = column.get();
keys_found = getItemsShortCircuitImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t, const Array & value, bool)
{
out->insert(value);
},
default_mask);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
auto * out = column.get();
if (is_attribute_nullable)
keys_found = getItemsShortCircuitImpl<ValueType, true>(
attribute,
modified_key_columns,
[&](size_t row, StringRef value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out->insertData(value.data, value.size);
},
default_mask);
else
keys_found = getItemsShortCircuitImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t, StringRef value, bool)
{
out->insertData(value.data, value.size);
},
default_mask);
}
else
{
auto & out = column->getData();
if (is_attribute_nullable)
keys_found = getItemsShortCircuitImpl<ValueType, true>(
attribute,
modified_key_columns,
[&](size_t row, const auto value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out[row] = value;
},
default_mask);
else
keys_found = getItemsShortCircuitImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t row, const auto value, bool)
{
out[row] = value;
},
default_mask);
out.resize(keys_found);
}
if (is_attribute_nullable)
vec_null_map_to->resize(keys_found);
}
else
{
const ColumnPtr & default_values_column = std::get<RefDefault>(default_or_filter).get();
DictionaryDefaultValueExtractor<AttributeType> default_value_extractor(
dictionary_attribute.null_value, default_values_column);
if constexpr (std::is_same_v<ValueType, Array>)
{
auto * out = column.get();
getItemsImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t, const Array & value, bool)
{
out->insert(value);
},
default_value_extractor);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
auto * out = column.get();
if (is_attribute_nullable)
getItemsImpl<ValueType, true>(
attribute,
modified_key_columns,
[&](size_t row, StringRef value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out->insertData(value.data, value.size);
},
default_value_extractor);
else
getItemsImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t, StringRef value, bool)
{
out->insertData(value.data, value.size);
},
default_value_extractor);
}
else
{
auto & out = column->getData();
if (is_attribute_nullable)
getItemsImpl<ValueType, true>(
attribute,
modified_key_columns,
[&](size_t row, const auto value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out[row] = value;
},
default_value_extractor);
else
getItemsImpl<ValueType, false>(
attribute,
modified_key_columns,
[&](size_t row, const auto value, bool)
{
out[row] = value;
},
default_value_extractor);
}
}
result = std::move(column);
};
callOnDictionaryAttributeType(attribute.type, type_call);
if (is_attribute_nullable)
result = ColumnNullable::create(result, std::move(col_null_map_to));
return result;
}
template <DictionaryKeyType dictionary_key_type>
ColumnPtr RangeHashedDictionary<dictionary_key_type>::getColumnInternal(
const std::string & attribute_name,
@ -842,224 +638,6 @@ typename RangeHashedDictionary<dictionary_key_type>::Attribute RangeHashedDictio
return attribute;
}
template <DictionaryKeyType dictionary_key_type>
template <typename AttributeType, bool is_nullable, typename ValueSetter, typename DefaultValueExtractor>
void RangeHashedDictionary<dictionary_key_type>::getItemsImpl(
const Attribute & attribute,
const Columns & key_columns,
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const
{
const auto & attribute_container = std::get<AttributeContainerType<AttributeType>>(attribute.container);
size_t keys_found = 0;
const ColumnPtr & range_column = key_columns.back();
auto key_columns_copy = key_columns;
key_columns_copy.pop_back();
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns_copy, arena_holder.getComplexKeyArena());
const size_t keys_size = keys_extractor.getKeysSize();
callOnRangeType(dict_struct.range_min->type, [&](const auto & types)
{
using Types = std::decay_t<decltype(types)>;
using RangeColumnType = typename Types::LeftType;
using RangeStorageType = typename RangeColumnType::ValueType;
using RangeInterval = Interval<RangeStorageType>;
const auto * range_column_typed = typeid_cast<const RangeColumnType *>(range_column.get());
if (!range_column_typed)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Dictionary {} range column type should be equal to {}",
getFullName(),
dict_struct.range_min->type->getName());
const auto & range_column_data = range_column_typed->getData();
const auto & key_attribute_container = std::get<KeyAttributeContainerType<RangeStorageType>>(key_attribute.container);
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
const auto it = key_attribute_container.find(key);
if (it)
{
const auto date = range_column_data[key_index];
const auto & interval_tree = it->getMapped();
size_t value_index = 0;
std::optional<RangeInterval> range;
interval_tree.find(date, [&](auto & interval, auto & interval_value_index)
{
if (range)
{
if (likely(configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::min) && interval < *range)
{
range = interval;
value_index = interval_value_index;
}
else if (configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::max && interval > *range)
{
range = interval;
value_index = interval_value_index;
}
}
else
{
range = interval;
value_index = interval_value_index;
}
return true;
});
if (range.has_value())
{
++keys_found;
AttributeType value = attribute_container[value_index];
if constexpr (is_nullable)
{
bool is_null = (*attribute.is_value_nullable)[value_index];
set_value(key_index, value, is_null);
}
else
{
set_value(key_index, value, false);
}
keys_extractor.rollbackCurrentKey();
continue;
}
}
if constexpr (is_nullable)
set_value(key_index, default_value_extractor[key_index], default_value_extractor.isNullAt(key_index));
else
set_value(key_index, default_value_extractor[key_index], false);
keys_extractor.rollbackCurrentKey();
}
});
query_count.fetch_add(keys_size, std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
}
template <DictionaryKeyType dictionary_key_type>
template <typename AttributeType, bool is_nullable, typename ValueSetter>
size_t RangeHashedDictionary<dictionary_key_type>::getItemsShortCircuitImpl(
const Attribute & attribute,
const Columns & key_columns,
ValueSetter && set_value,
IColumn::Filter & default_mask) const
{
const auto & attribute_container = std::get<AttributeContainerType<AttributeType>>(attribute.container);
size_t keys_found = 0;
const ColumnPtr & range_column = key_columns.back();
auto key_columns_copy = key_columns;
key_columns_copy.pop_back();
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns_copy, arena_holder.getComplexKeyArena());
const size_t keys_size = keys_extractor.getKeysSize();
default_mask.resize(keys_size);
callOnRangeType(dict_struct.range_min->type, [&](const auto & types)
{
using Types = std::decay_t<decltype(types)>;
using RangeColumnType = typename Types::LeftType;
using RangeStorageType = typename RangeColumnType::ValueType;
using RangeInterval = Interval<RangeStorageType>;
const auto * range_column_typed = typeid_cast<const RangeColumnType *>(range_column.get());
if (!range_column_typed)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Dictionary {} range column type should be equal to {}",
getFullName(),
dict_struct.range_min->type->getName());
const auto & range_column_data = range_column_typed->getData();
const auto & key_attribute_container = std::get<KeyAttributeContainerType<RangeStorageType>>(key_attribute.container);
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
const auto it = key_attribute_container.find(key);
if (it)
{
const auto date = range_column_data[key_index];
const auto & interval_tree = it->getMapped();
size_t value_index = 0;
std::optional<RangeInterval> range;
interval_tree.find(date, [&](auto & interval, auto & interval_value_index)
{
if (range)
{
if (likely(configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::min) && interval < *range)
{
range = interval;
value_index = interval_value_index;
}
else if (configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::max && interval > *range)
{
range = interval;
value_index = interval_value_index;
}
}
else
{
range = interval;
value_index = interval_value_index;
}
return true;
});
if (range.has_value())
{
default_mask[key_index] = 0;
++keys_found;
AttributeType value = attribute_container[value_index];
if constexpr (is_nullable)
{
bool is_null = (*attribute.is_value_nullable)[value_index];
set_value(key_index, value, is_null);
}
else
{
set_value(key_index, value, false);
}
keys_extractor.rollbackCurrentKey();
continue;
}
}
default_mask[key_index] = 1;
keys_extractor.rollbackCurrentKey();
}
});
query_count.fetch_add(keys_size, std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
return keys_found;
}
template <DictionaryKeyType dictionary_key_type>
template <typename AttributeType, bool is_nullable, typename ValueSetter>
void RangeHashedDictionary<dictionary_key_type>::getItemsInternalImpl(

View File

@ -0,0 +1,133 @@
#include <Dictionaries/RangeHashedDictionary.h>
#define INSTANTIATE_GET_ITEMS_IMPL(DictionaryKeyType, IsNullable, AttributeType, ValueType) \
template void RangeHashedDictionary<DictionaryKeyType>::getItemsImpl<ValueType, IsNullable, DictionaryDefaultValueExtractor<AttributeType>>( \
const Attribute & attribute,\
const Columns & key_columns,\
typename RangeHashedDictionary<DictionaryKeyType>::ValueSetterFunc<ValueType> && set_value,\
DictionaryDefaultValueExtractor<AttributeType> & default_value_extractor) const;
#define INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(AttributeType) \
INSTANTIATE_GET_ITEMS_IMPL(DictionaryKeyType::Simple, true, AttributeType, DictionaryValueType<AttributeType>) \
INSTANTIATE_GET_ITEMS_IMPL(DictionaryKeyType::Simple, false, AttributeType, DictionaryValueType<AttributeType>) \
INSTANTIATE_GET_ITEMS_IMPL(DictionaryKeyType::Complex, true, AttributeType, DictionaryValueType<AttributeType>) \
INSTANTIATE_GET_ITEMS_IMPL(DictionaryKeyType::Complex, false, AttributeType, DictionaryValueType<AttributeType>)
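// Each INSTANTIATE_GET_ITEMS_IMPL line is an explicit template instantiation definition, so the
// heavy getItemsImpl body is compiled in the .cpp files that include this .txx rather than at
// every use site of RangeHashedDictionary. The *_FOR_ATTRIBUTE_TYPE wrapper emits all four
// combinations per attribute type: (Simple|Complex) key x (nullable|non-nullable).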
namespace DB
{
template <DictionaryKeyType dictionary_key_type>
template <typename ValueType, bool is_nullable, typename DefaultValueExtractor>
void RangeHashedDictionary<dictionary_key_type>::getItemsImpl(
const Attribute & attribute,
const Columns & key_columns,
typename RangeHashedDictionary<dictionary_key_type>::ValueSetterFunc<ValueType> && set_value,
DefaultValueExtractor & default_value_extractor) const
{
const auto & attribute_container = std::get<AttributeContainerType<ValueType>>(attribute.container);
size_t keys_found = 0;
const ColumnPtr & range_column = key_columns.back();
auto key_columns_copy = key_columns;
key_columns_copy.pop_back();
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns_copy, arena_holder.getComplexKeyArena());
const size_t keys_size = keys_extractor.getKeysSize();
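// Dispatch on the concrete type of the range column (dict_struct.range_min->type), e.g. Date,
// DateTime or a numeric type, so the interval tree below operates on the matching storage type.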
callOnRangeType(
dict_struct.range_min->type,
[&](const auto & types)
{
using Types = std::decay_t<decltype(types)>;
using RangeColumnType = typename Types::LeftType;
using RangeStorageType = typename RangeColumnType::ValueType;
using RangeInterval = Interval<RangeStorageType>;
const auto * range_column_typed = typeid_cast<const RangeColumnType *>(range_column.get());
if (!range_column_typed)
throw Exception(
ErrorCodes::TYPE_MISMATCH,
"Dictionary {} range column type should be equal to {}",
getFullName(),
dict_struct.range_min->type->getName());
const auto & range_column_data = range_column_typed->getData();
const auto & key_attribute_container = std::get<KeyAttributeContainerType<RangeStorageType>>(key_attribute.container);
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
const auto it = key_attribute_container.find(key);
if (it)
{
const auto date = range_column_data[key_index];
const auto & interval_tree = it->getMapped();
size_t value_index = 0;
std::optional<RangeInterval> range;
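// Visit every stored interval that contains 'date'; returning true keeps the iteration going.
// Among all covering intervals, remember the one preferred by the configured lookup strategy:
// the minimal interval for 'min' (the expected common case, hence likely()), the maximal for
// 'max', per Interval's comparison operators.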
interval_tree.find(
date,
[&](auto & interval, auto & interval_value_index)
{
if (range)
{
if (likely(configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::min) && interval < *range)
{
range = interval;
value_index = interval_value_index;
}
else if (configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::max && interval > *range)
{
range = interval;
value_index = interval_value_index;
}
}
else
{
range = interval;
value_index = interval_value_index;
}
return true;
});
if (range.has_value())
{
++keys_found;
ValueType value = attribute_container[value_index];
if constexpr (is_nullable)
{
bool is_null = (*attribute.is_value_nullable)[value_index];
set_value(key_index, value, is_null);
}
else
{
set_value(key_index, value, false);
}
keys_extractor.rollbackCurrentKey();
continue;
}
}
if constexpr (is_nullable)
set_value(key_index, default_value_extractor[key_index], default_value_extractor.isNullAt(key_index));
else
set_value(key_index, default_value_extractor[key_index], false);
keys_extractor.rollbackCurrentKey();
}
});
query_count.fetch_add(keys_size, std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
}
}
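
The per-type .cpp files that follow each include this .txx and instantiate one family of attribute types (Decimal, Float, Int, UInt, and the UUID/IP/String/Array group); the same split repeats for getItemsShortCircuitImpl further below. Splitting the instantiations across translation units this way presumably lets them build in parallel and keeps per-TU compile memory down. For intuition, the interval-selection lambda above boils down to the following self-contained sketch; ToyInterval, Strategy and pickInterval are illustrative names only (not ClickHouse API), and the lexicographic ordering is an assumption standing in for Interval's real comparison operators:

#include <optional>
#include <tuple>
#include <vector>

// Toy stand-in for the dictionary's Interval<RangeStorageType>.
struct ToyInterval
{
    int left = 0;
    int right = 0;
    bool contains(int point) const { return left <= point && point <= right; }
    // Assumed ordering: lexicographic by (left, right).
    friend bool operator<(const ToyInterval & a, const ToyInterval & b)
    {
        return std::tie(a.left, a.right) < std::tie(b.left, b.right);
    }
    friend bool operator>(const ToyInterval & a, const ToyInterval & b) { return b < a; }
};

enum class Strategy { min, max };

// Mirrors the find() lambda: among all intervals covering 'point', keep the one
// preferred by the lookup strategy, or nothing if no interval covers it.
std::optional<ToyInterval> pickInterval(const std::vector<ToyInterval> & intervals, int point, Strategy strategy)
{
    std::optional<ToyInterval> best;
    for (const auto & interval : intervals)
    {
        if (!interval.contains(point))
            continue;
        if (!best || (strategy == Strategy::min && interval < *best)
                  || (strategy == Strategy::max && interval > *best))
            best = interval;
    }
    return best;
}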

View File

@ -0,0 +1,10 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Decimal32);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Decimal64);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Decimal128);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Decimal256);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(DateTime64);
}

View File

@ -0,0 +1,7 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Float32);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Float64);
}

View File

@ -0,0 +1,11 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Int8);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Int16);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Int32);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Int64);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Int128);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Int256);
}

View File

@ -0,0 +1,10 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(UUID);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(IPv4);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(IPv6);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(String);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(Array);
}

View File

@ -0,0 +1,11 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(UInt8);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(UInt16);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(UInt32);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(UInt64);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(UInt128);
INSTANTIATE_GET_ITEMS_IMPL_FOR_ATTRIBUTE_TYPE(UInt256);
}

View File

@ -0,0 +1,132 @@
#include <Dictionaries/RangeHashedDictionary.h>
#define INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL(DictionaryKeyType, IsNullable, ValueType) \
template size_t RangeHashedDictionary<DictionaryKeyType>::getItemsShortCircuitImpl<ValueType, IsNullable>( \
const Attribute & attribute, \
const Columns & key_columns, \
typename RangeHashedDictionary<DictionaryKeyType>::ValueSetterFunc<ValueType> && set_value, \
IColumn::Filter & default_mask) const;
#define INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(AttributeType) \
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL(DictionaryKeyType::Simple, true, DictionaryValueType<AttributeType>) \
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL(DictionaryKeyType::Simple, false, DictionaryValueType<AttributeType>) \
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL(DictionaryKeyType::Complex, true, DictionaryValueType<AttributeType>) \
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL(DictionaryKeyType::Complex, false, DictionaryValueType<AttributeType>)
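// Same scheme as in RangeHashedDictionaryGetItemsImpl.txx: one explicit instantiation of
// getItemsShortCircuitImpl per (key type, nullability) pair, four per attribute type.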
namespace DB
{
template <DictionaryKeyType dictionary_key_type>
template <typename ValueType, bool is_nullable>
size_t RangeHashedDictionary<dictionary_key_type>::getItemsShortCircuitImpl(
const Attribute & attribute,
const Columns & key_columns,
typename RangeHashedDictionary<dictionary_key_type>::ValueSetterFunc<ValueType> && set_value,
IColumn::Filter & default_mask) const
{
const auto & attribute_container = std::get<AttributeContainerType<ValueType>>(attribute.container);
size_t keys_found = 0;
const ColumnPtr & range_column = key_columns.back();
auto key_columns_copy = key_columns;
key_columns_copy.pop_back();
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns_copy, arena_holder.getComplexKeyArena());
const size_t keys_size = keys_extractor.getKeysSize();
default_mask.resize(keys_size);
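// For each key the mask records the outcome: 0 = a covering interval was found and the value
// was emitted via set_value, 1 = miss, to be filled with default values by the caller.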
callOnRangeType(
dict_struct.range_min->type,
[&](const auto & types)
{
using Types = std::decay_t<decltype(types)>;
using RangeColumnType = typename Types::LeftType;
using RangeStorageType = typename RangeColumnType::ValueType;
using RangeInterval = Interval<RangeStorageType>;
const auto * range_column_typed = typeid_cast<const RangeColumnType *>(range_column.get());
if (!range_column_typed)
throw Exception(
ErrorCodes::TYPE_MISMATCH,
"Dictionary {} range column type should be equal to {}",
getFullName(),
dict_struct.range_min->type->getName());
const auto & range_column_data = range_column_typed->getData();
const auto & key_attribute_container = std::get<KeyAttributeContainerType<RangeStorageType>>(key_attribute.container);
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
const auto it = key_attribute_container.find(key);
if (it)
{
const auto date = range_column_data[key_index];
const auto & interval_tree = it->getMapped();
size_t value_index = 0;
std::optional<RangeInterval> range;
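// Same min/max interval selection as in getItemsImpl: keep the minimal or maximal
// covering interval depending on configuration.lookup_strategy.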
interval_tree.find(
date,
[&](auto & interval, auto & interval_value_index)
{
if (range)
{
if (likely(configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::min) && interval < *range)
{
range = interval;
value_index = interval_value_index;
}
else if (configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::max && interval > *range)
{
range = interval;
value_index = interval_value_index;
}
}
else
{
range = interval;
value_index = interval_value_index;
}
return true;
});
if (range.has_value())
{
default_mask[key_index] = 0;
++keys_found;
ValueType value = attribute_container[value_index];
if constexpr (is_nullable)
{
bool is_null = (*attribute.is_value_nullable)[value_index];
set_value(key_index, value, is_null);
}
else
{
set_value(key_index, value, false);
}
keys_extractor.rollbackCurrentKey();
continue;
}
}
default_mask[key_index] = 1;
keys_extractor.rollbackCurrentKey();
}
});
query_count.fetch_add(keys_size, std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
return keys_found;
}
}

View File

@ -0,0 +1,10 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsShortCircuitImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Decimal32);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Decimal64);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Decimal128);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Decimal256);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(DateTime64);
}

View File

@ -0,0 +1,7 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsShortCircuitImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Float32);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Float64);
}

View File

@ -0,0 +1,11 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsShortCircuitImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Int8);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Int16);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Int32);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Int64);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Int128);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Int256);
}

View File

@ -0,0 +1,10 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsShortCircuitImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(UUID);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(IPv4);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(IPv6);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(String);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(Array);
}

View File

@ -0,0 +1,11 @@
#include <Dictionaries/RangeHashedDictionaryGetItemsShortCircuitImpl.txx>
namespace DB
{
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(UInt8);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(UInt16);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(UInt32);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(UInt64);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(UInt128);
INSTANTIATE_GET_ITEMS_SHORT_CIRCUIT_IMPL_FOR_ATTRIBUTE_TYPE(UInt256);
}

Some files were not shown because too many files have changed in this diff.