mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge remote-tracking branch 'blessed/master' into disable_adaptative
This commit is contained in:
commit
46f0f385aa
@ -50,9 +50,6 @@ std::optional<uint64_t> getCgroupsV2MemoryLimit()
|
||||
|
||||
}
|
||||
|
||||
/** Returns the size of physical memory (RAM) in bytes.
|
||||
* Returns 0 on unsupported platform
|
||||
*/
|
||||
uint64_t getMemoryAmountOrZero()
|
||||
{
|
||||
int64_t num_pages = sysconf(_SC_PHYS_PAGES);
|
||||
|
@ -2,11 +2,10 @@
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
/** Returns the size of physical memory (RAM) in bytes.
|
||||
* Returns 0 on unsupported platform or if it cannot determine the size of physical memory.
|
||||
*/
|
||||
/// Returns the size in bytes of physical memory (RAM) available to the process. The value can
|
||||
/// be smaller than the total available RAM available to the system due to cgroups settings.
|
||||
/// Returns 0 on unsupported platform or if it cannot determine the size of physical memory.
|
||||
uint64_t getMemoryAmountOrZero();
|
||||
|
||||
/** Throws exception if it cannot determine the size of physical memory.
|
||||
*/
|
||||
/// Throws exception if it cannot determine the size of physical memory.
|
||||
uint64_t getMemoryAmount();
|
||||
|
@ -6,6 +6,11 @@ sidebar_label: JDBC
|
||||
|
||||
# JDBC
|
||||
|
||||
:::note
|
||||
clickhouse-jdbc-bridge contains experimental codes and is no longer supported. It may contain reliability issues and security vulnerabilities. Use it at your own risk.
|
||||
ClickHouse recommend using built-in table functions in ClickHouse which provide a better alternative for ad-hoc querying scenarios (Postgres, MySQL, MongoDB, etc).
|
||||
:::
|
||||
|
||||
Allows ClickHouse to connect to external databases via [JDBC](https://en.wikipedia.org/wiki/Java_Database_Connectivity).
|
||||
|
||||
To implement the JDBC connection, ClickHouse uses the separate program [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) that should run as a daemon.
|
||||
|
@ -21,3 +21,79 @@ When restarting a server, data disappears from the table and the table becomes e
|
||||
Normally, using this table engine is not justified. However, it can be used for tests, and for tasks where maximum speed is required on a relatively small number of rows (up to approximately 100,000,000).
|
||||
|
||||
The Memory engine is used by the system for temporary tables with external query data (see the section “External data for processing a query”), and for implementing `GLOBAL IN` (see the section “IN operators”).
|
||||
|
||||
Upper and lower bounds can be specified to limit Memory engine table size, effectively allowing it to act as a circular buffer (see [Engine Parameters](#engine-parameters)).
|
||||
|
||||
## Engine Parameters {#engine-parameters}
|
||||
|
||||
- `min_bytes_to_keep` — Minimum bytes to keep when memory table is size-capped.
|
||||
- Default value: `0`
|
||||
- Requires `max_bytes_to_keep`
|
||||
- `max_bytes_to_keep` — Maximum bytes to keep within memory table where oldest rows are deleted on each insertion (i.e circular buffer). Max bytes can exceed the stated limit if the oldest batch of rows to remove falls under the `min_bytes_to_keep` limit when adding a large block.
|
||||
- Default value: `0`
|
||||
- `min_rows_to_keep` — Minimum rows to keep when memory table is size-capped.
|
||||
- Default value: `0`
|
||||
- Requires `max_rows_to_keep`
|
||||
- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
|
||||
- Default value: `0`
|
||||
|
||||
## Usage {#usage}
|
||||
|
||||
|
||||
**Initialize settings**
|
||||
``` sql
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
|
||||
```
|
||||
|
||||
**Note:** Both `bytes` and `rows` capping parameters can be set at the same time, however, the lower bounds of `max` and `min` will be adhered to.
|
||||
|
||||
## Examples {#examples}
|
||||
``` sql
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
|
||||
|
||||
/* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 8'192 bytes
|
||||
|
||||
/* 2. adding block that doesn't get deleted */
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 1'024 bytes
|
||||
|
||||
/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 8'192 bytes
|
||||
|
||||
/* 4. checking a very large block overrides all */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 65'536 bytes
|
||||
|
||||
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─total_bytes─┬─total_rows─┐
|
||||
│ 65536 │ 10000 │
|
||||
└─────────────┴────────────┘
|
||||
```
|
||||
|
||||
also, for rows:
|
||||
|
||||
``` sql
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 4000, max_rows_to_keep = 10000;
|
||||
|
||||
/* 1. testing oldest block doesn't get deleted due to min-threshold - 3000 rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600); -- 1'600 rows
|
||||
|
||||
/* 2. adding block that doesn't get deleted */
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100); -- 100 rows
|
||||
|
||||
/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000); -- 1'000 rows
|
||||
|
||||
/* 4. checking a very large block overrides all */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000); -- 10'000 rows
|
||||
|
||||
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─total_bytes─┬─total_rows─┐
|
||||
│ 65536 │ 10000 │
|
||||
└─────────────┴────────────┘
|
||||
```
|
||||
|
@ -4337,6 +4337,18 @@ Possible values:
|
||||
|
||||
Default value: `0`.
|
||||
|
||||
|
||||
## function_locate_has_mysql_compatible_argument_order {#function-locate-has-mysql-compatible-argument-order}
|
||||
|
||||
Controls the order of arguments in function [locate](../../sql-reference/functions/string-search-functions.md#locate).
|
||||
|
||||
Possible values:
|
||||
|
||||
- 0 — Function `locate` accepts arguments `(haystack, needle[, start_pos])`.
|
||||
- 1 — Function `locate` accepts arguments `(needle, haystack, [, start_pos])` (MySQL-compatible behavior)
|
||||
|
||||
Default value: `1`.
|
||||
|
||||
## date_time_overflow_behavior {#date_time_overflow_behavior}
|
||||
|
||||
Defines the behavior when [Date](../../sql-reference/data-types/date.md), [Date32](../../sql-reference/data-types/date32.md), [DateTime](../../sql-reference/data-types/datetime.md), [DateTime64](../../sql-reference/data-types/datetime64.md) or integers are converted into Date, Date32, DateTime or DateTime64 but the value cannot be represented in the result type.
|
||||
|
@ -30,7 +30,6 @@ position(haystack, needle[, start_pos])
|
||||
|
||||
Alias:
|
||||
- `position(needle IN haystack)`
|
||||
- `locate(haystack, needle[, start_pos])`.
|
||||
|
||||
**Arguments**
|
||||
|
||||
@ -49,7 +48,7 @@ If substring `needle` is empty, these rules apply:
|
||||
- if `start_pos >= 1` and `start_pos <= length(haystack) + 1`: return `start_pos`
|
||||
- otherwise: return `0`
|
||||
|
||||
The same rules also apply to functions `positionCaseInsensitive`, `positionUTF8` and `positionCaseInsensitiveUTF8`
|
||||
The same rules also apply to functions `locate`, `positionCaseInsensitive`, `positionUTF8` and `positionCaseInsensitiveUTF8`.
|
||||
|
||||
Type: `Integer`.
|
||||
|
||||
@ -114,6 +113,21 @@ SELECT
|
||||
└─────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┴────────────────────────┘
|
||||
```
|
||||
|
||||
## locate
|
||||
|
||||
Like [position](#position) but with arguments `haystack` and `locate` switched.
|
||||
|
||||
The behavior of this function depends on the ClickHouse version:
|
||||
- in versions < v24.3, `locate` was an alias of function `position` and accepted arguments `(haystack, needle[, start_pos])`.
|
||||
- in versions >= 24.3,, `locate` is an individual function (for better compatibility with MySQL) and accepts arguments `(needle, haystack[, start_pos])`. The previous behavior
|
||||
can be restored using setting [function_locate_has_mysql_compatible_argument_order = false](../../operations/settings/settings.md#function-locate-has-mysql-compatible-argument-order);
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
locate(needle, haystack[, start_pos])
|
||||
```
|
||||
|
||||
## positionCaseInsensitive
|
||||
|
||||
Like [position](#position) but searches case-insensitively.
|
||||
|
@ -6,6 +6,11 @@ sidebar_label: jdbc
|
||||
|
||||
# jdbc
|
||||
|
||||
:::note
|
||||
clickhouse-jdbc-bridge contains experimental codes and is no longer supported. It may contain reliability issues and security vulnerabilities. Use it at your own risk.
|
||||
ClickHouse recommend using built-in table functions in ClickHouse which provide a better alternative for ad-hoc querying scenarios (Postgres, MySQL, MongoDB, etc).
|
||||
:::
|
||||
|
||||
`jdbc(datasource, schema, table)` - returns table that is connected via JDBC driver.
|
||||
|
||||
This table function requires separate [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) program to be running.
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <IO/UseSSL.h>
|
||||
#include <Core/ServerUUID.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/CgroupsMemoryUsageObserver.h>
|
||||
#include <Common/ErrorHandlers.h>
|
||||
#include <Common/assertProcessUserMatchesDataOwner.h>
|
||||
#include <Common/makeSocketAddress.h>
|
||||
@ -623,6 +624,25 @@ try
|
||||
buildLoggers(config(), logger());
|
||||
main_config_reloader->start();
|
||||
|
||||
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
|
||||
try
|
||||
{
|
||||
auto wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15);
|
||||
if (wait_time != 0)
|
||||
{
|
||||
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
|
||||
/// Not calling cgroups_memory_usage_observer->setLimits() here (as for the normal ClickHouse server) because Keeper controls
|
||||
/// its memory usage by other means (via setting 'max_memory_usage_soft_limit').
|
||||
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
|
||||
cgroups_memory_usage_observer->startThread();
|
||||
}
|
||||
}
|
||||
catch (Exception &)
|
||||
{
|
||||
tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
|
||||
}
|
||||
|
||||
|
||||
LOG_INFO(log, "Ready for connections.");
|
||||
|
||||
waitForTerminationRequest();
|
||||
|
@ -1296,7 +1296,7 @@ try
|
||||
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
|
||||
try
|
||||
{
|
||||
UInt64 wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
|
||||
auto wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
|
||||
if (wait_time != 0)
|
||||
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
|
||||
}
|
||||
@ -1362,7 +1362,7 @@ try
|
||||
{
|
||||
double hard_limit_ratio = new_server_settings.cgroup_memory_watcher_hard_limit_ratio;
|
||||
double soft_limit_ratio = new_server_settings.cgroup_memory_watcher_soft_limit_ratio;
|
||||
cgroups_memory_usage_observer->setLimits(
|
||||
cgroups_memory_usage_observer->setMemoryUsageLimits(
|
||||
static_cast<uint64_t>(max_server_memory_usage * hard_limit_ratio),
|
||||
static_cast<uint64_t>(max_server_memory_usage * soft_limit_ratio));
|
||||
}
|
||||
@ -1720,6 +1720,12 @@ try
|
||||
throw;
|
||||
}
|
||||
|
||||
if (cgroups_memory_usage_observer)
|
||||
{
|
||||
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
|
||||
cgroups_memory_usage_observer->startThread();
|
||||
}
|
||||
|
||||
/// Reload config in SYSTEM RELOAD CONFIG query.
|
||||
global_context->setConfigReloadCallback([&]()
|
||||
{
|
||||
|
@ -39,7 +39,7 @@ void logAboutProgress(LoggerPtr log, size_t processed, size_t total, AtomicStopw
|
||||
{
|
||||
if (total && (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)))
|
||||
{
|
||||
LOG_INFO(log, "Processed: {}%", static_cast<Int64>(processed * 1000.0 / total) * 0.1);
|
||||
LOG_INFO(log, "Processed: {:.1f}%", static_cast<double>(processed) * 100.0 / total);
|
||||
watch.restart();
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <IO/ReadBufferFromFileDescriptor.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <base/cgroupsv2.h>
|
||||
#include <base/getMemoryAmount.h>
|
||||
#include <base/sleep.h>
|
||||
|
||||
#include <filesystem>
|
||||
@ -36,7 +37,7 @@ namespace ErrorCodes
|
||||
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
|
||||
: log(getLogger("CgroupsMemoryUsageObserver"))
|
||||
, wait_time(wait_time_)
|
||||
, file(log)
|
||||
, memory_usage_file(log)
|
||||
{
|
||||
LOG_INFO(log, "Initialized cgroups memory limit observer, wait time is {} sec", wait_time.count());
|
||||
}
|
||||
@ -46,13 +47,13 @@ CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()
|
||||
stopThread();
|
||||
}
|
||||
|
||||
void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_limit_)
|
||||
void CgroupsMemoryUsageObserver::setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_)
|
||||
{
|
||||
std::lock_guard<std::mutex> limit_lock(limit_mutex);
|
||||
|
||||
if (hard_limit_ == hard_limit && soft_limit_ == soft_limit)
|
||||
return;
|
||||
|
||||
stopThread();
|
||||
|
||||
hard_limit = hard_limit_;
|
||||
soft_limit = soft_limit_;
|
||||
|
||||
@ -83,10 +84,10 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l
|
||||
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
|
||||
#endif
|
||||
/// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them.
|
||||
uint64_t current_usage = readMemoryUsage();
|
||||
MemoryTracker::setRSS(current_usage, 0);
|
||||
uint64_t memory_usage = memory_usage_file.readMemoryUsage();
|
||||
MemoryTracker::setRSS(memory_usage, 0);
|
||||
|
||||
LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(current_usage));
|
||||
LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(memory_usage));
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -94,14 +95,13 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l
|
||||
}
|
||||
};
|
||||
|
||||
startThread();
|
||||
|
||||
LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_));
|
||||
}
|
||||
|
||||
uint64_t CgroupsMemoryUsageObserver::readMemoryUsage() const
|
||||
void CgroupsMemoryUsageObserver::setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_)
|
||||
{
|
||||
return file.readMemoryUsage();
|
||||
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
|
||||
on_memory_amount_available_changed = on_memory_amount_available_changed_;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -163,7 +163,7 @@ std::pair<std::string, CgroupsMemoryUsageObserver::CgroupsVersion> getCgroupsFil
|
||||
|
||||
}
|
||||
|
||||
CgroupsMemoryUsageObserver::File::File(LoggerPtr log_)
|
||||
CgroupsMemoryUsageObserver::MemoryUsageFile::MemoryUsageFile(LoggerPtr log_)
|
||||
: log(log_)
|
||||
{
|
||||
std::tie(file_name, version) = getCgroupsFileName();
|
||||
@ -177,7 +177,7 @@ CgroupsMemoryUsageObserver::File::File(LoggerPtr log_)
|
||||
file_name, "Cannot open file '{}'", file_name);
|
||||
}
|
||||
|
||||
CgroupsMemoryUsageObserver::File::~File()
|
||||
CgroupsMemoryUsageObserver::MemoryUsageFile::~MemoryUsageFile()
|
||||
{
|
||||
assert(fd != -1);
|
||||
if (::close(fd) != 0)
|
||||
@ -195,7 +195,7 @@ CgroupsMemoryUsageObserver::File::~File()
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t CgroupsMemoryUsageObserver::File::readMemoryUsage() const
|
||||
uint64_t CgroupsMemoryUsageObserver::MemoryUsageFile::readMemoryUsage() const
|
||||
{
|
||||
/// File read is probably not read is thread-safe, just to be sure
|
||||
std::lock_guard lock(mutex);
|
||||
@ -278,6 +278,9 @@ void CgroupsMemoryUsageObserver::runThread()
|
||||
{
|
||||
setThreadName("CgrpMemUsgObsr");
|
||||
|
||||
last_available_memory_amount = getMemoryAmount();
|
||||
LOG_INFO(log, "Memory amount initially available to the process is {}", ReadableSize(last_available_memory_amount));
|
||||
|
||||
std::unique_lock lock(thread_mutex);
|
||||
while (true)
|
||||
{
|
||||
@ -286,8 +289,42 @@ void CgroupsMemoryUsageObserver::runThread()
|
||||
|
||||
try
|
||||
{
|
||||
uint64_t memory_usage = file.readMemoryUsage();
|
||||
processMemoryUsage(memory_usage);
|
||||
uint64_t available_memory_amount = getMemoryAmount();
|
||||
if (available_memory_amount != last_available_memory_amount)
|
||||
{
|
||||
LOG_INFO(log, "Memory amount available to the process changed from {} to {}", ReadableSize(last_available_memory_amount), ReadableSize(available_memory_amount));
|
||||
last_available_memory_amount = available_memory_amount;
|
||||
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
|
||||
on_memory_amount_available_changed();
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> limit_lock(limit_mutex);
|
||||
if (soft_limit > 0 && hard_limit > 0)
|
||||
{
|
||||
uint64_t memory_usage = memory_usage_file.readMemoryUsage();
|
||||
if (memory_usage > hard_limit)
|
||||
{
|
||||
if (last_memory_usage <= hard_limit)
|
||||
on_hard_limit(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (last_memory_usage > hard_limit)
|
||||
on_hard_limit(false);
|
||||
}
|
||||
|
||||
if (memory_usage > soft_limit)
|
||||
{
|
||||
if (last_memory_usage <= soft_limit)
|
||||
on_soft_limit(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (last_memory_usage > soft_limit)
|
||||
on_soft_limit(false);
|
||||
}
|
||||
last_memory_usage = memory_usage;
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -296,33 +333,6 @@ void CgroupsMemoryUsageObserver::runThread()
|
||||
}
|
||||
}
|
||||
|
||||
void CgroupsMemoryUsageObserver::processMemoryUsage(uint64_t current_usage)
|
||||
{
|
||||
if (current_usage > hard_limit)
|
||||
{
|
||||
if (last_usage <= hard_limit)
|
||||
on_hard_limit(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (last_usage > hard_limit)
|
||||
on_hard_limit(false);
|
||||
}
|
||||
|
||||
if (current_usage > soft_limit)
|
||||
{
|
||||
if (last_usage <= soft_limit)
|
||||
on_soft_limit(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (last_usage > soft_limit)
|
||||
on_soft_limit(false);
|
||||
}
|
||||
|
||||
last_usage = current_usage;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -2,57 +2,71 @@
|
||||
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Periodically reads the current memory usage from Linux cgroups.
|
||||
/// You can specify soft or hard memory limits:
|
||||
/// - When the soft memory limit is hit, drop jemalloc cache.
|
||||
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
|
||||
/// Does two things:
|
||||
/// 1. Periodically reads the memory usage of the process from Linux cgroups.
|
||||
/// You can specify soft or hard memory limits:
|
||||
/// - When the soft memory limit is hit, drop jemalloc cache.
|
||||
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
|
||||
/// The goal of this is to avoid that the process hits the maximum allowed memory limit at which there is a good
|
||||
/// chance that the Limux OOM killer terminates it. All of this is done is because internal memory tracking in
|
||||
/// ClickHouse can unfortunately under-estimate the actually used memory.
|
||||
/// 2. Periodically reads the the maximum memory available to the process (which can change due to cgroups settings).
|
||||
/// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server
|
||||
/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit'
|
||||
/// (Keeper) from which various other internal limits are calculated, including the soft and hard limits for (1.).
|
||||
/// The goal of this is to provide elasticity when the container is scaled-up/scaled-down. The mechanism (polling
|
||||
/// cgroups) is quite implicit, unfortunately there is currently no better way to communicate memory threshold changes
|
||||
/// to the database.
|
||||
#if defined(OS_LINUX)
|
||||
class CgroupsMemoryUsageObserver
|
||||
{
|
||||
public:
|
||||
using OnMemoryLimitFn = std::function<void(bool)>;
|
||||
using OnMemoryAmountAvailableChangedFn = std::function<void()>;
|
||||
|
||||
enum class CgroupsVersion
|
||||
{
|
||||
V1,
|
||||
V2
|
||||
|
||||
};
|
||||
|
||||
explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_);
|
||||
~CgroupsMemoryUsageObserver();
|
||||
|
||||
void setLimits(uint64_t hard_limit_, uint64_t soft_limit_);
|
||||
void setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_);
|
||||
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_);
|
||||
|
||||
size_t getHardLimit() const { return hard_limit; }
|
||||
size_t getSoftLimit() const { return soft_limit; }
|
||||
|
||||
uint64_t readMemoryUsage() const;
|
||||
void startThread();
|
||||
|
||||
private:
|
||||
LoggerPtr log;
|
||||
|
||||
std::atomic<size_t> hard_limit = 0;
|
||||
std::atomic<size_t> soft_limit = 0;
|
||||
|
||||
const std::chrono::seconds wait_time;
|
||||
|
||||
using CallbackFn = std::function<void(bool)>;
|
||||
CallbackFn on_hard_limit;
|
||||
CallbackFn on_soft_limit;
|
||||
std::mutex limit_mutex;
|
||||
size_t hard_limit TSA_GUARDED_BY(limit_mutex) = 0;
|
||||
size_t soft_limit TSA_GUARDED_BY(limit_mutex) = 0;
|
||||
OnMemoryLimitFn on_hard_limit TSA_GUARDED_BY(limit_mutex);
|
||||
OnMemoryLimitFn on_soft_limit TSA_GUARDED_BY(limit_mutex);
|
||||
|
||||
uint64_t last_usage = 0;
|
||||
std::mutex memory_amount_available_changed_mutex;
|
||||
OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed TSA_GUARDED_BY(memory_amount_available_changed_mutex);
|
||||
|
||||
uint64_t last_memory_usage = 0; /// how much memory does the process use
|
||||
uint64_t last_available_memory_amount; /// how much memory can the process use
|
||||
|
||||
/// Represents the cgroup virtual file that shows the memory consumption of the process's cgroup.
|
||||
struct File
|
||||
struct MemoryUsageFile
|
||||
{
|
||||
public:
|
||||
explicit File(LoggerPtr log_);
|
||||
~File();
|
||||
explicit MemoryUsageFile(LoggerPtr log_);
|
||||
~MemoryUsageFile();
|
||||
uint64_t readMemoryUsage() const;
|
||||
private:
|
||||
LoggerPtr log;
|
||||
@ -62,13 +76,11 @@ private:
|
||||
std::string file_name;
|
||||
};
|
||||
|
||||
File file;
|
||||
MemoryUsageFile memory_usage_file;
|
||||
|
||||
void startThread();
|
||||
void stopThread();
|
||||
|
||||
void runThread();
|
||||
void processMemoryUsage(uint64_t usage);
|
||||
|
||||
std::mutex thread_mutex;
|
||||
std::condition_variable cond;
|
||||
@ -79,13 +91,13 @@ private:
|
||||
#else
|
||||
class CgroupsMemoryUsageObserver
|
||||
{
|
||||
using OnMemoryAmountAvailableChangedFn = std::function<void()>;
|
||||
public:
|
||||
explicit CgroupsMemoryUsageObserver(std::chrono::seconds) {}
|
||||
|
||||
void setLimits(uint64_t, uint64_t) {}
|
||||
size_t readMemoryUsage() { return 0; }
|
||||
size_t getHardLimit() { return 0; }
|
||||
size_t getSoftLimit() { return 0; }
|
||||
void setMemoryUsageLimits(uint64_t, uint64_t) {}
|
||||
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn) {}
|
||||
void startThread() {}
|
||||
};
|
||||
#endif
|
||||
|
||||
|
@ -25,6 +25,18 @@ inline bool isFinite(T x)
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool canConvertTo(Float64 x)
|
||||
{
|
||||
if constexpr (std::is_floating_point_v<T>)
|
||||
return true;
|
||||
if (!isFinite(x))
|
||||
return false;
|
||||
if (x > Float64(std::numeric_limits<T>::max()) || x < Float64(std::numeric_limits<T>::lowest()))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
T NaNOrZero()
|
||||
|
@ -175,6 +175,7 @@ class IColumn;
|
||||
M(Bool, enable_positional_arguments, true, "Enable positional arguments in ORDER BY, GROUP BY and LIMIT BY", 0) \
|
||||
M(Bool, enable_extended_results_for_datetime_functions, false, "Enable date functions like toLastDayOfMonth return Date32 results (instead of Date results) for Date32/DateTime64 arguments.", 0) \
|
||||
M(Bool, allow_nonconst_timezone_arguments, false, "Allow non-const timezone arguments in certain time-related functions like toTimeZone(), fromUnixTimestamp*(), snowflakeToDateTime*()", 0) \
|
||||
M(Bool, function_locate_has_mysql_compatible_argument_order, true, "Function locate() has arguments (needle, haystack[, start_pos]) like in MySQL instead of (haystack, needle[, start_pos]) like function position()", 0) \
|
||||
\
|
||||
M(Bool, group_by_use_nulls, false, "Treat columns mentioned in ROLLUP, CUBE or GROUPING SETS as Nullable", 0) \
|
||||
\
|
||||
|
@ -94,6 +94,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
|
||||
{"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"},
|
||||
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."},
|
||||
{"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"},
|
||||
{"function_locate_has_mysql_compatible_argument_order", false, true, "Increase compatibility with MySQL's locate function."},
|
||||
{"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for sapce reservation in filesystem cache"},
|
||||
{"max_parser_backtracks", 0, 1000000, "Limiting the complexity of parsing"},
|
||||
}},
|
||||
|
@ -440,10 +440,22 @@ void DatabaseOrdinary::stopLoading()
|
||||
|
||||
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
|
||||
{
|
||||
auto result = DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
|
||||
std::scoped_lock lock(mutex);
|
||||
typeid_cast<DatabaseTablesSnapshotIterator &>(*result).setLoadTasks(startup_table);
|
||||
return result;
|
||||
// Wait for every table (matching the filter) to be loaded and started up before we make the snapshot.
|
||||
// It is important, because otherwise table might be:
|
||||
// - not attached and thus will be missed in the snapshot;
|
||||
// - not started, which is not good for DDL operations.
|
||||
LoadTaskPtrs tasks_to_wait;
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!filter_by_table_name)
|
||||
tasks_to_wait.reserve(startup_table.size());
|
||||
for (const auto & [table_name, task] : startup_table)
|
||||
if (!filter_by_table_name || filter_by_table_name(table_name))
|
||||
tasks_to_wait.emplace_back(task);
|
||||
}
|
||||
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), tasks_to_wait);
|
||||
|
||||
return DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
|
||||
|
@ -77,17 +77,12 @@ private:
|
||||
Tables tables;
|
||||
Tables::iterator it;
|
||||
|
||||
// Tasks to wait before returning a table
|
||||
using Tasks = std::unordered_map<String, LoadTaskPtr>;
|
||||
Tasks tasks;
|
||||
|
||||
protected:
|
||||
DatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && other) noexcept
|
||||
: IDatabaseTablesIterator(std::move(other.database_name))
|
||||
{
|
||||
size_t idx = std::distance(other.tables.begin(), other.it);
|
||||
std::swap(tables, other.tables);
|
||||
std::swap(tasks, other.tasks);
|
||||
other.it = other.tables.end();
|
||||
it = tables.begin();
|
||||
std::advance(it, idx);
|
||||
@ -110,17 +105,7 @@ public:
|
||||
|
||||
const String & name() const override { return it->first; }
|
||||
|
||||
const StoragePtr & table() const override
|
||||
{
|
||||
if (auto task = tasks.find(it->first); task != tasks.end())
|
||||
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task->second);
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void setLoadTasks(const Tasks & tasks_)
|
||||
{
|
||||
tasks = tasks_;
|
||||
}
|
||||
const StoragePtr & table() const override { return it->second; }
|
||||
};
|
||||
|
||||
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
|
||||
|
@ -22,13 +22,13 @@ namespace DB
|
||||
* positionCaseInsensitive(haystack, needle)
|
||||
* positionCaseInsensitiveUTF8(haystack, needle)
|
||||
*
|
||||
* like(haystack, pattern) - search by the regular expression LIKE; Returns 0 or 1. Case-insensitive, but only for Latin.
|
||||
* notLike(haystack, pattern)
|
||||
* like(haystack, needle) - search by the regular expression LIKE; Returns 0 or 1. Case-insensitive, but only for Latin.
|
||||
* notLike(haystack, needle)
|
||||
*
|
||||
* ilike(haystack, pattern) - like 'like' but case-insensitive
|
||||
* notIlike(haystack, pattern)
|
||||
* ilike(haystack, needle) - like 'like' but case-insensitive
|
||||
* notIlike(haystack, needle)
|
||||
*
|
||||
* match(haystack, pattern) - search by regular expression re2; Returns 0 or 1.
|
||||
* match(haystack, needle) - search by regular expression re2; Returns 0 or 1.
|
||||
*
|
||||
* countSubstrings(haystack, needle) -- count number of occurrences of needle in haystack.
|
||||
* countSubstringsCaseInsensitive(haystack, needle)
|
||||
@ -53,7 +53,7 @@ namespace DB
|
||||
* - the first subpattern, if the regexp has a subpattern;
|
||||
* - the zero subpattern (the match part, otherwise);
|
||||
* - if not match - an empty string.
|
||||
* extract(haystack, pattern)
|
||||
* extract(haystack, needle)
|
||||
*/
|
||||
|
||||
namespace ErrorCodes
|
||||
@ -69,13 +69,39 @@ enum class ExecutionErrorPolicy
|
||||
Throw
|
||||
};
|
||||
|
||||
template <typename Impl, ExecutionErrorPolicy execution_error_policy = ExecutionErrorPolicy::Throw>
|
||||
enum class HaystackNeedleOrderIsConfigurable
|
||||
{
|
||||
No, /// function arguments are always: (haystack, needle[, position])
|
||||
Yes /// depending on a setting, the function arguments are (haystack, needle[, position]) or (needle, haystack[, position])
|
||||
};
|
||||
|
||||
template <typename Impl,
|
||||
ExecutionErrorPolicy execution_error_policy = ExecutionErrorPolicy::Throw,
|
||||
HaystackNeedleOrderIsConfigurable haystack_needle_order_is_configurable = HaystackNeedleOrderIsConfigurable::No>
|
||||
class FunctionsStringSearch : public IFunction
|
||||
{
|
||||
private:
|
||||
enum class ArgumentOrder
|
||||
{
|
||||
HaystackNeedle,
|
||||
NeedleHaystack
|
||||
};
|
||||
|
||||
ArgumentOrder argument_order = ArgumentOrder::HaystackNeedle;
|
||||
|
||||
public:
|
||||
static constexpr auto name = Impl::name;
|
||||
|
||||
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionsStringSearch>(); }
|
||||
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionsStringSearch>(context); }
|
||||
|
||||
explicit FunctionsStringSearch([[maybe_unused]] ContextPtr context)
|
||||
{
|
||||
if constexpr (haystack_needle_order_is_configurable == HaystackNeedleOrderIsConfigurable::Yes)
|
||||
{
|
||||
if (context->getSettingsRef().function_locate_has_mysql_compatible_argument_order)
|
||||
argument_order = ArgumentOrder::NeedleHaystack;
|
||||
}
|
||||
}
|
||||
|
||||
String getName() const override { return name; }
|
||||
|
||||
@ -105,13 +131,16 @@ public:
|
||||
"Number of arguments for function {} doesn't match: passed {}, should be 2 or 3",
|
||||
getName(), arguments.size());
|
||||
|
||||
if (!isStringOrFixedString(arguments[0]))
|
||||
const auto & haystack_type = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[0] : arguments[1];
|
||||
const auto & needle_type = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[1] : arguments[0];
|
||||
|
||||
if (!isStringOrFixedString(haystack_type))
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
"Illegal type {} of argument of function {}",
|
||||
arguments[0]->getName(), getName());
|
||||
|
||||
if (!isString(arguments[1]))
|
||||
if (!isString(needle_type))
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
"Illegal type {} of argument of function {}",
|
||||
@ -135,8 +164,8 @@ public:
|
||||
|
||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override
|
||||
{
|
||||
const ColumnPtr & column_haystack = arguments[0].column;
|
||||
const ColumnPtr & column_needle = arguments[1].column;
|
||||
const ColumnPtr & column_haystack = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[0].column : arguments[1].column;
|
||||
const ColumnPtr & column_needle = (argument_order == ArgumentOrder::HaystackNeedle) ? arguments[1].column : arguments[0].column;
|
||||
|
||||
ColumnPtr column_start_pos = nullptr;
|
||||
if (arguments.size() >= 3)
|
||||
|
@ -213,6 +213,7 @@ struct MapToSubcolumnAdapter : public MapAdapterBase<MapToSubcolumnAdapter<Name,
|
||||
class FunctionMapKeyLike : public IFunction
|
||||
{
|
||||
public:
|
||||
FunctionMapKeyLike() : impl(/*context*/ nullptr) {} /// nullptr because getting a context here is hard and FunctionLike doesn't need context
|
||||
String getName() const override { return "mapKeyLike"; }
|
||||
size_t getNumberOfArguments() const override { return 3; }
|
||||
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
|
||||
|
@ -1,5 +1,5 @@
|
||||
#include "FunctionFactory.h"
|
||||
#include "like.h"
|
||||
#include "FunctionFactory.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
|
34
src/Functions/locate.cpp
Normal file
34
src/Functions/locate.cpp
Normal file
@ -0,0 +1,34 @@
|
||||
#include "FunctionsStringSearch.h"
|
||||
#include "FunctionFactory.h"
|
||||
#include "PositionImpl.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace
|
||||
{
|
||||
|
||||
struct NameLocate
|
||||
{
|
||||
static constexpr auto name = "locate";
|
||||
};
|
||||
|
||||
using FunctionLocate = FunctionsStringSearch<PositionImpl<NameLocate, PositionCaseSensitiveASCII>, ExecutionErrorPolicy::Throw, HaystackNeedleOrderIsConfigurable::Yes>;
|
||||
|
||||
}
|
||||
|
||||
REGISTER_FUNCTION(Locate)
|
||||
{
|
||||
FunctionDocumentation::Description doc_description = "Like function `position` but with arguments `haystack` and `locate` switched. The behavior of this function depends on the ClickHouse version: In versions < v24.3, `locate` was an alias of function `position` and accepted arguments `(haystack, needle[, start_pos])`. In versions >= 24.3,, `locate` is an individual function (for better compatibility with MySQL) and accepts arguments `(needle, haystack[, start_pos])`. The previous behaviorcan be restored using setting `function_locate_has_mysql_compatible_argument_order = false`.";
|
||||
FunctionDocumentation::Syntax doc_syntax = "location(needle, haystack[, start_pos])";
|
||||
FunctionDocumentation::Arguments doc_arguments = {{"needle", "Substring to be searched (String)"},
|
||||
{"haystack", "String in which the search is performed (String)."},
|
||||
{"start_pos", "Position (1-based) in `haystack` at which the search starts (UInt*)."}};
|
||||
FunctionDocumentation::ReturnedValue doc_returned_value = "Starting position in bytes and counting from 1, if the substring was found. 0, if the substring was not found.";
|
||||
FunctionDocumentation::Examples doc_examples = {{"Example", "SELECT locate('abcabc', 'ca');", "3"}};
|
||||
FunctionDocumentation::Categories doc_categories = {"String search"};
|
||||
|
||||
|
||||
factory.registerFunction<FunctionLocate>({doc_description, doc_syntax, doc_arguments, doc_returned_value, doc_examples, doc_categories}, FunctionFactory::CaseInsensitive);
|
||||
}
|
||||
}
|
@ -20,6 +20,5 @@ using FunctionPosition = FunctionsStringSearch<PositionImpl<NamePosition, Positi
|
||||
REGISTER_FUNCTION(Position)
|
||||
{
|
||||
factory.registerFunction<FunctionPosition>({}, FunctionFactory::CaseInsensitive);
|
||||
factory.registerAlias("locate", NamePosition::name, FunctionFactory::CaseInsensitive);
|
||||
}
|
||||
}
|
||||
|
@ -148,19 +148,25 @@ AsynchronousInsertQueue::InsertData::Entry::Entry(
|
||||
{
|
||||
}
|
||||
|
||||
void AsynchronousInsertQueue::InsertData::Entry::resetChunk()
|
||||
{
|
||||
if (chunk.empty())
|
||||
return;
|
||||
|
||||
// To avoid races on counter of user's MemoryTracker we should free memory at this moment.
|
||||
// Entries data must be destroyed in context of user who runs async insert.
|
||||
// Each entry in the list may correspond to a different user,
|
||||
// so we need to switch current thread's MemoryTracker.
|
||||
MemoryTrackerSwitcher switcher(user_memory_tracker);
|
||||
chunk = {};
|
||||
}
|
||||
|
||||
void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr exception_)
|
||||
{
|
||||
if (finished.exchange(true))
|
||||
return;
|
||||
|
||||
{
|
||||
// To avoid races on counter of user's MemoryTracker we should free memory at this moment.
|
||||
// Entries data must be destroyed in context of user who runs async insert.
|
||||
// Each entry in the list may correspond to a different user,
|
||||
// so we need to switch current thread's MemoryTracker.
|
||||
MemoryTrackerSwitcher switcher(user_memory_tracker);
|
||||
chunk = {};
|
||||
}
|
||||
resetChunk();
|
||||
|
||||
if (exception_)
|
||||
{
|
||||
@ -224,7 +230,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
|
||||
auto & shard = queue_shards[i];
|
||||
|
||||
shard.are_tasks_available.notify_one();
|
||||
assert(dump_by_first_update_threads[i].joinable());
|
||||
chassert(dump_by_first_update_threads[i].joinable());
|
||||
dump_by_first_update_threads[i].join();
|
||||
|
||||
if (flush_on_shutdown)
|
||||
@ -510,14 +516,13 @@ void AsynchronousInsertQueue::validateSettings(const Settings & settings, Logger
|
||||
/// Adaptive timeout settings.
|
||||
const auto min_ms = std::chrono::milliseconds(settings.async_insert_busy_timeout_min_ms);
|
||||
|
||||
if (min_ms > max_ms)
|
||||
if (log)
|
||||
LOG_WARNING(
|
||||
log,
|
||||
"Setting 'async_insert_busy_timeout_min_ms'={} is greater than 'async_insert_busy_timeout_max_ms'={}. Ignoring "
|
||||
"'async_insert_busy_timeout_min_ms'",
|
||||
min_ms.count(),
|
||||
max_ms.count());
|
||||
if (min_ms > max_ms && log)
|
||||
LOG_WARNING(
|
||||
log,
|
||||
"Setting 'async_insert_busy_timeout_min_ms'={} is greater than 'async_insert_busy_timeout_max_ms'={}. Ignoring "
|
||||
"'async_insert_busy_timeout_min_ms'",
|
||||
min_ms.count(),
|
||||
max_ms.count());
|
||||
|
||||
if (settings.async_insert_busy_timeout_increase_rate <= 0)
|
||||
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting 'async_insert_busy_timeout_increase_rate' must be greater than zero");
|
||||
@ -953,14 +958,18 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
|
||||
"Expected entry with data kind Parsed. Got: {}", entry->chunk.getDataKind());
|
||||
|
||||
auto buffer = std::make_unique<ReadBufferFromString>(*bytes);
|
||||
|
||||
size_t num_bytes = bytes->size();
|
||||
size_t num_rows = executor.execute(*buffer);
|
||||
|
||||
total_rows += num_rows;
|
||||
chunk_info->offsets.push_back(total_rows);
|
||||
chunk_info->tokens.push_back(entry->async_dedup_token);
|
||||
|
||||
add_to_async_insert_log(entry, query_for_logging, current_exception, num_rows, num_bytes, data->timeout_ms);
|
||||
|
||||
current_exception.clear();
|
||||
entry->resetChunk();
|
||||
}
|
||||
|
||||
Chunk chunk(executor.getResultColumns(), total_rows);
|
||||
@ -1011,6 +1020,8 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
|
||||
|
||||
const auto & query_for_logging = get_query_by_format(entry->format);
|
||||
add_to_async_insert_log(entry, query_for_logging, "", block->rows(), block->bytes(), data->timeout_ms);
|
||||
|
||||
entry->resetChunk();
|
||||
}
|
||||
|
||||
Chunk chunk(std::move(result_columns), total_rows);
|
||||
|
@ -117,6 +117,17 @@ private:
|
||||
return DataKind::Parsed;
|
||||
}
|
||||
|
||||
bool empty() const
|
||||
{
|
||||
return std::visit([]<typename T>(const T & arg)
|
||||
{
|
||||
if constexpr (std::is_same_v<T, Block>)
|
||||
return arg.rows() == 0;
|
||||
else
|
||||
return arg.empty();
|
||||
}, *this);
|
||||
}
|
||||
|
||||
const String * asString() const { return std::get_if<String>(this); }
|
||||
const Block * asBlock() const { return std::get_if<Block>(this); }
|
||||
};
|
||||
@ -140,7 +151,9 @@ private:
|
||||
const String & format_,
|
||||
MemoryTracker * user_memory_tracker_);
|
||||
|
||||
void resetChunk();
|
||||
void finish(std::exception_ptr exception_ = nullptr);
|
||||
|
||||
std::future<void> getFuture() { return promise.get_future(); }
|
||||
bool isFinished() const { return finished; }
|
||||
|
||||
|
@ -417,7 +417,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
|
||||
uuids_to_wait.push_back(table_to_wait);
|
||||
}
|
||||
}
|
||||
// only if operation is DETACH
|
||||
// only if operation is DETACH
|
||||
if ((!drop || !truncate) && query.sync)
|
||||
{
|
||||
/// Avoid "some tables are still in use" when sync mode is enabled
|
||||
|
@ -96,6 +96,7 @@
|
||||
#include <Common/scope_guard_safe.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/NaNUtils.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -2553,10 +2554,13 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
|
||||
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
|
||||
if (max_streams > 1 && !is_sync_remote)
|
||||
{
|
||||
if (auto streams_with_ratio = max_streams * settings.max_streams_to_max_threads_ratio; streams_with_ratio < SIZE_MAX)
|
||||
if (auto streams_with_ratio = max_streams * settings.max_streams_to_max_threads_ratio; canConvertTo<size_t>(streams_with_ratio))
|
||||
max_streams = static_cast<size_t>(streams_with_ratio);
|
||||
else
|
||||
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, "Exceeded limit for `max_streams` with `max_streams_to_max_threads_ratio`. Make sure that `max_streams * max_streams_to_max_threads_ratio` not exceeds {}, current value: {}", SIZE_MAX, streams_with_ratio);
|
||||
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
|
||||
"Exceeded limit for `max_streams` with `max_streams_to_max_threads_ratio`. "
|
||||
"Make sure that `max_streams * max_streams_to_max_threads_ratio` is in some reasonable boundaries, current value: {}",
|
||||
streams_with_ratio);
|
||||
}
|
||||
|
||||
auto & prewhere_info = analysis_result.prewhere_info;
|
||||
|
@ -94,7 +94,24 @@ public:
|
||||
ASTPtr clone() const override
|
||||
{
|
||||
auto res = std::make_shared<ASTRenameQuery>(*this);
|
||||
res->cloneChildren();
|
||||
res->children.clear();
|
||||
|
||||
auto clone_child = [&res](ASTPtr & node)
|
||||
{
|
||||
if (node)
|
||||
{
|
||||
node = node->clone();
|
||||
res->children.push_back(node);
|
||||
}
|
||||
};
|
||||
|
||||
for (auto & elem : res->elements)
|
||||
{
|
||||
clone_child(elem.from.database);
|
||||
clone_child(elem.from.table);
|
||||
clone_child(elem.to.database);
|
||||
clone_child(elem.to.table);
|
||||
}
|
||||
cloneOutputOptions(*res);
|
||||
return res;
|
||||
}
|
||||
@ -108,9 +125,15 @@ public:
|
||||
for (Element & elem : query.elements)
|
||||
{
|
||||
if (!elem.from.database)
|
||||
{
|
||||
elem.from.database = std::make_shared<ASTIdentifier>(params.default_database);
|
||||
query.children.push_back(elem.from.database);
|
||||
}
|
||||
if (!elem.to.database)
|
||||
{
|
||||
elem.to.database = std::make_shared<ASTIdentifier>(params.default_database);
|
||||
query.children.push_back(elem.to.database);
|
||||
}
|
||||
}
|
||||
|
||||
return query_ptr;
|
||||
|
@ -131,6 +131,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int TOO_MANY_ROWS;
|
||||
extern const int CANNOT_PARSE_TEXT;
|
||||
extern const int PARAMETER_OUT_OF_BOUND;
|
||||
}
|
||||
|
||||
static MergeTreeReaderSettings getMergeTreeReaderSettings(
|
||||
@ -348,7 +349,14 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
|
||||
|
||||
/// We have a special logic for local replica. It has to read less data, because in some cases it should
|
||||
/// merge states of aggregate functions or do some other important stuff other than reading from Disk.
|
||||
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(pool_settings.min_marks_for_concurrent_read * context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier);
|
||||
const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
|
||||
if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
|
||||
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
|
||||
else
|
||||
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
|
||||
"Exceeded limit for the number of marks per a single task for parallel replicas. "
|
||||
"Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
|
||||
multiplier);
|
||||
|
||||
auto pool = std::make_shared<MergeTreeReadPoolParallelReplicas>(
|
||||
std::move(extension),
|
||||
@ -512,8 +520,14 @@ Pipe ReadFromMergeTree::readInOrder(
|
||||
.columns_to_read = required_columns,
|
||||
};
|
||||
|
||||
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(
|
||||
pool_settings.min_marks_for_concurrent_read * context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier);
|
||||
const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
|
||||
if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
|
||||
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
|
||||
else
|
||||
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
|
||||
"Exceeded limit for the number of marks per a single task for parallel replicas. "
|
||||
"Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
|
||||
multiplier);
|
||||
|
||||
CoordinationMode mode = read_type == ReadType::InOrder
|
||||
? CoordinationMode::WithOrder
|
||||
|
@ -10,6 +10,10 @@ class ASTStorage;
|
||||
|
||||
#define MEMORY_SETTINGS(M, ALIAS) \
|
||||
M(Bool, compress, false, "Compress data in memory", 0) \
|
||||
M(UInt64, min_rows_to_keep, 0, "Minimum block size (in rows) to retain in Memory table buffer.", 0) \
|
||||
M(UInt64, max_rows_to_keep, 0, "Maximum block size (in rows) to retain in Memory table buffer.", 0) \
|
||||
M(UInt64, min_bytes_to_keep, 0, "Minimum block size (in bytes) to retain in Memory table buffer.", 0) \
|
||||
M(UInt64, max_bytes_to_keep, 0, "Maximum block size (in bytes) to retain in Memory table buffer.", 0) \
|
||||
|
||||
DECLARE_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)
|
||||
|
||||
|
@ -6143,7 +6143,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
|
||||
|
||||
LOG_DEBUG(log, "Found containing part {} for part {}", containing_part, part_info.dir_name);
|
||||
|
||||
if (!containing_part.empty() && containing_part != part_info.dir_name)
|
||||
if (containing_part != part_info.dir_name)
|
||||
part_info.disk->moveDirectory(fs::path(relative_data_path) / source_dir / part_info.dir_name,
|
||||
fs::path(relative_data_path) / source_dir / ("inactive_" + part_info.dir_name));
|
||||
else
|
||||
|
@ -241,7 +241,7 @@ std::vector<AsyncInsertInfoPtr> scatterAsyncInsertInfoBySelector(AsyncInsertInfo
|
||||
}
|
||||
|
||||
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
|
||||
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info)
|
||||
Block && block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info)
|
||||
{
|
||||
BlocksWithPartition result;
|
||||
if (!block || !block.rows())
|
||||
@ -320,7 +320,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
|
||||
}
|
||||
|
||||
Block MergeTreeDataWriter::mergeBlock(
|
||||
const Block & block,
|
||||
Block && block,
|
||||
SortDescription sort_description,
|
||||
const Names & partition_key_columns,
|
||||
IColumn::Permutation *& permutation,
|
||||
@ -410,7 +410,11 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartWithoutPref
|
||||
}
|
||||
|
||||
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
|
||||
BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, int64_t block_number, bool need_tmp_prefix)
|
||||
BlockWithPartition & block_with_partition,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
ContextPtr context,
|
||||
int64_t block_number,
|
||||
bool need_tmp_prefix)
|
||||
{
|
||||
TemporaryPart temp_part;
|
||||
Block & block = block_with_partition.block;
|
||||
@ -498,7 +502,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
|
||||
if (context->getSettingsRef().optimize_on_insert)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MergeTreeDataWriterMergingBlocksMicroseconds);
|
||||
block = mergeBlock(block, sort_description, partition_key_columns, perm_ptr, data.merging_params);
|
||||
block = mergeBlock(std::move(block), sort_description, partition_key_columns, perm_ptr, data.merging_params);
|
||||
}
|
||||
|
||||
/// Size of part would not be greater than block.bytes() + epsilon
|
||||
@ -718,7 +722,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
|
||||
|
||||
MergeTreeData::MergingParams projection_merging_params;
|
||||
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
|
||||
block = mergeBlock(block, sort_description, {}, perm_ptr, projection_merging_params);
|
||||
block = mergeBlock(std::move(block), sort_description, {}, perm_ptr, projection_merging_params);
|
||||
}
|
||||
|
||||
/// This effectively chooses minimal compression method:
|
||||
|
@ -53,7 +53,7 @@ public:
|
||||
* (split rows by partition)
|
||||
* Works deterministically: if same block was passed, function will return same result in same order.
|
||||
*/
|
||||
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info = nullptr);
|
||||
static BlocksWithPartition splitBlockIntoParts(Block && block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info = nullptr);
|
||||
|
||||
/// This structure contains not completely written temporary part.
|
||||
/// Some writes may happen asynchronously, e.g. for blob storages.
|
||||
@ -107,7 +107,7 @@ public:
|
||||
size_t block_num);
|
||||
|
||||
static Block mergeBlock(
|
||||
const Block & block,
|
||||
Block && block,
|
||||
SortDescription sort_description,
|
||||
const Names & partition_key_columns,
|
||||
IColumn::Permutation *& permutation,
|
||||
|
@ -63,7 +63,7 @@ void MergeTreeSink::consume(Chunk chunk)
|
||||
if (!storage_snapshot->object_columns.empty())
|
||||
convertDynamicColumnsToTuples(block, storage_snapshot);
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(std::move(block), max_parts_per_block, metadata_snapshot, context);
|
||||
|
||||
using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;
|
||||
DelayedPartitions partitions;
|
||||
@ -87,6 +87,10 @@ void MergeTreeSink::consume(Chunk chunk)
|
||||
elapsed_ns = watch.elapsed();
|
||||
}
|
||||
|
||||
/// Reset earlier to free memory
|
||||
current_block.block.clear();
|
||||
current_block.partition.clear();
|
||||
|
||||
/// If optimize_on_insert setting is true, current_block could become empty after merge
|
||||
/// and we didn't create part.
|
||||
if (!temp_part.part)
|
||||
|
@ -288,7 +288,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "No chunk info for async inserts");
|
||||
}
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context, async_insert_info);
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(std::move(block), max_parts_per_block, metadata_snapshot, context, async_insert_info);
|
||||
|
||||
using DelayedPartition = typename ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk::Partition;
|
||||
using DelayedPartitions = std::vector<DelayedPartition>;
|
||||
@ -383,6 +383,12 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
|
||||
partitions = DelayedPartitions{};
|
||||
}
|
||||
|
||||
if constexpr (!async_insert)
|
||||
{
|
||||
/// Reset earlier to free memory.
|
||||
current_block.block.clear();
|
||||
current_block.partition.clear();
|
||||
}
|
||||
|
||||
partitions.emplace_back(DelayedPartition(
|
||||
log,
|
||||
|
@ -46,6 +46,7 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int CANNOT_RESTORE_TABLE;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int SETTING_CONSTRAINT_VIOLATION;
|
||||
}
|
||||
|
||||
class MemorySink : public SinkToStorage
|
||||
@ -103,16 +104,37 @@ public:
|
||||
std::lock_guard lock(storage.mutex);
|
||||
|
||||
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
|
||||
UInt64 new_total_rows = storage.total_size_rows.load(std::memory_order_relaxed) + inserted_rows;
|
||||
UInt64 new_total_bytes = storage.total_size_bytes.load(std::memory_order_relaxed) + inserted_bytes;
|
||||
while (!new_data->empty()
|
||||
&& ((storage.max_bytes_to_keep && new_total_bytes > storage.max_bytes_to_keep)
|
||||
|| (storage.max_rows_to_keep && new_total_rows > storage.max_rows_to_keep)))
|
||||
{
|
||||
Block oldest_block = new_data->front();
|
||||
UInt64 rows_to_remove = oldest_block.rows();
|
||||
UInt64 bytes_to_remove = oldest_block.allocatedBytes();
|
||||
if (new_total_bytes - bytes_to_remove < storage.min_bytes_to_keep
|
||||
|| new_total_rows - rows_to_remove < storage.min_rows_to_keep)
|
||||
{
|
||||
break; // stop - removing next block will put us under min_bytes / min_rows threshold
|
||||
}
|
||||
|
||||
// delete old block from current storage table
|
||||
new_total_rows -= rows_to_remove;
|
||||
new_total_bytes -= bytes_to_remove;
|
||||
new_data->erase(new_data->begin());
|
||||
}
|
||||
|
||||
// append new data to modified storage table and commit
|
||||
new_data->insert(new_data->end(), new_blocks.begin(), new_blocks.end());
|
||||
|
||||
storage.data.set(std::move(new_data));
|
||||
storage.total_size_bytes.fetch_add(inserted_bytes, std::memory_order_relaxed);
|
||||
storage.total_size_rows.fetch_add(inserted_rows, std::memory_order_relaxed);
|
||||
storage.total_size_rows.store(new_total_rows, std::memory_order_relaxed);
|
||||
storage.total_size_bytes.store(new_total_bytes, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
private:
|
||||
Blocks new_blocks;
|
||||
|
||||
StorageMemory & storage;
|
||||
StorageSnapshotPtr storage_snapshot;
|
||||
};
|
||||
@ -123,8 +145,10 @@ StorageMemory::StorageMemory(
|
||||
ColumnsDescription columns_description_,
|
||||
ConstraintsDescription constraints_,
|
||||
const String & comment,
|
||||
bool compress_)
|
||||
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(compress_)
|
||||
const MemorySettings & settings)
|
||||
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(settings.compress),
|
||||
min_rows_to_keep(settings.min_rows_to_keep), max_rows_to_keep(settings.max_rows_to_keep),
|
||||
min_bytes_to_keep(settings.min_bytes_to_keep), max_bytes_to_keep(settings.max_bytes_to_keep)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(std::move(columns_description_));
|
||||
@ -542,7 +566,11 @@ void registerStorageMemory(StorageFactory & factory)
|
||||
if (has_settings)
|
||||
settings.loadFromQuery(*args.storage_def);
|
||||
|
||||
return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings.compress);
|
||||
if (settings.min_bytes_to_keep > settings.max_bytes_to_keep
|
||||
|| settings.min_rows_to_keep > settings.max_rows_to_keep)
|
||||
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max.");
|
||||
|
||||
return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings);
|
||||
},
|
||||
{
|
||||
.supports_settings = true,
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/MemorySettings.h>
|
||||
|
||||
#include <Common/MultiVersion.h>
|
||||
|
||||
@ -30,7 +31,7 @@ public:
|
||||
ColumnsDescription columns_description_,
|
||||
ConstraintsDescription constraints_,
|
||||
const String & comment,
|
||||
bool compress_ = false);
|
||||
const MemorySettings & settings = MemorySettings());
|
||||
|
||||
String getName() const override { return "Memory"; }
|
||||
|
||||
@ -134,6 +135,11 @@ private:
|
||||
std::atomic<size_t> total_size_rows = 0;
|
||||
|
||||
bool compress;
|
||||
UInt64 min_rows_to_keep;
|
||||
UInt64 max_rows_to_keep;
|
||||
UInt64 min_bytes_to_keep;
|
||||
UInt64 max_bytes_to_keep;
|
||||
|
||||
|
||||
friend class ReadFromMemoryStorageStep;
|
||||
};
|
||||
|
@ -1618,6 +1618,7 @@ class ClickHouseCluster:
|
||||
with_installed_binary=False,
|
||||
external_dirs=None,
|
||||
tmpfs=None,
|
||||
mem_limit=None,
|
||||
zookeeper_docker_compose_path=None,
|
||||
minio_certs_dir=None,
|
||||
minio_data_dir=None,
|
||||
@ -1728,6 +1729,7 @@ class ClickHouseCluster:
|
||||
with_installed_binary=with_installed_binary,
|
||||
external_dirs=external_dirs,
|
||||
tmpfs=tmpfs or [],
|
||||
mem_limit=mem_limit,
|
||||
config_root_name=config_root_name,
|
||||
extra_configs=extra_configs,
|
||||
)
|
||||
@ -3203,6 +3205,7 @@ services:
|
||||
{krb5_conf}
|
||||
entrypoint: {entrypoint_cmd}
|
||||
tmpfs: {tmpfs}
|
||||
{mem_limit}
|
||||
cap_add:
|
||||
- SYS_PTRACE
|
||||
- NET_ADMIN
|
||||
@ -3288,6 +3291,7 @@ class ClickHouseInstance:
|
||||
with_installed_binary=False,
|
||||
external_dirs=None,
|
||||
tmpfs=None,
|
||||
mem_limit=None,
|
||||
config_root_name="clickhouse",
|
||||
extra_configs=[],
|
||||
):
|
||||
@ -3299,6 +3303,10 @@ class ClickHouseInstance:
|
||||
|
||||
self.external_dirs = external_dirs
|
||||
self.tmpfs = tmpfs or []
|
||||
if mem_limit is not None:
|
||||
self.mem_limit = "mem_limit : " + mem_limit
|
||||
else:
|
||||
self.mem_limit = ""
|
||||
self.base_config_dir = (
|
||||
p.abspath(p.join(base_path, base_config_dir)) if base_config_dir else None
|
||||
)
|
||||
@ -4644,6 +4652,7 @@ class ClickHouseInstance:
|
||||
db_dir=db_dir,
|
||||
external_dirs_volumes=external_dirs_volumes,
|
||||
tmpfs=str(self.tmpfs),
|
||||
mem_limit=self.mem_limit,
|
||||
logs_dir=logs_dir,
|
||||
depends_on=str(depends_on),
|
||||
user=os.getuid(),
|
||||
|
@ -6,7 +6,7 @@ from helpers.cluster import ClickHouseCluster
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance(
|
||||
"node1", user_configs=["config/config.xml"], with_zookeeper=True
|
||||
"node1", user_configs=["config/config.xml"], with_zookeeper=False
|
||||
)
|
||||
|
||||
|
||||
|
@ -0,0 +1,7 @@
|
||||
<clickhouse>
|
||||
<text_log>
|
||||
<database>system</database>
|
||||
<table>text_log</table>
|
||||
<flush_interval_milliseconds>500</flush_interval_milliseconds>
|
||||
</text_log>
|
||||
</clickhouse>
|
53
tests/integration/test_memory_limit_observer/test.py
Normal file
53
tests/integration/test_memory_limit_observer/test.py
Normal file
@ -0,0 +1,53 @@
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
|
||||
from helpers.cluster import ClickHouseCluster, run_and_check
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance(
|
||||
"node1", main_configs=["config/text_log.xml"], mem_limit="5g"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def get_latest_mem_limit():
|
||||
for _ in range(10):
|
||||
try:
|
||||
mem_limit = float(
|
||||
node1.query(
|
||||
"""
|
||||
select extract(message, '\\d+\\.\\d+') from system.text_log
|
||||
where message like '%Setting max_server_memory_usage was set to%' and
|
||||
message not like '%like%' order by event_time desc limit 1
|
||||
"""
|
||||
).strip()
|
||||
)
|
||||
return mem_limit
|
||||
except Exception as e:
|
||||
time.sleep(1)
|
||||
raise Exception("Cannot get memory limit")
|
||||
|
||||
|
||||
def test_observe_memory_limit(started_cluster):
|
||||
original_max_mem = get_latest_mem_limit()
|
||||
logging.debug(f"get original memory limit {original_max_mem}")
|
||||
run_and_check(["docker", "update", "--memory=10g", node1.docker_id])
|
||||
for _ in range(30):
|
||||
time.sleep(10)
|
||||
new_max_mem = get_latest_mem_limit()
|
||||
logging.debug(f"get new memory limit {new_max_mem}")
|
||||
if new_max_mem > original_max_mem:
|
||||
return
|
||||
raise Exception("the memory limit does not increase as expected")
|
7
tests/queries/0_stateless/00765_locate.reference
Normal file
7
tests/queries/0_stateless/00765_locate.reference
Normal file
@ -0,0 +1,7 @@
|
||||
-- negative tests
|
||||
-- test mysql compatibility setting
|
||||
0
|
||||
0
|
||||
3
|
||||
-- the function name needs to be case-insensitive for historical reasons
|
||||
0
|
15
tests/queries/0_stateless/00765_locate.sql
Normal file
15
tests/queries/0_stateless/00765_locate.sql
Normal file
@ -0,0 +1,15 @@
|
||||
SET send_logs_level = 'fatal';
|
||||
|
||||
SELECT '-- negative tests';
|
||||
SELECT locate(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
|
||||
SELECT locate(1, 'abc'); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
|
||||
SELECT locate('abc', 1); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
|
||||
SELECT locate('abc', 'abc', 'abc'); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
|
||||
|
||||
SELECT '-- test mysql compatibility setting';
|
||||
SELECT locate('abcabc', 'ca');
|
||||
SELECT locate('abcabc', 'ca') SETTINGS function_locate_has_mysql_compatible_argument_order = true;
|
||||
SELECT locate('abcabc', 'ca') SETTINGS function_locate_has_mysql_compatible_argument_order = false;
|
||||
|
||||
SELECT '-- the function name needs to be case-insensitive for historical reasons';
|
||||
SELECT LoCaTe('abcabc', 'ca');
|
@ -4,7 +4,6 @@ foo
|
||||
FOO
|
||||
baz
|
||||
zzz
|
||||
2
|
||||
fo
|
||||
oo
|
||||
o
|
||||
|
@ -6,7 +6,6 @@ select LOWER('Foo');
|
||||
select UPPER('Foo');
|
||||
select REPLACE('bar', 'r', 'z');
|
||||
select REGEXP_REPLACE('bar', '.', 'z');
|
||||
select Locate('foo', 'o');
|
||||
select SUBSTRING('foo', 1, 2);
|
||||
select Substr('foo', 2);
|
||||
select mid('foo', 3);
|
||||
|
@ -26,7 +26,6 @@ SELECT
|
||||
least(1),
|
||||
length('1'),
|
||||
log(1),
|
||||
position('1', '1'),
|
||||
log(1),
|
||||
log10(1),
|
||||
log2(1),
|
||||
|
@ -1 +1 @@
|
||||
EXPLAIN SYNTAX SELECT CAST(1 AS INT), CEIL(1), CEILING(1), CHAR(49), CHAR_LENGTH('1'), CHARACTER_LENGTH('1'), COALESCE(1), CONCAT('1', '1'), CORR(1, 1), COS(1), COUNT(1), COVAR_POP(1, 1), COVAR_SAMP(1, 1), DATABASE(), SCHEMA(), DATEDIFF('DAY', toDate('2020-10-24'), toDate('2019-10-24')), EXP(1), FLATTEN([[1]]), FLOOR(1), FQDN(), GREATEST(1), IF(1, 1, 1), IFNULL(1, 1), LCASE('A'), LEAST(1), LENGTH('1'), LN(1), LOCATE('1', '1'), LOG(1), LOG10(1), LOG2(1), LOWER('A'), MAX(1), MID('123', 1, 1), MIN(1), MOD(1, 1), NOT(1), NOW(), NOW64(), NULLIF(1, 1), PI(), POSITION('123', '2'), POW(1, 1), POWER(1, 1), RAND(), REPLACE('1', '1', '2'), REVERSE('123'), ROUND(1), SIN(1), SQRT(1), STDDEV_POP(1), STDDEV_SAMP(1), SUBSTR('123', 2), SUBSTRING('123', 2), SUM(1), TAN(1), TANH(1), TRUNC(1), TRUNCATE(1), UCASE('A'), UPPER('A'), USER(), VAR_POP(1), VAR_SAMP(1), WEEK(toDate('2020-10-24')), YEARWEEK(toDate('2020-10-24')) format TSVRaw;
|
||||
EXPLAIN SYNTAX SELECT CAST(1 AS INT), CEIL(1), CEILING(1), CHAR(49), CHAR_LENGTH('1'), CHARACTER_LENGTH('1'), COALESCE(1), CONCAT('1', '1'), CORR(1, 1), COS(1), COUNT(1), COVAR_POP(1, 1), COVAR_SAMP(1, 1), DATABASE(), SCHEMA(), DATEDIFF('DAY', toDate('2020-10-24'), toDate('2019-10-24')), EXP(1), FLATTEN([[1]]), FLOOR(1), FQDN(), GREATEST(1), IF(1, 1, 1), IFNULL(1, 1), LCASE('A'), LEAST(1), LENGTH('1'), LN(1), LOG(1), LOG10(1), LOG2(1), LOWER('A'), MAX(1), MID('123', 1, 1), MIN(1), MOD(1, 1), NOT(1), NOW(), NOW64(), NULLIF(1, 1), PI(), POSITION('123', '2'), POW(1, 1), POWER(1, 1), RAND(), REPLACE('1', '1', '2'), REVERSE('123'), ROUND(1), SIN(1), SQRT(1), STDDEV_POP(1), STDDEV_SAMP(1), SUBSTR('123', 2), SUBSTRING('123', 2), SUM(1), TAN(1), TANH(1), TRUNC(1), TRUNCATE(1), UCASE('A'), UPPER('A'), USER(), VAR_POP(1), VAR_SAMP(1), WEEK(toDate('2020-10-24')), YEARWEEK(toDate('2020-10-24')) format TSVRaw;
|
||||
|
@ -0,0 +1,16 @@
|
||||
TESTING BYTES
|
||||
8192
|
||||
9216
|
||||
9216
|
||||
65536
|
||||
TESTING ROWS
|
||||
50
|
||||
1000
|
||||
1020
|
||||
1100
|
||||
TESTING NO CIRCULAR-BUFFER
|
||||
8192
|
||||
9216
|
||||
17408
|
||||
82944
|
||||
TESTING INVALID SETTINGS
|
@ -0,0 +1,63 @@
|
||||
SET max_block_size = 65409; -- Default value
|
||||
|
||||
DROP TABLE IF EXISTS memory;
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
|
||||
|
||||
SELECT 'TESTING BYTES';
|
||||
/* 1. testing oldest block doesn't get deleted because of min-threshold */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 2. adding block that doesn't get deleted */
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 3. testing oldest block gets deleted - 9216 bytes - 1100 */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 4.check large block over-writes all bytes / rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
DROP TABLE IF EXISTS memory;
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
|
||||
|
||||
SELECT 'TESTING ROWS';
|
||||
/* 1. add normal number of rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 50);
|
||||
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 2. table should have 1000 */
|
||||
INSERT INTO memory SELECT * FROM numbers(50, 950);
|
||||
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 3. table should have 1020 - removed first 50 */
|
||||
INSERT INTO memory SELECT * FROM numbers(2000, 70);
|
||||
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
/* 4. check large block over-writes all rows */
|
||||
INSERT INTO memory SELECT * FROM numbers(3000, 1100);
|
||||
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
SELECT 'TESTING NO CIRCULAR-BUFFER';
|
||||
DROP TABLE IF EXISTS memory;
|
||||
CREATE TABLE memory (i UInt32) ENGINE = Memory;
|
||||
|
||||
INSERT INTO memory SELECT * FROM numbers(0, 1600);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
INSERT INTO memory SELECT * FROM numbers(1000, 100);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 1000);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
INSERT INTO memory SELECT * FROM numbers(9000, 10000);
|
||||
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();
|
||||
|
||||
SELECT 'TESTING INVALID SETTINGS';
|
||||
CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100; -- { serverError 452 }
|
||||
CREATE TABLE faulty_memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 100; -- { serverError 452 }
|
||||
|
||||
DROP TABLE memory;
|
@ -1,2 +1,2 @@
|
||||
clickhouse_add_executable (check-marks main.cpp)
|
||||
target_link_libraries(check-marks PRIVATE dbms boost::program_options)
|
||||
target_link_libraries(check-marks PRIVATE dbms clickhouse_functions boost::program_options)
|
||||
|
@ -1,2 +1,2 @@
|
||||
clickhouse_add_executable(check-mysql-binlog main.cpp)
|
||||
target_link_libraries(check-mysql-binlog PRIVATE dbms boost::program_options)
|
||||
target_link_libraries(check-mysql-binlog PRIVATE dbms clickhouse_functions boost::program_options)
|
||||
|
@ -4,4 +4,4 @@ if (NOT TARGET ch_contrib::nuraft)
|
||||
endif ()
|
||||
|
||||
clickhouse_add_executable(keeper-data-dumper main.cpp)
|
||||
target_link_libraries(keeper-data-dumper PRIVATE dbms)
|
||||
target_link_libraries(keeper-data-dumper PRIVATE dbms clickhouse_functions)
|
||||
|
Loading…
Reference in New Issue
Block a user