mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge branch 'ClickHouse:master' into inline_poaarray_details
This commit is contained in:
commit
32458c8747
@ -6,6 +6,11 @@ sidebar_label: JDBC
|
||||
|
||||
# JDBC
|
||||
|
||||
:::note
|
||||
clickhouse-jdbc-bridge contains experimental code and is no longer supported. It may contain reliability issues and security vulnerabilities. Use it at your own risk.
|
||||
ClickHouse recommends using the built-in table functions, which provide a better alternative for ad-hoc querying scenarios (Postgres, MySQL, MongoDB, etc.).
|
||||
:::
|
||||
|
||||
Allows ClickHouse to connect to external databases via [JDBC](https://en.wikipedia.org/wiki/Java_Database_Connectivity).
|
||||
|
||||
To implement the JDBC connection, ClickHouse uses the separate program [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) that should run as a daemon.
|
||||
|
@ -21,3 +21,79 @@ When restarting a server, data disappears from the table and the table becomes e
|
||||
Normally, using this table engine is not justified. However, it can be used for tests, and for tasks where maximum speed is required on a relatively small number of rows (up to approximately 100,000,000).
|
||||
|
||||
The Memory engine is used by the system for temporary tables with external query data (see the section “External data for processing a query”), and for implementing `GLOBAL IN` (see the section “IN operators”).
|
||||
|
||||
Upper and lower bounds can be specified to limit Memory engine table size, effectively allowing it to act as a circular buffer (see [Engine Parameters](#engine-parameters)).
|
||||
|
||||
## Engine Parameters {#engine-parameters}
|
||||
|
||||
- `min_bytes_to_keep` — Minimum bytes to keep when memory table is size-capped.
|
||||
- Default value: `0`
|
||||
- Requires `max_bytes_to_keep`
|
||||
- `max_bytes_to_keep` — Maximum bytes to keep within memory table where oldest rows are deleted on each insertion (i.e. circular buffer). Max bytes can exceed the stated limit if the oldest batch of rows to remove falls under the `min_bytes_to_keep` limit when adding a large block.
|
||||
- Default value: `0`
|
||||
- `min_rows_to_keep` — Minimum rows to keep when memory table is size-capped.
|
||||
- Default value: `0`
|
||||
- Requires `max_rows_to_keep`
|
||||
- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e. circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
|
||||
- Default value: `0`
|
||||
|
||||
## Usage {#usage}
|
||||
|
||||
|
||||
**Initialize settings**
|
||||
``` sql
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
|
||||
```
|
||||
|
||||
**Note:** Both `bytes` and `rows` capping parameters can be set at the same time, however, the lower bounds of `max` and `min` will be adhered to.
|
||||
|
||||
## Examples {#examples}
|
||||
``` sql
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
|
||||
|
||||
/* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 8'192 bytes
|
||||
|
||||
/* 2. adding block that doesn't get deleted */
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 1'024 bytes
|
||||
|
||||
/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 8'192 bytes
|
||||
|
||||
/* 4. checking a very large block overrides all */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 65'536 bytes
|
||||
|
||||
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─total_bytes─┬─total_rows─┐
|
||||
│ 65536 │ 10000 │
|
||||
└─────────────┴────────────┘
|
||||
```
|
||||
|
||||
also, for rows:
|
||||
|
||||
``` sql
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 4000, max_rows_to_keep = 10000;
|
||||
|
||||
/* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 1'600 rows
|
||||
|
||||
/* 2. adding block that doesn't get deleted */
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 100 rows
|
||||
|
||||
/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 1'000 rows
|
||||
|
||||
/* 4. checking a very large block overrides all */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 10'000 rows
|
||||
|
||||
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─total_bytes─┬─total_rows─┐
|
||||
│ 65536 │ 10000 │
|
||||
└─────────────┴────────────┘
|
||||
```
|
||||
|
@ -95,9 +95,11 @@ which is equal to
|
||||
|
||||
## Substituting Configuration {#substitution}
|
||||
|
||||
The config can also define “substitutions”. If an element has the `incl` attribute, the corresponding substitution from the file will be used as the value. By default, the path to the file with substitutions is `/etc/metrika.xml`. This can be changed in the [include_from](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-include_from) element in the server config. The substitution values are specified in `/clickhouse/substitution_name` elements in this file. If a substitution specified in `incl` does not exist, it is recorded in the log. To prevent ClickHouse from logging missing substitutions, specify the `optional="true"` attribute (for example, settings for [macros](../operations/server-configuration-parameters/settings.md#macros)).
|
||||
The config can define substitutions. There are two types of substitutions:
|
||||
|
||||
If you want to replace an entire element with a substitution use `include` as the element name.
|
||||
- If an element has the `incl` attribute, the corresponding substitution from the file will be used as the value. By default, the path to the file with substitutions is `/etc/metrika.xml`. This can be changed in the [include_from](../operations/server-configuration-parameters/settings.md#server_configuration_parameters-include_from) element in the server config. The substitution values are specified in `/clickhouse/substitution_name` elements in this file. If a substitution specified in `incl` does not exist, it is recorded in the log. To prevent ClickHouse from logging missing substitutions, specify the `optional="true"` attribute (for example, settings for [macros](../operations/server-configuration-parameters/settings.md#macros)).
|
||||
|
||||
- If you want to replace an entire element with a substitution, use `include` as the element name. Substitutions can also be performed from ZooKeeper by specifying attribute `from_zk = "/path/to/node"`. In this case, the element value is replaced with the contents of the ZooKeeper node at `/path/to/node`. This also works if you store an entire XML subtree as a ZooKeeper node: it will be fully inserted into the source element.
|
||||
|
||||
XML substitution example:
|
||||
|
||||
@ -114,7 +116,7 @@ XML substitution example:
|
||||
</clickhouse>
|
||||
```
|
||||
|
||||
Substitutions can also be performed from ZooKeeper. To do this, specify the attribute `from_zk = "/path/to/node"`. The element value is replaced with the contents of the node at `/path/to/node` in ZooKeeper. You can also put an entire XML subtree on the ZooKeeper node, and it will be fully inserted into the source element.
|
||||
If you want to merge the substituting content with the existing configuration instead of appending, you can use attribute `merge="true"`, for example: `<include from_zk="/some_path" merge="true">`. In this case, the existing configuration will be merged with the content from the substitution and the existing configuration settings will be replaced with values from substitution.
|
||||
|
||||
## Encrypting and Hiding Configuration {#encryption}
|
||||
|
||||
|
@ -933,9 +933,9 @@ Hard limit is configured via system tools
|
||||
|
||||
## database_atomic_delay_before_drop_table_sec {#database_atomic_delay_before_drop_table_sec}
|
||||
|
||||
Sets the delay before remove table data in seconds. If the query has `SYNC` modifier, this setting is ignored.
|
||||
The delay, in seconds, before table data is dropped. If the `DROP TABLE` query has a `SYNC` modifier, this setting is ignored.
|
||||
|
||||
Default value: `480` (8 minute).
|
||||
Default value: `480` (8 minutes).
|
||||
|
||||
## database_catalog_unused_dir_hide_timeout_sec {#database_catalog_unused_dir_hide_timeout_sec}
|
||||
|
||||
|
@ -4337,6 +4337,18 @@ Possible values:
|
||||
|
||||
Default value: `0`.
|
||||
|
||||
|
||||
## function_locate_has_mysql_compatible_argument_order {#function-locate-has-mysql-compatible-argument-order}
|
||||
|
||||
Controls the order of arguments in function [locate](../../sql-reference/functions/string-search-functions.md#locate).
|
||||
|
||||
Possible values:
|
||||
|
||||
- 0 — Function `locate` accepts arguments `(haystack, needle[, start_pos])`.
|
||||
- 1 — Function `locate` accepts arguments `(needle, haystack[, start_pos])` (MySQL-compatible behavior).
|
||||
|
||||
Default value: `1`.
|
||||
|
||||
## date_time_overflow_behavior {#date_time_overflow_behavior}
|
||||
|
||||
Defines the behavior when [Date](../../sql-reference/data-types/date.md), [Date32](../../sql-reference/data-types/date32.md), [DateTime](../../sql-reference/data-types/datetime.md), [DateTime64](../../sql-reference/data-types/datetime64.md) or integers are converted into Date, Date32, DateTime or DateTime64 but the value cannot be represented in the result type.
|
||||
|
@ -30,7 +30,6 @@ position(haystack, needle[, start_pos])
|
||||
|
||||
Alias:
|
||||
- `position(needle IN haystack)`
|
||||
- `locate(haystack, needle[, start_pos])`.
|
||||
|
||||
**Arguments**
|
||||
|
||||
@ -49,7 +48,7 @@ If substring `needle` is empty, these rules apply:
|
||||
- if `start_pos >= 1` and `start_pos <= length(haystack) + 1`: return `start_pos`
|
||||
- otherwise: return `0`
|
||||
|
||||
The same rules also apply to functions `positionCaseInsensitive`, `positionUTF8` and `positionCaseInsensitiveUTF8`
|
||||
The same rules also apply to functions `locate`, `positionCaseInsensitive`, `positionUTF8` and `positionCaseInsensitiveUTF8`.
|
||||
|
||||
Type: `Integer`.
|
||||
|
||||
@ -114,6 +113,21 @@ SELECT
|
||||
└─────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┘
|
||||
```
|
||||
|
||||
## locate
|
||||
|
||||
Like [position](#position) but with arguments `haystack` and `needle` switched.
|
||||
|
||||
The behavior of this function depends on the ClickHouse version:
|
||||
- in versions < v24.3, `locate` was an alias of function `position` and accepted arguments `(haystack, needle[, start_pos])`.
|
||||
- in versions >= 24.3, `locate` is an individual function (for better compatibility with MySQL) and accepts arguments `(needle, haystack[, start_pos])`. The previous behavior
|
||||
can be restored using setting [function_locate_has_mysql_compatible_argument_order = false](../../operations/settings/settings.md#function-locate-has-mysql-compatible-argument-order).
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
locate(needle, haystack[, start_pos])
|
||||
```
|
||||
|
||||
## positionCaseInsensitive
|
||||
|
||||
Like [position](#position) but searches case-insensitively.
|
||||
|
@ -13,13 +13,6 @@ a system table called `system.dropped_tables`.
|
||||
|
||||
If you have a materialized view without a `TO` clause associated with the dropped table, then you will also have to UNDROP the inner table of that view.
|
||||
|
||||
:::note
|
||||
UNDROP TABLE is experimental. To use it add this setting:
|
||||
```sql
|
||||
set allow_experimental_undrop_table_query = 1;
|
||||
```
|
||||
:::
|
||||
|
||||
:::tip
|
||||
Also see [DROP TABLE](/docs/en/sql-reference/statements/drop.md)
|
||||
:::
|
||||
@ -32,60 +25,53 @@ UNDROP TABLE [db.]name [UUID '<uuid>'] [ON CLUSTER cluster]
|
||||
|
||||
**Example**
|
||||
|
||||
``` sql
|
||||
set allow_experimental_undrop_table_query = 1;
|
||||
```
|
||||
|
||||
```sql
|
||||
CREATE TABLE undropMe
|
||||
CREATE TABLE tab
|
||||
(
|
||||
`id` UInt8
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY id
|
||||
```
|
||||
ORDER BY id;
|
||||
|
||||
DROP TABLE tab;
|
||||
|
||||
```sql
|
||||
DROP TABLE undropMe
|
||||
```
|
||||
```sql
|
||||
SELECT *
|
||||
FROM system.dropped_tables
|
||||
FORMAT Vertical
|
||||
FORMAT Vertical;
|
||||
```
|
||||
|
||||
```response
|
||||
Row 1:
|
||||
──────
|
||||
index: 0
|
||||
database: default
|
||||
table: undropMe
|
||||
table: tab
|
||||
uuid: aa696a1a-1d70-4e60-a841-4c80827706cc
|
||||
engine: MergeTree
|
||||
metadata_dropped_path: /var/lib/clickhouse/metadata_dropped/default.undropMe.aa696a1a-1d70-4e60-a841-4c80827706cc.sql
|
||||
metadata_dropped_path: /var/lib/clickhouse/metadata_dropped/default.tab.aa696a1a-1d70-4e60-a841-4c80827706cc.sql
|
||||
table_dropped_time: 2023-04-05 14:12:12
|
||||
|
||||
1 row in set. Elapsed: 0.001 sec.
|
||||
```
|
||||
|
||||
```sql
|
||||
UNDROP TABLE undropMe
|
||||
```
|
||||
```response
|
||||
Ok.
|
||||
```
|
||||
```sql
|
||||
UNDROP TABLE tab;
|
||||
|
||||
SELECT *
|
||||
FROM system.dropped_tables
|
||||
FORMAT Vertical
|
||||
```
|
||||
FORMAT Vertical;
|
||||
|
||||
```response
|
||||
Ok.
|
||||
|
||||
0 rows in set. Elapsed: 0.001 sec.
|
||||
```
|
||||
|
||||
```sql
|
||||
DESCRIBE TABLE undropMe
|
||||
FORMAT Vertical
|
||||
DESCRIBE TABLE tab
|
||||
FORMAT Vertical;
|
||||
```
|
||||
|
||||
```response
|
||||
Row 1:
|
||||
──────
|
||||
|
@ -6,6 +6,11 @@ sidebar_label: jdbc
|
||||
|
||||
# jdbc
|
||||
|
||||
:::note
|
||||
clickhouse-jdbc-bridge contains experimental code and is no longer supported. It may contain reliability issues and security vulnerabilities. Use it at your own risk.
|
||||
ClickHouse recommends using the built-in table functions, which provide a better alternative for ad-hoc querying scenarios (Postgres, MySQL, MongoDB, etc.).
|
||||
:::
|
||||
|
||||
`jdbc(datasource, schema, table)` - returns table that is connected via JDBC driver.
|
||||
|
||||
This table function requires separate [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) program to be running.
|
||||
|
@ -733,8 +733,6 @@ try
|
||||
LOG_INFO(log, "Available CPU instruction sets: {}", cpu_info);
|
||||
#endif
|
||||
|
||||
sanityChecks(*this);
|
||||
|
||||
// Initialize global thread pool. Do it before we fetch configs from zookeeper
|
||||
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
|
||||
// ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
|
||||
@ -904,6 +902,7 @@ try
|
||||
config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH));
|
||||
config().removeConfiguration(old_configuration.get());
|
||||
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
|
||||
global_context->setConfig(loaded_config.configuration);
|
||||
}
|
||||
|
||||
Settings::checkNoSettingNamesAtTopLevel(config(), config_path);
|
||||
@ -911,6 +910,9 @@ try
|
||||
/// We need to reload server settings because config could be updated via zookeeper.
|
||||
server_settings.loadSettingsFromConfig(config());
|
||||
|
||||
/// NOTE: Do sanity checks after we loaded all possible substitutions (for the configuration) from ZK
|
||||
sanityChecks(*this);
|
||||
|
||||
#if defined(OS_LINUX)
|
||||
std::string executable_path = getExecutablePath();
|
||||
|
||||
|
@ -39,7 +39,7 @@ void logAboutProgress(LoggerPtr log, size_t processed, size_t total, AtomicStopw
|
||||
{
|
||||
if (total && (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)))
|
||||
{
|
||||
LOG_INFO(log, "Processed: {}%", static_cast<Int64>(processed * 1000.0 / total) * 0.1);
|
||||
LOG_INFO(log, "Processed: {:.1f}%", static_cast<double>(processed) * 100.0 / total);
|
||||
watch.restart();
|
||||
}
|
||||
}
|
||||
|
@ -427,6 +427,8 @@ void ConfigProcessor::doIncludesRecursive(
|
||||
|
||||
/// Replace the original contents, not add to it.
|
||||
bool replace = attributes->getNamedItem("replace");
|
||||
/// Merge with the original contents
|
||||
bool merge = attributes->getNamedItem("merge");
|
||||
|
||||
bool included_something = false;
|
||||
|
||||
@ -450,7 +452,6 @@ void ConfigProcessor::doIncludesRecursive(
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Replace the whole node not just contents.
|
||||
if (node->nodeName() == "include")
|
||||
{
|
||||
const NodeListPtr children = node_to_include->childNodes();
|
||||
@ -458,8 +459,18 @@ void ConfigProcessor::doIncludesRecursive(
|
||||
for (Node * child = children->item(0); child; child = next_child)
|
||||
{
|
||||
next_child = child->nextSibling();
|
||||
NodePtr new_node = config->importNode(child, true);
|
||||
node->parentNode()->insertBefore(new_node, node);
|
||||
|
||||
/// Recursively replace existing nodes in merge mode
|
||||
if (merge)
|
||||
{
|
||||
NodePtr new_node = config->importNode(child->parentNode(), true);
|
||||
mergeRecursive(config, node->parentNode(), new_node);
|
||||
}
|
||||
else /// Append to existing node by default
|
||||
{
|
||||
NodePtr new_node = config->importNode(child, true);
|
||||
node->parentNode()->insertBefore(new_node, node);
|
||||
}
|
||||
}
|
||||
|
||||
node->parentNode()->removeChild(node);
|
||||
@ -777,9 +788,9 @@ ConfigProcessor::LoadedConfig ConfigProcessor::loadConfig(bool allow_zk_includes
|
||||
}
|
||||
|
||||
ConfigProcessor::LoadedConfig ConfigProcessor::loadConfigWithZooKeeperIncludes(
|
||||
zkutil::ZooKeeperNodeCache & zk_node_cache,
|
||||
const zkutil::EventPtr & zk_changed_event,
|
||||
bool fallback_to_preprocessed)
|
||||
zkutil::ZooKeeperNodeCache & zk_node_cache,
|
||||
const zkutil::EventPtr & zk_changed_event,
|
||||
bool fallback_to_preprocessed)
|
||||
{
|
||||
XMLDocumentPtr config_xml;
|
||||
bool has_zk_includes;
|
||||
|
@ -25,6 +25,18 @@ inline bool isFinite(T x)
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool canConvertTo(Float64 x)
|
||||
{
|
||||
if constexpr (std::is_floating_point_v<T>)
|
||||
return true;
|
||||
if (!isFinite(x))
|
||||
return false;
|
||||
if (x > Float64(std::numeric_limits<T>::max()) || x < Float64(std::numeric_limits<T>::lowest()))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
T NaNOrZero()
|
||||
|
@ -41,13 +41,13 @@ BaseSettingsHelpers::Flags BaseSettingsHelpers::readFlags(ReadBuffer & in)
|
||||
|
||||
void BaseSettingsHelpers::throwSettingNotFound(std::string_view name)
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting {}", String{name});
|
||||
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting '{}'", String{name});
|
||||
}
|
||||
|
||||
|
||||
void BaseSettingsHelpers::warningSettingNotFound(std::string_view name)
|
||||
{
|
||||
LOG_WARNING(getLogger("Settings"), "Unknown setting {}, skipping", name);
|
||||
LOG_WARNING(getLogger("Settings"), "Unknown setting '{}', skipping", name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -175,6 +175,7 @@ class IColumn;
|
||||
M(Bool, enable_positional_arguments, true, "Enable positional arguments in ORDER BY, GROUP BY and LIMIT BY", 0) \
|
||||
M(Bool, enable_extended_results_for_datetime_functions, false, "Enable date functions like toLastDayOfMonth return Date32 results (instead of Date results) for Date32/DateTime64 arguments.", 0) \
|
||||
M(Bool, allow_nonconst_timezone_arguments, false, "Allow non-const timezone arguments in certain time-related functions like toTimeZone(), fromUnixTimestamp*(), snowflakeToDateTime*()", 0) \
|
||||
M(Bool, function_locate_has_mysql_compatible_argument_order, true, "Function locate() has arguments (needle, haystack[, start_pos]) like in MySQL instead of (haystack, needle[, start_pos]) like function position()", 0) \
|
||||
\
|
||||
M(Bool, group_by_use_nulls, false, "Treat columns mentioned in ROLLUP, CUBE or GROUPING SETS as Nullable", 0) \
|
||||
\
|
||||
|
@ -94,6 +94,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
|
||||
{"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"},
|
||||
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."},
|
||||
{"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"},
|
||||
{"function_locate_has_mysql_compatible_argument_order", false, true, "Increase compatibility with MySQL's locate function."},
|
||||
{"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for sapce reservation in filesystem cache"},
|
||||
{"max_parser_backtracks", 0, 1000000, "Limiting the complexity of parsing"},
|
||||
}},
|
||||
|
@ -440,10 +440,22 @@ void DatabaseOrdinary::stopLoading()
|
||||
|
||||
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
|
||||
{
|
||||
auto result = DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
|
||||
std::scoped_lock lock(mutex);
|
||||
typeid_cast<DatabaseTablesSnapshotIterator &>(*result).setLoadTasks(startup_table);
|
||||
return result;
|
||||
// Wait for every table (matching the filter) to be loaded and started up before we make the snapshot.
|
||||
// It is important, because otherwise table might be:
|
||||
// - not attached and thus will be missed in the snapshot;
|
||||
// - not started, which is not good for DDL operations.
|
||||
LoadTaskPtrs tasks_to_wait;
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!filter_by_table_name)
|
||||
tasks_to_wait.reserve(startup_table.size());
|
||||
for (const auto & [table_name, task] : startup_table)
|
||||
if (!filter_by_table_name || filter_by_table_name(table_name))
|
||||
tasks_to_wait.emplace_back(task);
|
||||
}
|
||||
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), tasks_to_wait);
|
||||
|
||||
return DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
|
||||
|
@ -77,17 +77,12 @@ private:
|
||||
Tables tables;
|
||||
Tables::iterator it;
|
||||
|
||||
// Tasks to wait before returning a table
|
||||
using Tasks = std::unordered_map<String, LoadTaskPtr>;
|
||||
Tasks tasks;
|
||||
|
||||
protected:
|
||||
DatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && other) noexcept
|
||||
: IDatabaseTablesIterator(std::move(other.database_name))
|
||||
{
|
||||
size_t idx = std::distance(other.tables.begin(), other.it);
|
||||
std::swap(tables, other.tables);
|
||||
std::swap(tasks, other.tasks);
|
||||
other.it = other.tables.end();
|
||||
it = tables.begin();
|
||||
std::advance(it, idx);
|
||||
@ -110,17 +105,7 @@ public:
|
||||
|
||||
const String & name() const override { return it->first; }
|
||||
|
||||
const StoragePtr & table() const override
|
||||
{
|
||||
if (auto task = tasks.find(it->first); task != tasks.end())
|
||||
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task->second);
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void setLoadTasks(const Tasks & tasks_)
|
||||
{
|
||||
tasks = tasks_;
|
||||
}
|
||||
const StoragePtr & table() const override { return it->second; }
|
||||
};
|
||||
|
||||
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
|
||||
|
@ -22,13 +22,13 @@ namespace DB
|
||||
* positionCaseInsensitive(haystack, needle)
|
||||
* positionCaseInsensitiveUTF8(haystack, needle)
|
||||
*
|
||||
* like(haystack, pattern) - search by the regular expression LIKE; Returns 0 or 1. Case-insensitive, but only for Latin.
|
||||
* notLike(haystack, pattern)
|
||||
* like(haystack, needle) - search by the regular expression LIKE; Returns 0 or 1. Case-insensitive, but only for Latin.
|
||||
* notLike(haystack, needle)
|
||||
*
|
||||
* ilike(haystack, pattern) - like 'like' but case-insensitive
|
||||
* notIlike(haystack, pattern)
|
||||
* ilike(haystack, needle) - like 'like' but case-insensitive
|
||||
* notIlike(haystack, needle)
|
||||
*
|
||||
* match(haystack, pattern) - search by regular expression re2; Returns 0 or 1.
|
||||
* match(haystack, needle) - search by regular expression re2; Returns 0 or 1.
|
||||
*
|
||||
* countSubstrings(haystack, needle) -- count number of occurrences of needle in haystack.
|
||||
* countSubstringsCaseInsensitive(haystack, needle)
|
||||
@ -53,7 +53,7 @@ namespace DB
|
||||
* - the first subpattern, if the regexp has a subpattern;
|
||||
* - the zero subpattern (the match part, otherwise);
|
||||
* - if not match - an empty string.
|
||||
* extract(haystack, pattern)
|
||||
* extract(haystack, needle)
|
||||
*/
|
||||
|
||||
namespace ErrorCodes
|
||||
@ -69,13 +69,39 @@ enum class ExecutionErrorPolicy
|
||||
Throw
|
||||
};
|
||||
|
||||
template <typename Impl, ExecutionErrorPolicy execution_error_policy = ExecutionErrorPolicy::Throw>
|
||||
enum class HaystackNeedleOrderIsConfigurable
|
||||
{
|
||||
No, /// function arguments are always: (haystack, needle[, position])
|
||||
Yes /// depending on a setting, the function arguments are (haystack, needle[, position]) or (needle, haystack[, position])
|
||||
};
|
||||
|
||||
template <typename Impl,
|
||||
ExecutionErrorPolicy execution_error_policy = ExecutionErrorPolicy::Throw,
|
||||
HaystackNeedleOrderIsConfigurable haystack_needle_order_is_configurable = HaystackNeedleOrderIsConfigurable::No>
|
||||
class FunctionsStringSearch : public IFunction
|
||||
{
|
||||
private:
|
||||
enum class ArgumentOrder
|
||||
{
|
||||
HaystackNeedle,
|
||||
NeedleHaystack
|
||||
};
|
||||
|
||||
ArgumentOrder argument_order = ArgumentOrder::HaystackNeedle;
|
||||
|
||||
public:
|
||||
static constexpr auto name = Impl::name;
|
||||
|
||||
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionsStringSearch>(); }
|
||||
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionsStringSearch>(context); }
|
||||
|
||||
explicit FunctionsStringSearch([[maybe_unused]] ContextPtr context)
|
||||
{
|
||||
if constexpr (haystack_needle_order_is_configurable == HaystackNeedleOrderIsConfigurable::Yes)
|
||||
{
|
||||
if (context->getSettingsRef().function_locate_has_mysql_compatible_argument_order)
|
||||
argument_order = ArgumentOrder::NeedleHaystack;
|
||||
}
|
||||
}
|
||||
|
||||
String getName() const override { return name; }
|
||||
|
||||
@ -105,13 +131,16 @@ public:
|
||||
"Number of arguments for function {} doesn't match: passed {}, should be 2 or 3",
|
||||
getName(), arguments.size());
|
||||
|
||||
if (!isStringOrFixedString(arguments[0]))
|
||||
const auto & haystack_type = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[0] : arguments[1];
|
||||
const auto & needle_type = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[1] : arguments[0];
|
||||
|
||||
if (!isStringOrFixedString(haystack_type))
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
"Illegal type {} of argument of function {}",
|
||||
arguments[0]->getName(), getName());
|
||||
|
||||
if (!isString(arguments[1]))
|
||||
if (!isString(needle_type))
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
"Illegal type {} of argument of function {}",
|
||||
@ -135,8 +164,8 @@ public:
|
||||
|
||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override
|
||||
{
|
||||
const ColumnPtr & column_haystack = arguments[0].column;
|
||||
const ColumnPtr & column_needle = arguments[1].column;
|
||||
const ColumnPtr & column_haystack = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[0].column : arguments[1].column;
|
||||
const ColumnPtr & column_needle = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[1].column : arguments[0].column;
|
||||
|
||||
ColumnPtr column_start_pos = nullptr;
|
||||
if (arguments.size() >= 3)
|
||||
|
@ -213,6 +213,7 @@ struct MapToSubcolumnAdapter : public MapAdapterBase<MapToSubcolumnAdapter<Name,
|
||||
class FunctionMapKeyLike : public IFunction
|
||||
{
|
||||
public:
|
||||
FunctionMapKeyLike() : impl(/*context*/ nullptr) {} /// nullptr because getting a context here is hard and FunctionLike doesn't need context
|
||||
String getName() const override { return "mapKeyLike"; }
|
||||
size_t getNumberOfArguments() const override { return 3; }
|
||||
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
|
||||
|
@ -1,5 +1,5 @@
|
||||
#include "FunctionFactory.h"
|
||||
#include "like.h"
|
||||
#include "FunctionFactory.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
|
34
src/Functions/locate.cpp
Normal file
34
src/Functions/locate.cpp
Normal file
@ -0,0 +1,34 @@
|
||||
#include "FunctionsStringSearch.h"
|
||||
#include "FunctionFactory.h"
|
||||
#include "PositionImpl.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace
|
||||
{
|
||||
|
||||
struct NameLocate
|
||||
{
|
||||
static constexpr auto name = "locate";
|
||||
};
|
||||
|
||||
using FunctionLocate = FunctionsStringSearch<PositionImpl<NameLocate, PositionCaseSensitiveASCII>, ExecutionErrorPolicy::Throw, HaystackNeedleOrderIsConfigurable::Yes>;
|
||||
|
||||
}
|
||||
|
||||
REGISTER_FUNCTION(Locate)
|
||||
{
|
||||
FunctionDocumentation::Description doc_description = "Like function `position` but with arguments `haystack` and `locate` switched. The behavior of this function depends on the ClickHouse version: In versions < v24.3, `locate` was an alias of function `position` and accepted arguments `(haystack, needle[, start_pos])`. In versions >= 24.3,, `locate` is an individual function (for better compatibility with MySQL) and accepts arguments `(needle, haystack[, start_pos])`. The previous behaviorcan be restored using setting `function_locate_has_mysql_compatible_argument_order = false`.";
|
||||
FunctionDocumentation::Syntax doc_syntax = "location(needle, haystack[, start_pos])";
|
||||
FunctionDocumentation::Arguments doc_arguments = {{"needle", "Substring to be searched (String)"},
|
||||
{"haystack", "String in which the search is performed (String)."},
|
||||
{"start_pos", "Position (1-based) in `haystack` at which the search starts (UInt*)."}};
|
||||
FunctionDocumentation::ReturnedValue doc_returned_value = "Starting position in bytes and counting from 1, if the substring was found. 0, if the substring was not found.";
|
||||
FunctionDocumentation::Examples doc_examples = {{"Example", "SELECT locate('abcabc', 'ca');", "3"}};
|
||||
FunctionDocumentation::Categories doc_categories = {"String search"};
|
||||
|
||||
|
||||
factory.registerFunction<FunctionLocate>({doc_description, doc_syntax, doc_arguments, doc_returned_value, doc_examples, doc_categories}, FunctionFactory::CaseInsensitive);
|
||||
}
|
||||
}
|
@ -20,6 +20,5 @@ using FunctionPosition = FunctionsStringSearch<PositionImpl<NamePosition, Positi
|
||||
/// Registers `position`, passing FunctionFactory::CaseInsensitive.
/// `locate` is intentionally not aliased to `position` here: it is registered as a function of
/// its own by REGISTER_FUNCTION(Locate), and registering the same name a second time as an alias
/// would presumably conflict with that registration.
REGISTER_FUNCTION(Position)
{
    factory.registerFunction<FunctionPosition>({}, FunctionFactory::CaseInsensitive);
}
|
||||
}
|
||||
|
@ -148,19 +148,25 @@ AsynchronousInsertQueue::InsertData::Entry::Entry(
|
||||
{
|
||||
}
|
||||
|
||||
void AsynchronousInsertQueue::InsertData::Entry::resetChunk()
|
||||
{
|
||||
if (chunk.empty())
|
||||
return;
|
||||
|
||||
// To avoid races on counter of user's MemoryTracker we should free memory at this moment.
|
||||
// Entries data must be destroyed in context of user who runs async insert.
|
||||
// Each entry in the list may correspond to a different user,
|
||||
// so we need to switch current thread's MemoryTracker.
|
||||
MemoryTrackerSwitcher switcher(user_memory_tracker);
|
||||
chunk = {};
|
||||
}
|
||||
|
||||
void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr exception_)
|
||||
{
|
||||
if (finished.exchange(true))
|
||||
return;
|
||||
|
||||
{
|
||||
// To avoid races on counter of user's MemoryTracker we should free memory at this moment.
|
||||
// Entries data must be destroyed in context of user who runs async insert.
|
||||
// Each entry in the list may correspond to a different user,
|
||||
// so we need to switch current thread's MemoryTracker.
|
||||
MemoryTrackerSwitcher switcher(user_memory_tracker);
|
||||
chunk = {};
|
||||
}
|
||||
resetChunk();
|
||||
|
||||
if (exception_)
|
||||
{
|
||||
@ -224,7 +230,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
|
||||
auto & shard = queue_shards[i];
|
||||
|
||||
shard.are_tasks_available.notify_one();
|
||||
assert(dump_by_first_update_threads[i].joinable());
|
||||
chassert(dump_by_first_update_threads[i].joinable());
|
||||
dump_by_first_update_threads[i].join();
|
||||
|
||||
if (flush_on_shutdown)
|
||||
@ -510,14 +516,13 @@ void AsynchronousInsertQueue::validateSettings(const Settings & settings, Logger
|
||||
/// Adaptive timeout settings.
|
||||
const auto min_ms = std::chrono::milliseconds(settings.async_insert_busy_timeout_min_ms);
|
||||
|
||||
if (min_ms > max_ms)
|
||||
if (log)
|
||||
LOG_WARNING(
|
||||
log,
|
||||
"Setting 'async_insert_busy_timeout_min_ms'={} is greater than 'async_insert_busy_timeout_max_ms'={}. Ignoring "
|
||||
"'async_insert_busy_timeout_min_ms'",
|
||||
min_ms.count(),
|
||||
max_ms.count());
|
||||
if (min_ms > max_ms && log)
|
||||
LOG_WARNING(
|
||||
log,
|
||||
"Setting 'async_insert_busy_timeout_min_ms'={} is greater than 'async_insert_busy_timeout_max_ms'={}. Ignoring "
|
||||
"'async_insert_busy_timeout_min_ms'",
|
||||
min_ms.count(),
|
||||
max_ms.count());
|
||||
|
||||
if (settings.async_insert_busy_timeout_increase_rate <= 0)
|
||||
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting 'async_insert_busy_timeout_increase_rate' must be greater than zero");
|
||||
@ -953,14 +958,18 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
|
||||
"Expected entry with data kind Parsed. Got: {}", entry->chunk.getDataKind());
|
||||
|
||||
auto buffer = std::make_unique<ReadBufferFromString>(*bytes);
|
||||
|
||||
size_t num_bytes = bytes->size();
|
||||
size_t num_rows = executor.execute(*buffer);
|
||||
|
||||
total_rows += num_rows;
|
||||
chunk_info->offsets.push_back(total_rows);
|
||||
chunk_info->tokens.push_back(entry->async_dedup_token);
|
||||
|
||||
add_to_async_insert_log(entry, query_for_logging, current_exception, num_rows, num_bytes, data->timeout_ms);
|
||||
|
||||
current_exception.clear();
|
||||
entry->resetChunk();
|
||||
}
|
||||
|
||||
Chunk chunk(executor.getResultColumns(), total_rows);
|
||||
@ -1011,6 +1020,8 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
|
||||
|
||||
const auto & query_for_logging = get_query_by_format(entry->format);
|
||||
add_to_async_insert_log(entry, query_for_logging, "", block->rows(), block->bytes(), data->timeout_ms);
|
||||
|
||||
entry->resetChunk();
|
||||
}
|
||||
|
||||
Chunk chunk(std::move(result_columns), total_rows);
|
||||
|
@ -117,6 +117,17 @@ private:
|
||||
return DataKind::Parsed;
|
||||
}
|
||||
|
||||
bool empty() const
|
||||
{
|
||||
return std::visit([]<typename T>(const T & arg)
|
||||
{
|
||||
if constexpr (std::is_same_v<T, Block>)
|
||||
return arg.rows() == 0;
|
||||
else
|
||||
return arg.empty();
|
||||
}, *this);
|
||||
}
|
||||
|
||||
const String * asString() const { return std::get_if<String>(this); }
|
||||
const Block * asBlock() const { return std::get_if<Block>(this); }
|
||||
};
|
||||
@ -140,7 +151,9 @@ private:
|
||||
const String & format_,
|
||||
MemoryTracker * user_memory_tracker_);
|
||||
|
||||
void resetChunk();
|
||||
void finish(std::exception_ptr exception_ = nullptr);
|
||||
|
||||
std::future<void> getFuture() { return promise.get_future(); }
|
||||
bool isFinished() const { return finished; }
|
||||
|
||||
|
@ -1143,7 +1143,7 @@ void DatabaseCatalog::dequeueDroppedTableCleanup(StorageID table_id)
|
||||
TableMarkedAsDropped dropped_table;
|
||||
{
|
||||
std::lock_guard lock(tables_marked_dropped_mutex);
|
||||
time_t latest_drop_time = std::numeric_limits<time_t>::min();
|
||||
auto latest_drop_time = std::numeric_limits<time_t>::min();
|
||||
auto it_dropped_table = tables_marked_dropped.end();
|
||||
for (auto it = tables_marked_dropped.begin(); it != tables_marked_dropped.end(); ++it)
|
||||
{
|
||||
@ -1168,7 +1168,7 @@ void DatabaseCatalog::dequeueDroppedTableCleanup(StorageID table_id)
|
||||
}
|
||||
if (it_dropped_table == tables_marked_dropped.end())
|
||||
throw Exception(ErrorCodes::UNKNOWN_TABLE,
|
||||
"The drop task of table {} is in progress, has been dropped or the database engine doesn't support it",
|
||||
"Table {} is being dropped, has been dropped, or the database engine does not support UNDROP",
|
||||
table_id.getNameForLogs());
|
||||
latest_metadata_dropped_path = it_dropped_table->metadata_path;
|
||||
String table_metadata_path = getPathForMetadata(it_dropped_table->table_id);
|
||||
|
@ -417,7 +417,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
|
||||
uuids_to_wait.push_back(table_to_wait);
|
||||
}
|
||||
}
|
||||
// only if operation is DETACH
|
||||
// only if operation is DETACH
|
||||
if ((!drop || !truncate) && query.sync)
|
||||
{
|
||||
/// Avoid "some tables are still in use" when sync mode is enabled
|
||||
|
@ -96,6 +96,7 @@
|
||||
#include <Common/scope_guard_safe.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/NaNUtils.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -2553,10 +2554,13 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
|
||||
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
|
||||
if (max_streams > 1 && !is_sync_remote)
|
||||
{
|
||||
if (auto streams_with_ratio = max_streams * settings.max_streams_to_max_threads_ratio; streams_with_ratio < SIZE_MAX)
|
||||
if (auto streams_with_ratio = max_streams * settings.max_streams_to_max_threads_ratio; canConvertTo<size_t>(streams_with_ratio))
|
||||
max_streams = static_cast<size_t>(streams_with_ratio);
|
||||
else
|
||||
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, "Exceeded limit for `max_streams` with `max_streams_to_max_threads_ratio`. Make sure that `max_streams * max_streams_to_max_threads_ratio` not exceeds {}, current value: {}", SIZE_MAX, streams_with_ratio);
|
||||
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
|
||||
"Exceeded limit for `max_streams` with `max_streams_to_max_threads_ratio`. "
|
||||
"Make sure that `max_streams * max_streams_to_max_threads_ratio` is in some reasonable boundaries, current value: {}",
|
||||
streams_with_ratio);
|
||||
}
|
||||
|
||||
auto & prewhere_info = analysis_result.prewhere_info;
|
||||
|
@ -86,7 +86,6 @@ void replaceStorageInQueryTree(QueryTreeNodePtr & query_tree, const ContextPtr &
|
||||
continue;
|
||||
|
||||
auto replacement_table_expression = std::make_shared<TableNode>(storage, context);
|
||||
replacement_table_expression->setAlias(node->getAlias());
|
||||
|
||||
if (auto table_expression_modifiers = table_node.getTableExpressionModifiers())
|
||||
replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers);
|
||||
|
@ -17,14 +17,16 @@ namespace ErrorCodes
|
||||
extern const int SUPPORT_IS_DISABLED;
|
||||
}
|
||||
|
||||
InterpreterUndropQuery::InterpreterUndropQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_) : WithMutableContext(context_), query_ptr(query_ptr_)
|
||||
InterpreterUndropQuery::InterpreterUndropQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_)
|
||||
: WithMutableContext(context_)
|
||||
, query_ptr(query_ptr_)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
BlockIO InterpreterUndropQuery::execute()
|
||||
{
|
||||
getContext()->checkAccess(AccessType::UNDROP_TABLE);
|
||||
|
||||
auto & undrop = query_ptr->as<ASTUndropQuery &>();
|
||||
if (!undrop.cluster.empty() && !maybeRemoveOnCluster(query_ptr, getContext()))
|
||||
{
|
||||
|
@ -94,7 +94,24 @@ public:
|
||||
/// Returns a deep copy of this query.
/// After copy-construction the element nodes of the copy still refer to this query's nodes,
/// so every database/table node of every element is cloned explicitly and registered in `children`.
ASTPtr clone() const override
{
    auto res = std::make_shared<ASTRenameQuery>(*this);

    /// No cloneChildren() call here: the copied child list is discarded right away and rebuilt
    /// from the freshly cloned element nodes, so cloning it first would only be wasted work.
    res->children.clear();

    auto clone_child = [&res](ASTPtr & node)
    {
        /// Unset nodes (e.g. a missing database part) stay unset.
        if (node)
        {
            node = node->clone();
            res->children.push_back(node);
        }
    };

    for (auto & elem : res->elements)
    {
        clone_child(elem.from.database);
        clone_child(elem.from.table);
        clone_child(elem.to.database);
        clone_child(elem.to.table);
    }

    cloneOutputOptions(*res);
    return res;
}
|
||||
@ -108,9 +125,15 @@ public:
|
||||
for (Element & elem : query.elements)
|
||||
{
|
||||
if (!elem.from.database)
|
||||
{
|
||||
elem.from.database = std::make_shared<ASTIdentifier>(params.default_database);
|
||||
query.children.push_back(elem.from.database);
|
||||
}
|
||||
if (!elem.to.database)
|
||||
{
|
||||
elem.to.database = std::make_shared<ASTIdentifier>(params.default_database);
|
||||
query.children.push_back(elem.to.database);
|
||||
}
|
||||
}
|
||||
|
||||
return query_ptr;
|
||||
|
@ -131,6 +131,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int TOO_MANY_ROWS;
|
||||
extern const int CANNOT_PARSE_TEXT;
|
||||
extern const int PARAMETER_OUT_OF_BOUND;
|
||||
}
|
||||
|
||||
static MergeTreeReaderSettings getMergeTreeReaderSettings(
|
||||
@ -348,7 +349,14 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
|
||||
|
||||
/// We have a special logic for local replica. It has to read less data, because in some cases it should
|
||||
/// merge states of aggregate functions or do some other important stuff other than reading from Disk.
|
||||
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(pool_settings.min_marks_for_concurrent_read * context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier);
|
||||
const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
|
||||
if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
|
||||
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
|
||||
else
|
||||
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
|
||||
"Exceeded limit for the number of marks per a single task for parallel replicas. "
|
||||
"Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
|
||||
multiplier);
|
||||
|
||||
auto pool = std::make_shared<MergeTreeReadPoolParallelReplicas>(
|
||||
std::move(extension),
|
||||
@ -512,8 +520,14 @@ Pipe ReadFromMergeTree::readInOrder(
|
||||
.columns_to_read = required_columns,
|
||||
};
|
||||
|
||||
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(
|
||||
pool_settings.min_marks_for_concurrent_read * context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier);
|
||||
const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
|
||||
if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
|
||||
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
|
||||
else
|
||||
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
|
||||
"Exceeded limit for the number of marks per a single task for parallel replicas. "
|
||||
"Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
|
||||
multiplier);
|
||||
|
||||
CoordinationMode mode = read_type == ReadType::InOrder
|
||||
? CoordinationMode::WithOrder
|
||||
|
@ -10,6 +10,10 @@ class ASTStorage;
|
||||
|
||||
/// List of Memory table engine settings, expanded through the M(type, name, default, description, flags) macro.
/// Besides `compress`, the min/max `*_to_keep` entries describe how many rows / bytes the table buffer retains;
/// each of them defaults to 0.
#define MEMORY_SETTINGS(M, ALIAS) \
|
||||
M(Bool, compress, false, "Compress data in memory", 0) \
|
||||
M(UInt64, min_rows_to_keep, 0, "Minimum block size (in rows) to retain in Memory table buffer.", 0) \
|
||||
M(UInt64, max_rows_to_keep, 0, "Maximum block size (in rows) to retain in Memory table buffer.", 0) \
|
||||
M(UInt64, min_bytes_to_keep, 0, "Minimum block size (in bytes) to retain in Memory table buffer.", 0) \
|
||||
M(UInt64, max_bytes_to_keep, 0, "Maximum block size (in bytes) to retain in Memory table buffer.", 0) \
|
||||
|
||||
DECLARE_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)
|
||||
|
||||
|
@ -241,7 +241,7 @@ std::vector<AsyncInsertInfoPtr> scatterAsyncInsertInfoBySelector(AsyncInsertInfo
|
||||
}
|
||||
|
||||
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
|
||||
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info)
|
||||
Block && block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info)
|
||||
{
|
||||
BlocksWithPartition result;
|
||||
if (!block || !block.rows())
|
||||
@ -320,7 +320,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
|
||||
}
|
||||
|
||||
Block MergeTreeDataWriter::mergeBlock(
|
||||
const Block & block,
|
||||
Block && block,
|
||||
SortDescription sort_description,
|
||||
const Names & partition_key_columns,
|
||||
IColumn::Permutation *& permutation,
|
||||
@ -410,7 +410,11 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartWithoutPref
|
||||
}
|
||||
|
||||
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
|
||||
BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, int64_t block_number, bool need_tmp_prefix)
|
||||
BlockWithPartition & block_with_partition,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
ContextPtr context,
|
||||
int64_t block_number,
|
||||
bool need_tmp_prefix)
|
||||
{
|
||||
TemporaryPart temp_part;
|
||||
Block & block = block_with_partition.block;
|
||||
@ -498,7 +502,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
|
||||
if (context->getSettingsRef().optimize_on_insert)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MergeTreeDataWriterMergingBlocksMicroseconds);
|
||||
block = mergeBlock(block, sort_description, partition_key_columns, perm_ptr, data.merging_params);
|
||||
block = mergeBlock(std::move(block), sort_description, partition_key_columns, perm_ptr, data.merging_params);
|
||||
}
|
||||
|
||||
/// Size of part would not be greater than block.bytes() + epsilon
|
||||
@ -718,7 +722,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
|
||||
|
||||
MergeTreeData::MergingParams projection_merging_params;
|
||||
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
|
||||
block = mergeBlock(block, sort_description, {}, perm_ptr, projection_merging_params);
|
||||
block = mergeBlock(std::move(block), sort_description, {}, perm_ptr, projection_merging_params);
|
||||
}
|
||||
|
||||
/// This effectively chooses minimal compression method:
|
||||
|
@ -53,7 +53,7 @@ public:
|
||||
* (split rows by partition)
|
||||
* Works deterministically: if same block was passed, function will return same result in same order.
|
||||
*/
|
||||
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info = nullptr);
|
||||
static BlocksWithPartition splitBlockIntoParts(Block && block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info = nullptr);
|
||||
|
||||
/// This structure contains not completely written temporary part.
|
||||
/// Some writes may happen asynchronously, e.g. for blob storages.
|
||||
@ -107,7 +107,7 @@ public:
|
||||
size_t block_num);
|
||||
|
||||
static Block mergeBlock(
|
||||
const Block & block,
|
||||
Block && block,
|
||||
SortDescription sort_description,
|
||||
const Names & partition_key_columns,
|
||||
IColumn::Permutation *& permutation,
|
||||
|
@ -63,7 +63,7 @@ void MergeTreeSink::consume(Chunk chunk)
|
||||
if (!storage_snapshot->object_columns.empty())
|
||||
convertDynamicColumnsToTuples(block, storage_snapshot);
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(std::move(block), max_parts_per_block, metadata_snapshot, context);
|
||||
|
||||
using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;
|
||||
DelayedPartitions partitions;
|
||||
@ -87,6 +87,10 @@ void MergeTreeSink::consume(Chunk chunk)
|
||||
elapsed_ns = watch.elapsed();
|
||||
}
|
||||
|
||||
/// Reset earlier to free memory
|
||||
current_block.block.clear();
|
||||
current_block.partition.clear();
|
||||
|
||||
/// If optimize_on_insert setting is true, current_block could become empty after merge
|
||||
/// and we didn't create part.
|
||||
if (!temp_part.part)
|
||||
|
@ -288,7 +288,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "No chunk info for async inserts");
|
||||
}
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context, async_insert_info);
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(std::move(block), max_parts_per_block, metadata_snapshot, context, async_insert_info);
|
||||
|
||||
using DelayedPartition = typename ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk::Partition;
|
||||
using DelayedPartitions = std::vector<DelayedPartition>;
|
||||
@ -383,6 +383,12 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
|
||||
partitions = DelayedPartitions{};
|
||||
}
|
||||
|
||||
if constexpr (!async_insert)
|
||||
{
|
||||
/// Reset earlier to free memory.
|
||||
current_block.block.clear();
|
||||
current_block.partition.clear();
|
||||
}
|
||||
|
||||
partitions.emplace_back(DelayedPartition(
|
||||
log,
|
||||
|
@ -46,6 +46,7 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int CANNOT_RESTORE_TABLE;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int SETTING_CONSTRAINT_VIOLATION;
|
||||
}
|
||||
|
||||
class MemorySink : public SinkToStorage
|
||||
@ -103,16 +104,37 @@ public:
|
||||
std::lock_guard lock(storage.mutex);
|
||||
|
||||
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
|
||||
UInt64 new_total_rows = storage.total_size_rows.load(std::memory_order_relaxed) + inserted_rows;
|
||||
UInt64 new_total_bytes = storage.total_size_bytes.load(std::memory_order_relaxed) + inserted_bytes;
|
||||
while (!new_data->empty()
|
||||
&& ((storage.max_bytes_to_keep && new_total_bytes > storage.max_bytes_to_keep)
|
||||
|| (storage.max_rows_to_keep && new_total_rows > storage.max_rows_to_keep)))
|
||||
{
|
||||
Block oldest_block = new_data->front();
|
||||
UInt64 rows_to_remove = oldest_block.rows();
|
||||
UInt64 bytes_to_remove = oldest_block.allocatedBytes();
|
||||
if (new_total_bytes - bytes_to_remove < storage.min_bytes_to_keep
|
||||
|| new_total_rows - rows_to_remove < storage.min_rows_to_keep)
|
||||
{
|
||||
break; // stop - removing next block will put us under min_bytes / min_rows threshold
|
||||
}
|
||||
|
||||
// delete old block from current storage table
|
||||
new_total_rows -= rows_to_remove;
|
||||
new_total_bytes -= bytes_to_remove;
|
||||
new_data->erase(new_data->begin());
|
||||
}
|
||||
|
||||
// append new data to modified storage table and commit
|
||||
new_data->insert(new_data->end(), new_blocks.begin(), new_blocks.end());
|
||||
|
||||
storage.data.set(std::move(new_data));
|
||||
storage.total_size_bytes.fetch_add(inserted_bytes, std::memory_order_relaxed);
|
||||
storage.total_size_rows.fetch_add(inserted_rows, std::memory_order_relaxed);
|
||||
storage.total_size_rows.store(new_total_rows, std::memory_order_relaxed);
|
||||
storage.total_size_bytes.store(new_total_bytes, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
private:
|
||||
Blocks new_blocks;
|
||||
|
||||
StorageMemory & storage;
|
||||
StorageSnapshotPtr storage_snapshot;
|
||||
};
|
||||
@ -123,8 +145,10 @@ StorageMemory::StorageMemory(
|
||||
ColumnsDescription columns_description_,
|
||||
ConstraintsDescription constraints_,
|
||||
const String & comment,
|
||||
bool compress_)
|
||||
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(compress_)
|
||||
const MemorySettings & settings)
|
||||
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(settings.compress),
|
||||
min_rows_to_keep(settings.min_rows_to_keep), max_rows_to_keep(settings.max_rows_to_keep),
|
||||
min_bytes_to_keep(settings.min_bytes_to_keep), max_bytes_to_keep(settings.max_bytes_to_keep)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(std::move(columns_description_));
|
||||
@ -542,7 +566,11 @@ void registerStorageMemory(StorageFactory & factory)
|
||||
if (has_settings)
|
||||
settings.loadFromQuery(*args.storage_def);
|
||||
|
||||
return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings.compress);
|
||||
if (settings.min_bytes_to_keep > settings.max_bytes_to_keep
|
||||
|| settings.min_rows_to_keep > settings.max_rows_to_keep)
|
||||
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max.");
|
||||
|
||||
return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings);
|
||||
},
|
||||
{
|
||||
.supports_settings = true,
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/MemorySettings.h>
|
||||
|
||||
#include <Common/MultiVersion.h>
|
||||
|
||||
@ -30,7 +31,7 @@ public:
|
||||
ColumnsDescription columns_description_,
|
||||
ConstraintsDescription constraints_,
|
||||
const String & comment,
|
||||
bool compress_ = false);
|
||||
const MemorySettings & settings = MemorySettings());
|
||||
|
||||
String getName() const override { return "Memory"; }
|
||||
|
||||
@ -134,6 +135,11 @@ private:
|
||||
std::atomic<size_t> total_size_rows = 0;
|
||||
|
||||
bool compress;
|
||||
UInt64 min_rows_to_keep;
|
||||
UInt64 max_rows_to_keep;
|
||||
UInt64 min_bytes_to_keep;
|
||||
UInt64 max_bytes_to_keep;
|
||||
|
||||
|
||||
friend class ReadFromMemoryStorageStep;
|
||||
};
|
||||
|
@ -885,7 +885,6 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo
|
||||
if (modified_query_info.table_expression)
|
||||
{
|
||||
auto replacement_table_expression = std::make_shared<TableNode>(storage, storage_lock, storage_snapshot_);
|
||||
replacement_table_expression->setAlias(modified_query_info.table_expression->getAlias());
|
||||
if (query_info.table_expression_modifiers)
|
||||
replacement_table_expression->setTableExpressionModifiers(*query_info.table_expression_modifiers);
|
||||
|
||||
@ -1026,7 +1025,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
|
||||
const auto & [database_name, storage, _, table_name] = storage_with_lock;
|
||||
bool allow_experimental_analyzer = context->getSettingsRef().allow_experimental_analyzer;
|
||||
auto storage_stage
|
||||
= storage->getQueryProcessingStage(context, processed_stage, storage_snapshot_, modified_query_info);
|
||||
= storage->getQueryProcessingStage(context, QueryProcessingStage::Complete, storage_snapshot_, modified_query_info);
|
||||
|
||||
builder = plan.buildQueryPipeline(
|
||||
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
|
||||
@ -1053,80 +1052,40 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
|
||||
|
||||
Block pipe_header = builder->getHeader();
|
||||
|
||||
if (allow_experimental_analyzer)
|
||||
if (has_database_virtual_column && common_header.has("_database") && !pipe_header.has("_database"))
|
||||
{
|
||||
String table_alias = modified_query_info.query_tree->as<QueryNode>()->getJoinTree()->as<TableNode>()->getAlias();
|
||||
ColumnWithTypeAndName column;
|
||||
column.name = "_database";
|
||||
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
|
||||
column.column = column.type->createColumnConst(0, Field(database_name));
|
||||
|
||||
String database_column = table_alias.empty() || processed_stage == QueryProcessingStage::FetchColumns ? "_database" : table_alias + "._database";
|
||||
String table_column = table_alias.empty() || processed_stage == QueryProcessingStage::FetchColumns ? "_table" : table_alias + "._table";
|
||||
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
|
||||
auto adding_column_actions = std::make_shared<ExpressionActions>(
|
||||
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
|
||||
|
||||
if (has_database_virtual_column && common_header.has(database_column)
|
||||
&& (storage_stage == QueryProcessingStage::FetchColumns || !pipe_header.has("'" + database_name + "'_String")))
|
||||
{
|
||||
ColumnWithTypeAndName column;
|
||||
column.name = database_column;
|
||||
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
|
||||
column.column = column.type->createColumnConst(0, Field(database_name));
|
||||
|
||||
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
|
||||
auto adding_column_actions = std::make_shared<ExpressionActions>(
|
||||
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
|
||||
|
||||
builder->addSimpleTransform([&](const Block & stream_header)
|
||||
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
|
||||
}
|
||||
|
||||
if (has_table_virtual_column && common_header.has(table_column)
|
||||
&& (storage_stage == QueryProcessingStage::FetchColumns || !pipe_header.has("'" + table_name + "'_String")))
|
||||
{
|
||||
ColumnWithTypeAndName column;
|
||||
column.name = table_column;
|
||||
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
|
||||
column.column = column.type->createColumnConst(0, Field(table_name));
|
||||
|
||||
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
|
||||
auto adding_column_actions = std::make_shared<ExpressionActions>(
|
||||
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
|
||||
|
||||
builder->addSimpleTransform([&](const Block & stream_header)
|
||||
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
|
||||
}
|
||||
builder->addSimpleTransform([&](const Block & stream_header)
|
||||
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
|
||||
}
|
||||
else
|
||||
|
||||
if (has_table_virtual_column && common_header.has("_table") && !pipe_header.has("_table"))
|
||||
{
|
||||
if (has_database_virtual_column && common_header.has("_database") && !pipe_header.has("_database"))
|
||||
{
|
||||
ColumnWithTypeAndName column;
|
||||
column.name = "_database";
|
||||
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
|
||||
column.column = column.type->createColumnConst(0, Field(database_name));
|
||||
ColumnWithTypeAndName column;
|
||||
column.name = "_table";
|
||||
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
|
||||
column.column = column.type->createColumnConst(0, Field(table_name));
|
||||
|
||||
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
|
||||
auto adding_column_actions = std::make_shared<ExpressionActions>(
|
||||
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
|
||||
builder->addSimpleTransform([&](const Block & stream_header)
|
||||
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
|
||||
}
|
||||
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
|
||||
auto adding_column_actions = std::make_shared<ExpressionActions>(
|
||||
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
|
||||
|
||||
if (has_table_virtual_column && common_header.has("_table") && !pipe_header.has("_table"))
|
||||
{
|
||||
ColumnWithTypeAndName column;
|
||||
column.name = "_table";
|
||||
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
|
||||
column.column = column.type->createColumnConst(0, Field(table_name));
|
||||
|
||||
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
|
||||
auto adding_column_actions = std::make_shared<ExpressionActions>(
|
||||
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
|
||||
builder->addSimpleTransform([&](const Block & stream_header)
|
||||
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
|
||||
}
|
||||
builder->addSimpleTransform([&](const Block & stream_header)
|
||||
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
|
||||
}
|
||||
|
||||
/// Subordinary tables could have different but convertible types, like numeric types of different width.
|
||||
/// We must return streams with structure equals to structure of Merge table.
|
||||
convertAndFilterSourceStream(
|
||||
header, modified_query_info, storage_snapshot_, aliases, row_policy_data_opt, context, *builder, storage_stage);
|
||||
header, modified_query_info, storage_snapshot_, aliases, row_policy_data_opt, context, *builder, processed_stage);
|
||||
}
|
||||
|
||||
return builder;
|
||||
@ -1157,13 +1116,13 @@ QueryPlan ReadFromMerge::createPlanForTable(
|
||||
bool allow_experimental_analyzer = modified_context->getSettingsRef().allow_experimental_analyzer;
|
||||
|
||||
auto storage_stage = storage->getQueryProcessingStage(modified_context,
|
||||
processed_stage,
|
||||
QueryProcessingStage::Complete,
|
||||
storage_snapshot_,
|
||||
modified_query_info);
|
||||
|
||||
QueryPlan plan;
|
||||
|
||||
if (processed_stage <= storage_stage)
|
||||
if (processed_stage <= storage_stage || (allow_experimental_analyzer && processed_stage == QueryProcessingStage::FetchColumns))
|
||||
{
|
||||
/// If there are only virtual columns in query, you must request at least one other column.
|
||||
if (real_column_names.empty())
|
||||
@ -1208,7 +1167,7 @@ QueryPlan ReadFromMerge::createPlanForTable(
|
||||
row_policy_data_opt->addStorageFilter(source_step_with_filter);
|
||||
}
|
||||
}
|
||||
else if (processed_stage > storage_stage || allow_experimental_analyzer)
|
||||
else if (processed_stage > storage_stage || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns))
|
||||
{
|
||||
/// Maximum permissible parallelism is streams_num
|
||||
modified_context->setSetting("max_threads", streams_num);
|
||||
|
@ -1,4 +1,5 @@
|
||||
00223_shard_distributed_aggregation_memory_efficient
|
||||
00717_merge_and_distributed
|
||||
00725_memory_tracking
|
||||
01062_pm_all_join_with_block_continuation
|
||||
01083_expressions_in_engine_arguments
|
||||
|
@ -0,0 +1,12 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<clickhouse>
|
||||
<background_pool_size>44</background_pool_size>
|
||||
<merge_tree>
|
||||
<include from_zk="/merge_max_block_size" merge="true"/>
|
||||
<merge_max_block_size>99</merge_max_block_size>
|
||||
<min_bytes_for_wide_part>1</min_bytes_for_wide_part>
|
||||
<min_rows_for_wide_part>1111</min_rows_for_wide_part>
|
||||
</merge_tree>
|
||||
|
||||
<include from_zk="/min_bytes_for_wide_part" merge="true"/>
|
||||
</clickhouse>
|
@ -13,7 +13,12 @@ node2 = cluster.add_instance(
|
||||
env_variables={"MAX_QUERY_SIZE": "55555"},
|
||||
)
|
||||
node3 = cluster.add_instance(
|
||||
"node3", user_configs=["configs/config_zk.xml"], with_zookeeper=True
|
||||
"node3",
|
||||
user_configs=[
|
||||
"configs/config_zk.xml",
|
||||
],
|
||||
main_configs=["configs/config_zk_include_test.xml"],
|
||||
with_zookeeper=True,
|
||||
)
|
||||
node4 = cluster.add_instance(
|
||||
"node4",
|
||||
@ -62,6 +67,16 @@ def start_cluster():
|
||||
value=b"<user_2><password></password><profile>default</profile></user_2>",
|
||||
makepath=True,
|
||||
)
|
||||
zk.create(
|
||||
path="/min_bytes_for_wide_part",
|
||||
value=b"<merge_tree><min_bytes_for_wide_part>33</min_bytes_for_wide_part></merge_tree>",
|
||||
makepath=True,
|
||||
)
|
||||
zk.create(
|
||||
path="/merge_max_block_size",
|
||||
value=b"<merge_max_block_size>8888</merge_max_block_size>",
|
||||
makepath=True,
|
||||
)
|
||||
|
||||
cluster.add_zookeeper_startup_command(create_zk_roots)
|
||||
|
||||
@ -237,3 +252,63 @@ def test_allow_databases(start_cluster):
|
||||
).strip()
|
||||
== ""
|
||||
)
|
||||
|
||||
|
||||
def test_config_multiple_zk_substitutions(start_cluster):
|
||||
assert (
|
||||
node3.query(
|
||||
"SELECT value FROM system.merge_tree_settings WHERE name='min_bytes_for_wide_part'"
|
||||
)
|
||||
== "33\n"
|
||||
)
|
||||
assert (
|
||||
node3.query(
|
||||
"SELECT value FROM system.merge_tree_settings WHERE name='min_rows_for_wide_part'"
|
||||
)
|
||||
== "1111\n"
|
||||
)
|
||||
assert (
|
||||
node3.query(
|
||||
"SELECT value FROM system.merge_tree_settings WHERE name='merge_max_block_size'"
|
||||
)
|
||||
== "8888\n"
|
||||
)
|
||||
assert (
|
||||
node3.query(
|
||||
"SELECT value FROM system.server_settings WHERE name='background_pool_size'"
|
||||
)
|
||||
== "44\n"
|
||||
)
|
||||
|
||||
zk = cluster.get_kazoo_client("zoo1")
|
||||
zk.create(
|
||||
path="/background_pool_size",
|
||||
value=b"<background_pool_size>72</background_pool_size>",
|
||||
makepath=True,
|
||||
)
|
||||
|
||||
node3.replace_config(
|
||||
"/etc/clickhouse-server/config.d/config_zk_include_test.xml",
|
||||
"""
|
||||
<clickhouse>
|
||||
<include from_zk="/background_pool_size" merge="true"/>
|
||||
<background_pool_size>44</background_pool_size>
|
||||
<merge_tree>
|
||||
<include from_zk="/merge_max_block_size" merge="true"/>
|
||||
<min_bytes_for_wide_part>1</min_bytes_for_wide_part>
|
||||
<min_rows_for_wide_part>1111</min_rows_for_wide_part>
|
||||
</merge_tree>
|
||||
|
||||
<include from_zk="/min_bytes_for_wide_part" merge="true"/>
|
||||
</clickhouse>
|
||||
""",
|
||||
)
|
||||
|
||||
node3.query("SYSTEM RELOAD CONFIG")
|
||||
|
||||
assert (
|
||||
node3.query(
|
||||
"SELECT value FROM system.server_settings WHERE name='background_pool_size'"
|
||||
)
|
||||
== "72\n"
|
||||
)
|
||||
|
@ -29,30 +29,39 @@ def test_undrop_drop_and_undrop_loop(started_cluster):
|
||||
logging.info(
|
||||
"random_sec: " + random_sec.__str__() + ", table_uuid: " + table_uuid
|
||||
)
|
||||
|
||||
node.query(
|
||||
"create table test_undrop_loop"
|
||||
"CREATE TABLE test_undrop_loop"
|
||||
+ count.__str__()
|
||||
+ " UUID '"
|
||||
+ table_uuid
|
||||
+ "' (id Int32) Engine=MergeTree() order by id;"
|
||||
+ "' (id Int32) ENGINE = MergeTree() ORDER BY id;"
|
||||
)
|
||||
node.query("drop table test_undrop_loop" + count.__str__() + ";")
|
||||
|
||||
node.query("DROP TABLE test_undrop_loop" + count.__str__() + ";")
|
||||
|
||||
time.sleep(random_sec)
|
||||
|
||||
if random_sec >= 5:
|
||||
error = node.query_and_get_error(
|
||||
"undrop table test_undrop_loop"
|
||||
"UNDROP TABLE test_undrop_loop"
|
||||
+ count.__str__()
|
||||
+ " uuid '"
|
||||
+ " UUID '"
|
||||
+ table_uuid
|
||||
+ "';"
|
||||
)
|
||||
assert "UNKNOWN_TABLE" in error
|
||||
else:
|
||||
elif random_sec <= 3:
|
||||
# (*)
|
||||
node.query(
|
||||
"undrop table test_undrop_loop"
|
||||
"UNDROP TABLE test_undrop_loop"
|
||||
+ count.__str__()
|
||||
+ " uuid '"
|
||||
+ " UUID '"
|
||||
+ table_uuid
|
||||
+ "';"
|
||||
)
|
||||
count = count + 1
|
||||
else:
|
||||
pass
|
||||
# ignore random_sec = 4 to account for communication delay with the database.
|
||||
# if we don't do that, then the second case (*) may find the table already dropped and receive an unexpected exception from the database (Bug #55167)
|
||||
|
7
tests/queries/0_stateless/00765_locate.reference
Normal file
7
tests/queries/0_stateless/00765_locate.reference
Normal file
@ -0,0 +1,7 @@
|
||||
-- negative tests
|
||||
-- test mysql compatibility setting
|
||||
0
|
||||
0
|
||||
3
|
||||
-- the function name needs to be case-insensitive for historical reasons
|
||||
0
|
15
tests/queries/0_stateless/00765_locate.sql
Normal file
15
tests/queries/0_stateless/00765_locate.sql
Normal file
@ -0,0 +1,15 @@
|
||||
SET send_logs_level = 'fatal';
|
||||
|
||||
SELECT '-- negative tests';
|
||||
SELECT locate(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
|
||||
SELECT locate(1, 'abc'); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
|
||||
SELECT locate('abc', 1); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
|
||||
SELECT locate('abc', 'abc', 'abc'); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
|
||||
|
||||
SELECT '-- test mysql compatibility setting';
|
||||
SELECT locate('abcabc', 'ca');
|
||||
SELECT locate('abcabc', 'ca') SETTINGS function_locate_has_mysql_compatible_argument_order = true;
|
||||
SELECT locate('abcabc', 'ca') SETTINGS function_locate_has_mysql_compatible_argument_order = false;
|
||||
|
||||
SELECT '-- the function name needs to be case-insensitive for historical reasons';
|
||||
SELECT LoCaTe('abcabc', 'ca');
|
@ -4,7 +4,6 @@ foo
|
||||
FOO
|
||||
baz
|
||||
zzz
|
||||
2
|
||||
fo
|
||||
oo
|
||||
o
|
||||
|
@ -6,7 +6,6 @@ select LOWER('Foo');
|
||||
select UPPER('Foo');
|
||||
select REPLACE('bar', 'r', 'z');
|
||||
select REGEXP_REPLACE('bar', '.', 'z');
|
||||
select Locate('foo', 'o');
|
||||
select SUBSTRING('foo', 1, 2);
|
||||
select Substr('foo', 2);
|
||||
select mid('foo', 3);
|
||||
|
@ -26,7 +26,6 @@ SELECT
|
||||
least(1),
|
||||
length('1'),
|
||||
log(1),
|
||||
position('1', '1'),
|
||||
log(1),
|
||||
log10(1),
|
||||
log2(1),
|
||||
|
@ -1 +1 @@
|
||||
EXPLAIN SYNTAX SELECT CAST(1 AS INT), CEIL(1), CEILING(1), CHAR(49), CHAR_LENGTH('1'), CHARACTER_LENGTH('1'), COALESCE(1), CONCAT('1', '1'), CORR(1, 1), COS(1), COUNT(1), COVAR_POP(1, 1), COVAR_SAMP(1, 1), DATABASE(), SCHEMA(), DATEDIFF('DAY', toDate('2020-10-24'), toDate('2019-10-24')), EXP(1), FLATTEN([[1]]), FLOOR(1), FQDN(), GREATEST(1), IF(1, 1, 1), IFNULL(1, 1), LCASE('A'), LEAST(1), LENGTH('1'), LN(1), LOCATE('1', '1'), LOG(1), LOG10(1), LOG2(1), LOWER('A'), MAX(1), MID('123', 1, 1), MIN(1), MOD(1, 1), NOT(1), NOW(), NOW64(), NULLIF(1, 1), PI(), POSITION('123', '2'), POW(1, 1), POWER(1, 1), RAND(), REPLACE('1', '1', '2'), REVERSE('123'), ROUND(1), SIN(1), SQRT(1), STDDEV_POP(1), STDDEV_SAMP(1), SUBSTR('123', 2), SUBSTRING('123', 2), SUM(1), TAN(1), TANH(1), TRUNC(1), TRUNCATE(1), UCASE('A'), UPPER('A'), USER(), VAR_POP(1), VAR_SAMP(1), WEEK(toDate('2020-10-24')), YEARWEEK(toDate('2020-10-24')) format TSVRaw;
|
||||
EXPLAIN SYNTAX SELECT CAST(1 AS INT), CEIL(1), CEILING(1), CHAR(49), CHAR_LENGTH('1'), CHARACTER_LENGTH('1'), COALESCE(1), CONCAT('1', '1'), CORR(1, 1), COS(1), COUNT(1), COVAR_POP(1, 1), COVAR_SAMP(1, 1), DATABASE(), SCHEMA(), DATEDIFF('DAY', toDate('2020-10-24'), toDate('2019-10-24')), EXP(1), FLATTEN([[1]]), FLOOR(1), FQDN(), GREATEST(1), IF(1, 1, 1), IFNULL(1, 1), LCASE('A'), LEAST(1), LENGTH('1'), LN(1), LOG(1), LOG10(1), LOG2(1), LOWER('A'), MAX(1), MID('123', 1, 1), MIN(1), MOD(1, 1), NOT(1), NOW(), NOW64(), NULLIF(1, 1), PI(), POSITION('123', '2'), POW(1, 1), POWER(1, 1), RAND(), REPLACE('1', '1', '2'), REVERSE('123'), ROUND(1), SIN(1), SQRT(1), STDDEV_POP(1), STDDEV_SAMP(1), SUBSTR('123', 2), SUBSTRING('123', 2), SUM(1), TAN(1), TANH(1), TRUNC(1), TRUNCATE(1), UCASE('A'), UPPER('A'), USER(), VAR_POP(1), VAR_SAMP(1), WEEK(toDate('2020-10-24')), YEARWEEK(toDate('2020-10-24')) format TSVRaw;
|
||||
|
@ -4,4 +4,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_LOCAL --query="SET input_format_with_names_use_headers = 1" 2>&1 | grep -qF "Code: 115. DB::Exception: Unknown setting input_format_with_names_use_headers: Maybe you meant ['input_format_with_names_use_header','input_format_with_types_use_header']. (UNKNOWN_SETTING)" && echo 'OK' || echo 'FAIL' ||:
|
||||
$CLICKHOUSE_LOCAL --query="SET input_format_with_names_use_headers = 1" 2>&1 | grep -qF "Code: 115. DB::Exception: Unknown setting 'input_format_with_names_use_headers': Maybe you meant ['input_format_with_names_use_header','input_format_with_types_use_header']. (UNKNOWN_SETTING)" && echo 'OK' || echo 'FAIL' ||:
|
||||
|
@ -85,5 +85,5 @@ drop table 02681_undrop_multiple;
|
||||
select table from system.dropped_tables where table = '02681_undrop_multiple' limit 1;
|
||||
undrop table 02681_undrop_multiple;
|
||||
select * from 02681_undrop_multiple order by id;
|
||||
undrop table 02681_undrop_multiple; -- { serverError 57 }
|
||||
undrop table 02681_undrop_multiple; -- { serverError TABLE_ALREADY_EXISTS }
|
||||
drop table 02681_undrop_multiple sync;
|
||||
|
@ -0,0 +1,16 @@
|
||||
TESTING BYTES
|
||||
8192
|
||||
9216
|
||||
9216
|
||||
65536
|
||||
TESTING ROWS
|
||||
50
|
||||
1000
|
||||
1020
|
||||
1100
|
||||
TESTING NO CIRCULAR-BUFFER
|
||||
8192
|
||||
9216
|
||||
17408
|
||||
82944
|
||||
TESTING INVALID SETTINGS
|
@ -0,0 +1,63 @@
|
||||
SET max_block_size = 65409; -- Default value
|
||||
|
||||
DROP TABLE IF EXISTS memory;
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
|
||||
|
||||
SELECT 'TESTING BYTES';
|
||||
/* 1. testing oldest block doesn't get deleted because of min-threshold */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 2. adding block that doesn't get deleted */
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 4.check large block over-writes all bytes / rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
DROP TABLE IF EXISTS memory;
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
|
||||
|
||||
SELECT 'TESTING ROWS';
|
||||
/* 1. add normal number of rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 50);
|
||||
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 2. table should have 1000 */
|
||||
INSERT INTO memory SELECT * FROM numbers(50, 950);
|
||||
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 3. table should have 1020 - removed first 50 */
|
||||
INSERT INTO memory SELECT * FROM numbers(2000, 70);
|
||||
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 4. check large block over-writes all rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(3000, 1100);
|
||||
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
SELECT 'TESTING NO CIRCULAR-BUFFER';
|
||||
DROP TABLE IF EXISTS memory;
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory;
|
||||
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
SELECT 'TESTING INVALID SETTINGS';
|
||||
CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100; -- { serverError 452 }
|
||||
CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 100; -- { serverError 452 }
|
||||
|
||||
DROP TABLE memory;
|
@ -1,2 +1,2 @@
|
||||
clickhouse_add_executable (check-marks main.cpp)
|
||||
target_link_libraries(check-marks PRIVATE dbms boost::program_options)
|
||||
target_link_libraries(check-marks PRIVATE dbms clickhouse_functions boost::program_options)
|
||||
|
@ -1,2 +1,2 @@
|
||||
clickhouse_add_executable(check-mysql-binlog main.cpp)
|
||||
target_link_libraries(check-mysql-binlog PRIVATE dbms boost::program_options)
|
||||
target_link_libraries(check-mysql-binlog PRIVATE dbms clickhouse_functions boost::program_options)
|
||||
|
@ -4,4 +4,4 @@ if (NOT TARGET ch_contrib::nuraft)
|
||||
endif ()
|
||||
|
||||
clickhouse_add_executable(keeper-data-dumper main.cpp)
|
||||
target_link_libraries(keeper-data-dumper PRIVATE dbms)
|
||||
target_link_libraries(keeper-data-dumper PRIVATE dbms clickhouse_functions)
|
||||
|
Loading…
Reference in New Issue
Block a user