mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
Merge branch 'master' into ch_canh_fix_prefix_not_like
This commit is contained in:
commit
3545db0a6c
7
.github/workflows/nightly.yml
vendored
7
.github/workflows/nightly.yml
vendored
@ -128,6 +128,8 @@ jobs:
|
|||||||
SONAR_SCANNER_VERSION: 4.7.0.2747
|
SONAR_SCANNER_VERSION: 4.7.0.2747
|
||||||
SONAR_SERVER_URL: "https://sonarcloud.io"
|
SONAR_SERVER_URL: "https://sonarcloud.io"
|
||||||
BUILD_WRAPPER_OUT_DIR: build_wrapper_output_directory # Directory where build-wrapper output will be placed
|
BUILD_WRAPPER_OUT_DIR: build_wrapper_output_directory # Directory where build-wrapper output will be placed
|
||||||
|
CC: clang-15
|
||||||
|
CXX: clang++-15
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
with:
|
with:
|
||||||
@ -149,7 +151,7 @@ jobs:
|
|||||||
env:
|
env:
|
||||||
BUILD_WRAPPER_DOWNLOAD_URL: ${{ env.SONAR_SERVER_URL }}/static/cpp/build-wrapper-linux-x86.zip
|
BUILD_WRAPPER_DOWNLOAD_URL: ${{ env.SONAR_SERVER_URL }}/static/cpp/build-wrapper-linux-x86.zip
|
||||||
run: |
|
run: |
|
||||||
curl -sSLo "$HOME/.sonar/build-wrapper-linux-x86.zip ${{ env.BUILD_WRAPPER_DOWNLOAD_URL }}"
|
curl -sSLo "$HOME/.sonar/build-wrapper-linux-x86.zip" "${{ env.BUILD_WRAPPER_DOWNLOAD_URL }}"
|
||||||
unzip -o "$HOME/.sonar/build-wrapper-linux-x86.zip" -d "$HOME/.sonar/"
|
unzip -o "$HOME/.sonar/build-wrapper-linux-x86.zip" -d "$HOME/.sonar/"
|
||||||
echo "$HOME/.sonar/build-wrapper-linux-x86" >> "$GITHUB_PATH"
|
echo "$HOME/.sonar/build-wrapper-linux-x86" >> "$GITHUB_PATH"
|
||||||
- name: Set Up Build Tools
|
- name: Set Up Build Tools
|
||||||
@ -173,4 +175,5 @@ jobs:
|
|||||||
--define sonar.host.url="${{ env.SONAR_SERVER_URL }}" \
|
--define sonar.host.url="${{ env.SONAR_SERVER_URL }}" \
|
||||||
--define sonar.cfamily.build-wrapper-output="${{ env.BUILD_WRAPPER_OUT_DIR }}" \
|
--define sonar.cfamily.build-wrapper-output="${{ env.BUILD_WRAPPER_OUT_DIR }}" \
|
||||||
--define sonar.projectKey="ClickHouse_ClickHouse" \
|
--define sonar.projectKey="ClickHouse_ClickHouse" \
|
||||||
--define sonar.organization="clickhouse-java"
|
--define sonar.organization="clickhouse-java" \
|
||||||
|
--define sonar.exclusions="**/*.java,**/*.ts,**/*.js,**/*.css,**/*.sql"
|
||||||
|
2
contrib/cctz
vendored
2
contrib/cctz
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 7a454c25c7d16053bcd327cdd16329212a08fa4a
|
Subproject commit 5c8528fb35e89ee0b3a7157490423fba0d4dd7b5
|
@ -21,6 +21,9 @@ set (LLVM_INCLUDE_DIRS
|
|||||||
"${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm/include"
|
"${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm/include"
|
||||||
)
|
)
|
||||||
set (LLVM_LIBRARY_DIRS "${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm")
|
set (LLVM_LIBRARY_DIRS "${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm")
|
||||||
|
# NOTE: You should not remove this line since otherwise it will use default 20,
|
||||||
|
# and llvm cannot be compiled with bundled libcxx and 20 standard.
|
||||||
|
set (CMAKE_CXX_STANDARD 14)
|
||||||
|
|
||||||
# This list was generated by listing all LLVM libraries, compiling the binary and removing all libraries while it still compiles.
|
# This list was generated by listing all LLVM libraries, compiling the binary and removing all libraries while it still compiles.
|
||||||
set (REQUIRED_LLVM_LIBRARIES
|
set (REQUIRED_LLVM_LIBRARIES
|
||||||
|
@ -139,7 +139,7 @@ The following settings can be specified in configuration file for given endpoint
|
|||||||
- `use_environment_credentials` — If set to `true`, S3 client will try to obtain credentials from environment variables and [Amazon EC2](https://en.wikipedia.org/wiki/Amazon_Elastic_Compute_Cloud) metadata for given endpoint. Optional, default value is `false`.
|
- `use_environment_credentials` — If set to `true`, S3 client will try to obtain credentials from environment variables and [Amazon EC2](https://en.wikipedia.org/wiki/Amazon_Elastic_Compute_Cloud) metadata for given endpoint. Optional, default value is `false`.
|
||||||
- `region` — Specifies S3 region name. Optional.
|
- `region` — Specifies S3 region name. Optional.
|
||||||
- `use_insecure_imds_request` — If set to `true`, S3 client will use insecure IMDS request while obtaining credentials from Amazon EC2 metadata. Optional, default value is `false`.
|
- `use_insecure_imds_request` — If set to `true`, S3 client will use insecure IMDS request while obtaining credentials from Amazon EC2 metadata. Optional, default value is `false`.
|
||||||
- `header` — Adds specified HTTP header to a request to given endpoint. Optional, can be speficied multiple times.
|
- `header` — Adds specified HTTP header to a request to given endpoint. Optional, can be specified multiple times.
|
||||||
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set. Optional.
|
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set. Optional.
|
||||||
- `max_single_read_retries` — The maximum number of attempts during single read. Default value is `4`. Optional.
|
- `max_single_read_retries` — The maximum number of attempts during single read. Default value is `4`. Optional.
|
||||||
|
|
||||||
|
@ -107,7 +107,7 @@ ALTER TABLE visits RENAME COLUMN webBrowser TO browser
|
|||||||
CLEAR COLUMN [IF EXISTS] name IN PARTITION partition_name
|
CLEAR COLUMN [IF EXISTS] name IN PARTITION partition_name
|
||||||
```
|
```
|
||||||
|
|
||||||
Resets all data in a column for a specified partition. Read more about setting the partition name in the section [How to specify the partition expression](#alter-how-to-specify-part-expr).
|
Resets all data in a column for a specified partition. Read more about setting the partition name in the section [How to set the partition expression](partition.md#how-to-set-partition-expression).
|
||||||
|
|
||||||
If the `IF EXISTS` clause is specified, the query won’t return an error if the column does not exist.
|
If the `IF EXISTS` clause is specified, the query won’t return an error if the column does not exist.
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ ALTER TABLE mt DETACH PARTITION '2020-11-21';
|
|||||||
ALTER TABLE mt DETACH PART 'all_2_2_0';
|
ALTER TABLE mt DETACH PART 'all_2_2_0';
|
||||||
```
|
```
|
||||||
|
|
||||||
Read about setting the partition expression in a section [How to specify the partition expression](#alter-how-to-specify-part-expr).
|
Read about setting the partition expression in a section [How to set the partition expression](#how-to-set-partition-expression).
|
||||||
|
|
||||||
After the query is executed, you can do whatever you want with the data in the `detached` directory — delete it from the file system, or just leave it.
|
After the query is executed, you can do whatever you want with the data in the `detached` directory — delete it from the file system, or just leave it.
|
||||||
|
|
||||||
@ -53,7 +53,7 @@ ALTER TABLE table_name [ON CLUSTER cluster] DROP PARTITION|PART partition_expr
|
|||||||
|
|
||||||
Deletes the specified partition from the table. This query tags the partition as inactive and deletes data completely, approximately in 10 minutes.
|
Deletes the specified partition from the table. This query tags the partition as inactive and deletes data completely, approximately in 10 minutes.
|
||||||
|
|
||||||
Read about setting the partition expression in a section [How to specify the partition expression](#alter-how-to-specify-part-expr).
|
Read about setting the partition expression in a section [How to set the partition expression](#how-to-set-partition-expression).
|
||||||
|
|
||||||
The query is replicated – it deletes data on all replicas.
|
The query is replicated – it deletes data on all replicas.
|
||||||
|
|
||||||
@ -71,7 +71,7 @@ ALTER TABLE table_name [ON CLUSTER cluster] DROP DETACHED PARTITION|PART partiti
|
|||||||
```
|
```
|
||||||
|
|
||||||
Removes the specified part or all parts of the specified partition from `detached`.
|
Removes the specified part or all parts of the specified partition from `detached`.
|
||||||
Read more about setting the partition expression in a section [How to specify the partition expression](#alter-how-to-specify-part-expr).
|
Read more about setting the partition expression in a section [How to set the partition expression](#how-to-set-partition-expression).
|
||||||
|
|
||||||
## ATTACH PARTITION\|PART
|
## ATTACH PARTITION\|PART
|
||||||
|
|
||||||
@ -86,7 +86,7 @@ ALTER TABLE visits ATTACH PARTITION 201901;
|
|||||||
ALTER TABLE visits ATTACH PART 201901_2_2_0;
|
ALTER TABLE visits ATTACH PART 201901_2_2_0;
|
||||||
```
|
```
|
||||||
|
|
||||||
Read more about setting the partition expression in a section [How to specify the partition expression](#alter-how-to-specify-part-expr).
|
Read more about setting the partition expression in a section [How to set the partition expression](#how-to-set-partition-expression).
|
||||||
|
|
||||||
This query is replicated. The replica-initiator checks whether there is data in the `detached` directory.
|
This query is replicated. The replica-initiator checks whether there is data in the `detached` directory.
|
||||||
If data exists, the query checks its integrity. If everything is correct, the query adds the data to the table.
|
If data exists, the query checks its integrity. If everything is correct, the query adds the data to the table.
|
||||||
@ -166,7 +166,7 @@ This query creates a local backup of a specified partition. If the `PARTITION` c
|
|||||||
The entire backup process is performed without stopping the server.
|
The entire backup process is performed without stopping the server.
|
||||||
:::
|
:::
|
||||||
|
|
||||||
Note that for old-styled tables you can specify the prefix of the partition name (for example, `2019`) - then the query creates the backup for all the corresponding partitions. Read about setting the partition expression in a section [How to specify the partition expression](#alter-how-to-specify-part-expr).
|
Note that for old-styled tables you can specify the prefix of the partition name (for example, `2019`) - then the query creates the backup for all the corresponding partitions. Read about setting the partition expression in a section [How to set the partition expression](#how-to-set-partition-expression).
|
||||||
|
|
||||||
At the time of execution, for a data snapshot, the query creates hardlinks to a table data. Hardlinks are placed in the directory `/var/lib/clickhouse/shadow/N/...`, where:
|
At the time of execution, for a data snapshot, the query creates hardlinks to a table data. Hardlinks are placed in the directory `/var/lib/clickhouse/shadow/N/...`, where:
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ The `OPTIMIZE` query is supported for [MergeTree](../../engines/table-engines/me
|
|||||||
When `OPTIMIZE` is used with the [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/replication.md) family of table engines, ClickHouse creates a task for merging and waits for execution on all replicas (if the [replication_alter_partitions_sync](../../operations/settings/settings.md#replication-alter-partitions-sync) setting is set to `2`) or on current replica (if the [replication_alter_partitions_sync](../../operations/settings/settings.md#replication-alter-partitions-sync) setting is set to `1`).
|
When `OPTIMIZE` is used with the [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/replication.md) family of table engines, ClickHouse creates a task for merging and waits for execution on all replicas (if the [replication_alter_partitions_sync](../../operations/settings/settings.md#replication-alter-partitions-sync) setting is set to `2`) or on current replica (if the [replication_alter_partitions_sync](../../operations/settings/settings.md#replication-alter-partitions-sync) setting is set to `1`).
|
||||||
|
|
||||||
- If `OPTIMIZE` does not perform a merge for any reason, it does not notify the client. To enable notifications, use the [optimize_throw_if_noop](../../operations/settings/settings.md#setting-optimize_throw_if_noop) setting.
|
- If `OPTIMIZE` does not perform a merge for any reason, it does not notify the client. To enable notifications, use the [optimize_throw_if_noop](../../operations/settings/settings.md#setting-optimize_throw_if_noop) setting.
|
||||||
- If you specify a `PARTITION`, only the specified partition is optimized. [How to set partition expression](../../sql-reference/statements/alter/index.md#alter-how-to-specify-part-expr).
|
- If you specify a `PARTITION`, only the specified partition is optimized. [How to set partition expression](alter/partition.md#how-to-set-partition-expression).
|
||||||
- If you specify `FINAL`, optimization is performed even when all the data is already in one part. Also merge is forced even if concurrent merges are performed.
|
- If you specify `FINAL`, optimization is performed even when all the data is already in one part. Also merge is forced even if concurrent merges are performed.
|
||||||
- If you specify `DEDUPLICATE`, then completely identical rows (unless by-clause is specified) will be deduplicated (all columns are compared), it makes sense only for the MergeTree engine.
|
- If you specify `DEDUPLICATE`, then completely identical rows (unless by-clause is specified) will be deduplicated (all columns are compared), it makes sense only for the MergeTree engine.
|
||||||
|
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
#include <Common/ProfileEvents.h>
|
#include <Common/ProfileEvents.h>
|
||||||
#include <Common/thread_local_rng.h>
|
#include <Common/thread_local_rng.h>
|
||||||
#include <Common/OvercommitTracker.h>
|
#include <Common/OvercommitTracker.h>
|
||||||
|
#include <Common/Stopwatch.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
|
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
@ -86,6 +87,8 @@ inline std::string_view toDescription(OvercommitResult result)
|
|||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
{
|
{
|
||||||
extern const Event QueryMemoryLimitExceeded;
|
extern const Event QueryMemoryLimitExceeded;
|
||||||
|
extern const Event MemoryAllocatorPurge;
|
||||||
|
extern const Event MemoryAllocatorPurgeTimeMicroseconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
@ -229,7 +232,10 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
|
|||||||
{
|
{
|
||||||
if (free_memory_in_allocator_arenas.exchange(-current_free_memory_in_allocator_arenas) > 0)
|
if (free_memory_in_allocator_arenas.exchange(-current_free_memory_in_allocator_arenas) > 0)
|
||||||
{
|
{
|
||||||
|
Stopwatch watch;
|
||||||
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
|
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
|
||||||
|
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
|
||||||
|
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -432,7 +438,7 @@ void MemoryTracker::reset()
|
|||||||
|
|
||||||
void MemoryTracker::setRSS(Int64 rss_, Int64 free_memory_in_allocator_arenas_)
|
void MemoryTracker::setRSS(Int64 rss_, Int64 free_memory_in_allocator_arenas_)
|
||||||
{
|
{
|
||||||
Int64 new_amount = rss_; // - free_memory_in_allocator_arenas_;
|
Int64 new_amount = rss_;
|
||||||
total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
|
total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
|
||||||
free_memory_in_allocator_arenas.store(free_memory_in_allocator_arenas_, std::memory_order_relaxed);
|
free_memory_in_allocator_arenas.store(free_memory_in_allocator_arenas_, std::memory_order_relaxed);
|
||||||
|
|
||||||
|
@ -229,6 +229,8 @@ The server successfully detected this situation and will download merged part fr
|
|||||||
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
|
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
|
||||||
M(SystemTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in OS kernel space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
|
M(SystemTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in OS kernel space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
|
||||||
M(MemoryOvercommitWaitTimeMicroseconds, "Total time spent in waiting for memory to be freed in OvercommitTracker.") \
|
M(MemoryOvercommitWaitTimeMicroseconds, "Total time spent in waiting for memory to be freed in OvercommitTracker.") \
|
||||||
|
M(MemoryAllocatorPurge, "Total number of times memory allocator purge was requested") \
|
||||||
|
M(MemoryAllocatorPurgeTimeMicroseconds, "Total number of times memory allocator purge was requested") \
|
||||||
M(SoftPageFaults, "The number of soft page faults in query execution threads. Soft page fault usually means a miss in the memory allocator cache which required a new memory mapping from the OS and subsequent allocation of a page of physical memory.") \
|
M(SoftPageFaults, "The number of soft page faults in query execution threads. Soft page fault usually means a miss in the memory allocator cache which required a new memory mapping from the OS and subsequent allocation of a page of physical memory.") \
|
||||||
M(HardPageFaults, "The number of hard page faults in query execution threads. High values indicate either that you forgot to turn off swap on your server, or eviction of memory pages of the ClickHouse binary during very high memory pressure, or successful usage of the 'mmap' read method for the tables data.") \
|
M(HardPageFaults, "The number of hard page faults in query execution threads. High values indicate either that you forgot to turn off swap on your server, or eviction of memory pages of the ClickHouse binary during very high memory pressure, or successful usage of the 'mmap' read method for the tables data.") \
|
||||||
\
|
\
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
#include <Common/Stopwatch.h>
|
#include <Common/Stopwatch.h>
|
||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
#include <cmath>
|
|
||||||
|
|
||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
{
|
{
|
||||||
@ -21,64 +20,57 @@ namespace ErrorCodes
|
|||||||
/// Just 10^9.
|
/// Just 10^9.
|
||||||
static constexpr auto NS = 1000000000UL;
|
static constexpr auto NS = 1000000000UL;
|
||||||
|
|
||||||
/// Tracking window. Actually the size is not really important. We just want to avoid
|
static const size_t default_burst_seconds = 1;
|
||||||
/// throttles when there are no actions for a long period time.
|
|
||||||
static const double window_ns = 1ULL * NS;
|
Throttler::Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_)
|
||||||
|
: max_speed(max_speed_)
|
||||||
|
, max_burst(max_speed_ * default_burst_seconds)
|
||||||
|
, limit_exceeded_exception_message("")
|
||||||
|
, tokens(max_burst)
|
||||||
|
, parent(parent_)
|
||||||
|
{}
|
||||||
|
|
||||||
|
Throttler::Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_,
|
||||||
|
const std::shared_ptr<Throttler> & parent_)
|
||||||
|
: max_speed(max_speed_)
|
||||||
|
, max_burst(max_speed_ * default_burst_seconds)
|
||||||
|
, limit(limit_)
|
||||||
|
, limit_exceeded_exception_message(limit_exceeded_exception_message_)
|
||||||
|
, tokens(max_burst)
|
||||||
|
, parent(parent_)
|
||||||
|
{}
|
||||||
|
|
||||||
void Throttler::add(size_t amount)
|
void Throttler::add(size_t amount)
|
||||||
{
|
{
|
||||||
size_t new_count;
|
// Values obtained under lock to be checked after release
|
||||||
/// This outer variable is always equal to smoothed_speed.
|
size_t count_value;
|
||||||
/// We use to avoid race condition.
|
double tokens_value;
|
||||||
double current_speed = 0;
|
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
|
|
||||||
auto now = clock_gettime_ns_adjusted(prev_ns);
|
auto now = clock_gettime_ns_adjusted(prev_ns);
|
||||||
/// If prev_ns is equal to zero (first `add` call) we known nothing about speed
|
if (max_speed)
|
||||||
/// and don't track anything.
|
|
||||||
if (max_speed && prev_ns != 0)
|
|
||||||
{
|
{
|
||||||
/// Time spent to process the amount of bytes
|
double delta_seconds = prev_ns ? static_cast<double>(now - prev_ns) / NS : 0;
|
||||||
double time_spent = now - prev_ns;
|
tokens = std::min<double>(tokens + max_speed * delta_seconds - amount, max_burst);
|
||||||
|
|
||||||
/// The speed in bytes per second is equal to amount / time_spent in seconds
|
|
||||||
auto new_speed = amount / (time_spent / NS);
|
|
||||||
|
|
||||||
/// We want to make old values of speed less important for our smoothed value
|
|
||||||
/// so we decay it's value with coef.
|
|
||||||
auto decay_coeff = std::pow(0.5, time_spent / window_ns);
|
|
||||||
|
|
||||||
/// Weighted average between previous and new speed
|
|
||||||
smoothed_speed = smoothed_speed * decay_coeff + (1 - decay_coeff) * new_speed;
|
|
||||||
current_speed = smoothed_speed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
count += amount;
|
count += amount;
|
||||||
new_count = count;
|
count_value = count;
|
||||||
|
tokens_value = tokens;
|
||||||
prev_ns = now;
|
prev_ns = now;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (limit && new_count > limit)
|
if (limit && count_value > limit)
|
||||||
throw Exception(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
|
throw Exception(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
|
||||||
|
|
||||||
if (max_speed && current_speed > max_speed)
|
/// Wait unless there is positive amount of tokens - throttling
|
||||||
{
|
if (max_speed && tokens_value < 0)
|
||||||
/// If we was too fast then we have to sleep until our smoothed speed became <= max_speed
|
|
||||||
int64_t sleep_time = static_cast<int64_t>(-window_ns * std::log2(max_speed / current_speed));
|
|
||||||
|
|
||||||
if (sleep_time > 0)
|
|
||||||
{
|
{
|
||||||
|
int64_t sleep_time = static_cast<int64_t>(-tokens_value / max_speed * NS);
|
||||||
accumulated_sleep += sleep_time;
|
accumulated_sleep += sleep_time;
|
||||||
|
|
||||||
sleepForNanoseconds(sleep_time);
|
sleepForNanoseconds(sleep_time);
|
||||||
|
|
||||||
accumulated_sleep -= sleep_time;
|
accumulated_sleep -= sleep_time;
|
||||||
|
|
||||||
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_time / 1000UL);
|
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_time / 1000UL);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (parent)
|
if (parent)
|
||||||
parent->add(amount);
|
parent->add(amount);
|
||||||
@ -89,9 +81,9 @@ void Throttler::reset()
|
|||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
|
|
||||||
count = 0;
|
count = 0;
|
||||||
accumulated_sleep = 0;
|
tokens = max_burst;
|
||||||
smoothed_speed = 0;
|
|
||||||
prev_ns = 0;
|
prev_ns = 0;
|
||||||
|
// NOTE: do not zero `accumulated_sleep` to avoid races
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Throttler::isThrottling() const
|
bool Throttler::isThrottling() const
|
||||||
|
@ -10,25 +10,26 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
/** Allows you to limit the speed of something (in entities per second) using sleep.
|
/** Allows you to limit the speed of something (in tokens per second) using sleep.
|
||||||
* Specifics of work:
|
* Implemented using Token Bucket Throttling algorithm.
|
||||||
* Tracks exponentially (pow of 1/2) smoothed speed with hardcoded window.
|
* Also allows you to set a limit on the maximum number of tokens. If exceeded, an exception will be thrown.
|
||||||
* See more comments in .cpp file.
|
|
||||||
*
|
|
||||||
* Also allows you to set a limit on the maximum number of entities. If exceeded, an exception will be thrown.
|
|
||||||
*/
|
*/
|
||||||
class Throttler
|
class Throttler
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_ = nullptr)
|
Throttler(size_t max_speed_, size_t max_burst_, const std::shared_ptr<Throttler> & parent_ = nullptr)
|
||||||
: max_speed(max_speed_), limit_exceeded_exception_message(""), parent(parent_) {}
|
: max_speed(max_speed_), max_burst(max_burst_), limit_exceeded_exception_message(""), tokens(max_burst), parent(parent_) {}
|
||||||
|
|
||||||
|
explicit Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_ = nullptr);
|
||||||
|
|
||||||
|
Throttler(size_t max_speed_, size_t max_burst_, size_t limit_, const char * limit_exceeded_exception_message_,
|
||||||
|
const std::shared_ptr<Throttler> & parent_ = nullptr)
|
||||||
|
: max_speed(max_speed_), max_burst(max_burst_), limit(limit_), limit_exceeded_exception_message(limit_exceeded_exception_message_), tokens(max_burst), parent(parent_) {}
|
||||||
|
|
||||||
Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_,
|
Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_,
|
||||||
const std::shared_ptr<Throttler> & parent_ = nullptr)
|
const std::shared_ptr<Throttler> & parent_ = nullptr);
|
||||||
: max_speed(max_speed_), limit(limit_), limit_exceeded_exception_message(limit_exceeded_exception_message_), parent(parent_) {}
|
|
||||||
|
|
||||||
/// Calculates the smoothed speed, sleeps if required and throws exception on
|
/// Use `amount` tokens, sleeps if required or throws exception on limit overflow.
|
||||||
/// limit overflow.
|
|
||||||
void add(size_t amount);
|
void add(size_t amount);
|
||||||
|
|
||||||
/// Not thread safe
|
/// Not thread safe
|
||||||
@ -45,15 +46,14 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
size_t count{0};
|
size_t count{0};
|
||||||
const size_t max_speed{0};
|
const size_t max_speed{0}; /// in tokens per second.
|
||||||
|
const size_t max_burst{0}; /// in tokens.
|
||||||
const uint64_t limit{0}; /// 0 - not limited.
|
const uint64_t limit{0}; /// 0 - not limited.
|
||||||
const char * limit_exceeded_exception_message = nullptr;
|
const char * limit_exceeded_exception_message = nullptr;
|
||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
std::atomic<uint64_t> accumulated_sleep{0};
|
std::atomic<uint64_t> accumulated_sleep{0}; // Accumulated sleep time over all waiting threads
|
||||||
/// Smoothed value of current speed. Updated in `add` method.
|
double tokens{0}; /// Amount of tokens available in token bucket. Updated in `add` method.
|
||||||
double smoothed_speed{0};
|
uint64_t prev_ns{0}; /// Previous `add` call time (in nanoseconds).
|
||||||
/// previous `add` call time (in nanoseconds)
|
|
||||||
uint64_t prev_ns{0};
|
|
||||||
|
|
||||||
/// Used to implement a hierarchy of throttlers
|
/// Used to implement a hierarchy of throttlers
|
||||||
std::shared_ptr<Throttler> parent;
|
std::shared_ptr<Throttler> parent;
|
||||||
|
@ -168,9 +168,6 @@ public:
|
|||||||
inline ResultValueType apply(const size_t i) const
|
inline ResultValueType apply(const size_t i) const
|
||||||
{
|
{
|
||||||
const auto a = !!vec[i];
|
const auto a = !!vec[i];
|
||||||
if constexpr (Op::isSaturable())
|
|
||||||
return Op::isSaturatedValue(a) ? a : Op::apply(a, next.apply(i));
|
|
||||||
else
|
|
||||||
return Op::apply(a, next.apply(i));
|
return Op::apply(a, next.apply(i));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -703,19 +703,26 @@ void AsynchronousMetrics::update(TimePoint update_time)
|
|||||||
Int64 free_memory_in_allocator_arenas = 0;
|
Int64 free_memory_in_allocator_arenas = 0;
|
||||||
|
|
||||||
#if USE_JEMALLOC
|
#if USE_JEMALLOC
|
||||||
/// This is a memory which is kept by allocator.
|
/// According to jemalloc man, pdirty is:
|
||||||
/// Will subsract it from RSS to decrease memory drift.
|
///
|
||||||
|
/// Number of pages within unused extents that are potentially
|
||||||
|
/// dirty, and for which madvise() or similar has not been called.
|
||||||
|
///
|
||||||
|
/// So they will be subtracted from RSS to make accounting more
|
||||||
|
/// accurate, since those pages are not really RSS but a memory
|
||||||
|
/// that can be used at anytime via jemalloc.
|
||||||
free_memory_in_allocator_arenas = je_malloc_pdirty * getPageSize();
|
free_memory_in_allocator_arenas = je_malloc_pdirty * getPageSize();
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
Int64 difference = rss - free_memory_in_allocator_arenas - amount;
|
Int64 difference = rss - amount;
|
||||||
|
|
||||||
/// Log only if difference is high. This is for convenience. The threshold is arbitrary.
|
/// Log only if difference is high. This is for convenience. The threshold is arbitrary.
|
||||||
if (difference >= 1048576 || difference <= -1048576)
|
if (difference >= 1048576 || difference <= -1048576)
|
||||||
LOG_TRACE(log,
|
LOG_TRACE(log,
|
||||||
"MemoryTracking: was {}, peak {}, will set to {} (RSS), difference: {}",
|
"MemoryTracking: was {}, peak {}, free memory in arenas {}, will set to {} (RSS), difference: {}",
|
||||||
ReadableSize(amount),
|
ReadableSize(amount),
|
||||||
ReadableSize(peak),
|
ReadableSize(peak),
|
||||||
|
ReadableSize(free_memory_in_allocator_arenas),
|
||||||
ReadableSize(rss),
|
ReadableSize(rss),
|
||||||
ReadableSize(difference));
|
ReadableSize(difference));
|
||||||
|
|
||||||
|
@ -295,11 +295,11 @@ namespace
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool parseHosts(IParserBase::Pos & pos, Expected & expected, const String & prefix, AllowedClientHosts & hosts)
|
bool parseHosts(IParserBase::Pos & pos, Expected & expected, std::string_view prefix, AllowedClientHosts & hosts)
|
||||||
{
|
{
|
||||||
return IParserBase::wrapParseImpl(pos, [&]
|
return IParserBase::wrapParseImpl(pos, [&]
|
||||||
{
|
{
|
||||||
if (!prefix.empty() && !ParserKeyword{prefix.c_str()}.ignore(pos, expected))
|
if (!prefix.empty() && !ParserKeyword{prefix}.ignore(pos, expected))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (!ParserKeyword{"HOST"}.ignore(pos, expected))
|
if (!ParserKeyword{"HOST"}.ignore(pos, expected))
|
||||||
@ -492,7 +492,6 @@ bool ParserCreateUserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|
|||||||
|
|
||||||
if (alter)
|
if (alter)
|
||||||
{
|
{
|
||||||
String maybe_new_name;
|
|
||||||
if (!new_name && (names->size() == 1) && parseRenameTo(pos, expected, new_name))
|
if (!new_name && (names->size() == 1) && parseRenameTo(pos, expected, new_name))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
@ -71,7 +71,7 @@ bool ExecutingGraph::addEdges(uint64_t node)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add direct edges form output ports.
|
/// Add direct edges from output ports.
|
||||||
auto & outputs = from->getOutputs();
|
auto & outputs = from->getOutputs();
|
||||||
auto from_output = nodes[node]->direct_edges.size();
|
auto from_output = nodes[node]->direct_edges.size();
|
||||||
|
|
||||||
|
@ -88,6 +88,88 @@ String extractFixedPrefixFromLikePattern(const String & like_pattern, bool perfe
|
|||||||
return fixed_prefix;
|
return fixed_prefix;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// for "^prefix..." string it returns "prefix"
|
||||||
|
static String extractFixedPrefixFromRegularExpression(const String & regexp)
|
||||||
|
{
|
||||||
|
if (regexp.size() <= 1 || regexp[0] != '^')
|
||||||
|
return {};
|
||||||
|
|
||||||
|
String fixed_prefix;
|
||||||
|
const char * begin = regexp.data() + 1;
|
||||||
|
const char * pos = begin;
|
||||||
|
const char * end = regexp.data() + regexp.size();
|
||||||
|
|
||||||
|
while (pos != end)
|
||||||
|
{
|
||||||
|
switch (*pos)
|
||||||
|
{
|
||||||
|
case '\0':
|
||||||
|
pos = end;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case '\\':
|
||||||
|
{
|
||||||
|
++pos;
|
||||||
|
if (pos == end)
|
||||||
|
break;
|
||||||
|
|
||||||
|
switch (*pos)
|
||||||
|
{
|
||||||
|
case '|':
|
||||||
|
case '(':
|
||||||
|
case ')':
|
||||||
|
case '^':
|
||||||
|
case '$':
|
||||||
|
case '.':
|
||||||
|
case '[':
|
||||||
|
case '?':
|
||||||
|
case '*':
|
||||||
|
case '+':
|
||||||
|
case '{':
|
||||||
|
fixed_prefix += *pos;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
/// all other escape sequences are not supported
|
||||||
|
pos = end;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
++pos;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// non-trivial cases
|
||||||
|
case '|':
|
||||||
|
fixed_prefix.clear();
|
||||||
|
[[fallthrough]];
|
||||||
|
case '(':
|
||||||
|
case '[':
|
||||||
|
case '^':
|
||||||
|
case '$':
|
||||||
|
case '.':
|
||||||
|
case '+':
|
||||||
|
pos = end;
|
||||||
|
break;
|
||||||
|
|
||||||
|
/// Quantifiers that allow a zero number of occurrences.
|
||||||
|
case '{':
|
||||||
|
case '?':
|
||||||
|
case '*':
|
||||||
|
if (!fixed_prefix.empty())
|
||||||
|
fixed_prefix.pop_back();
|
||||||
|
|
||||||
|
pos = end;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
fixed_prefix += *pos;
|
||||||
|
pos++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fixed_prefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/** For a given string, get a minimum string that is strictly greater than all strings with this prefix,
|
/** For a given string, get a minimum string that is strictly greater than all strings with this prefix,
|
||||||
* or return an empty string if there are no such strings.
|
* or return an empty string if there are no such strings.
|
||||||
@ -604,6 +686,27 @@ const KeyCondition::AtomMap KeyCondition::atom_map
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"match",
|
||||||
|
[] (RPNElement & out, const Field & value)
|
||||||
|
{
|
||||||
|
if (value.getType() != Field::Types::String)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
String prefix = extractFixedPrefixFromRegularExpression(value.get<const String &>());
|
||||||
|
if (prefix.empty())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
String right_bound = firstStringThatIsGreaterThanAllStringsWithPrefix(prefix);
|
||||||
|
|
||||||
|
out.function = RPNElement::FUNCTION_IN_RANGE;
|
||||||
|
out.range = !right_bound.empty()
|
||||||
|
? Range(prefix, true, right_bound, false)
|
||||||
|
: Range::createLeftBounded(prefix, true);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"isNotNull",
|
"isNotNull",
|
||||||
[] (RPNElement & out, const Field &)
|
[] (RPNElement & out, const Field &)
|
||||||
@ -1761,7 +1864,7 @@ bool KeyCondition::tryParseAtomFromAST(const Tree & node, ContextPtr context, Bl
|
|||||||
else if (func_name == "in" || func_name == "notIn" ||
|
else if (func_name == "in" || func_name == "notIn" ||
|
||||||
func_name == "like" || func_name == "notLike" ||
|
func_name == "like" || func_name == "notLike" ||
|
||||||
func_name == "ilike" || func_name == "notIlike" ||
|
func_name == "ilike" || func_name == "notIlike" ||
|
||||||
func_name == "startsWith")
|
func_name == "startsWith" || func_name == "match")
|
||||||
{
|
{
|
||||||
/// "const IN data_column" doesn't make sense (unlike "data_column IN const")
|
/// "const IN data_column" doesn't make sense (unlike "data_column IN const")
|
||||||
return false;
|
return false;
|
||||||
|
@ -2199,6 +2199,7 @@ void MergeTreeData::dropAllData()
|
|||||||
|
|
||||||
LOG_TRACE(log, "dropAllData: removing all data parts from memory.");
|
LOG_TRACE(log, "dropAllData: removing all data parts from memory.");
|
||||||
data_parts_indexes.clear();
|
data_parts_indexes.clear();
|
||||||
|
all_data_dropped = true;
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
@ -3142,7 +3143,7 @@ void MergeTreeData::removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn,
|
|||||||
removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(txn, drop_range, lock);
|
removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(txn, drop_range, lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
|
MergeTreeData::PartsToRemoveFromZooKeeper MergeTreeData::removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
|
||||||
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock)
|
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock)
|
||||||
{
|
{
|
||||||
DataPartsVector parts_to_remove;
|
DataPartsVector parts_to_remove;
|
||||||
@ -3220,15 +3221,20 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSetAn
|
|||||||
/// FIXME refactor removePartsFromWorkingSet(...), do not remove parts twice
|
/// FIXME refactor removePartsFromWorkingSet(...), do not remove parts twice
|
||||||
removePartsFromWorkingSet(txn, parts_to_remove, clear_without_timeout, lock);
|
removePartsFromWorkingSet(txn, parts_to_remove, clear_without_timeout, lock);
|
||||||
|
|
||||||
|
/// Since we can return parts in Deleting state, we have to use a wrapper that restricts access to such parts.
|
||||||
|
PartsToRemoveFromZooKeeper parts_to_remove_from_zookeeper;
|
||||||
|
for (auto & part : parts_to_remove)
|
||||||
|
parts_to_remove_from_zookeeper.emplace_back(std::move(part));
|
||||||
|
|
||||||
for (auto & part : inactive_parts_to_remove_immediately)
|
for (auto & part : inactive_parts_to_remove_immediately)
|
||||||
{
|
{
|
||||||
if (!drop_range.contains(part->info))
|
if (!drop_range.contains(part->info))
|
||||||
continue;
|
continue;
|
||||||
part->remove_time.store(0, std::memory_order_relaxed);
|
part->remove_time.store(0, std::memory_order_relaxed);
|
||||||
parts_to_remove.push_back(std::move(part));
|
parts_to_remove_from_zookeeper.emplace_back(std::move(part), /* was_active */ false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return parts_to_remove;
|
return parts_to_remove_from_zookeeper;
|
||||||
}
|
}
|
||||||
|
|
||||||
void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLock * acquired_lock)
|
void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLock * acquired_lock)
|
||||||
@ -5176,9 +5182,27 @@ void MergeTreeData::Transaction::rollback()
|
|||||||
buf << ".";
|
buf << ".";
|
||||||
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
|
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
|
||||||
|
|
||||||
|
auto lock = data.lockParts();
|
||||||
|
|
||||||
|
if (data.data_parts_indexes.empty())
|
||||||
|
{
|
||||||
|
/// Table was dropped concurrently and all parts (including PreActive parts) were cleared, so there's nothing to rollback
|
||||||
|
if (!data.all_data_dropped)
|
||||||
|
{
|
||||||
|
Strings part_names;
|
||||||
|
for (const auto & part : precommitted_parts)
|
||||||
|
part_names.emplace_back(part->name);
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "There are some PreActive parts ({}) to rollback, "
|
||||||
|
"but data parts set is empty and table {} was not dropped. It's a bug",
|
||||||
|
fmt::join(part_names, ", "), data.getStorageID().getNameForLogs());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
data.removePartsFromWorkingSet(txn,
|
data.removePartsFromWorkingSet(txn,
|
||||||
DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()),
|
DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()),
|
||||||
/* clear_without_timeout = */ true);
|
/* clear_without_timeout = */ true, &lock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
clear();
|
clear();
|
||||||
|
@ -584,10 +584,33 @@ public:
|
|||||||
/// Used in REPLACE PARTITION command.
|
/// Used in REPLACE PARTITION command.
|
||||||
void removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
|
void removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
|
||||||
|
|
||||||
|
/// This wrapper is required to restrict access to parts in Deleting state
|
||||||
|
class PartToRemoveFromZooKeeper
|
||||||
|
{
|
||||||
|
DataPartPtr part;
|
||||||
|
bool was_active;
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit PartToRemoveFromZooKeeper(DataPartPtr && part_, bool was_active_ = true)
|
||||||
|
: part(std::move(part_)), was_active(was_active_)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// It's safe to get name of any part
|
||||||
|
const String & getPartName() const { return part->name; }
|
||||||
|
|
||||||
|
DataPartPtr getPartIfItWasActive() const
|
||||||
|
{
|
||||||
|
return was_active ? part : nullptr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
using PartsToRemoveFromZooKeeper = std::vector<PartToRemoveFromZooKeeper>;
|
||||||
|
|
||||||
/// Same as above, but also returns list of parts to remove from ZooKeeper.
|
/// Same as above, but also returns list of parts to remove from ZooKeeper.
|
||||||
/// It includes parts that have been just removed by these method
|
/// It includes parts that have been just removed by these method
|
||||||
/// and Outdated parts covered by drop_range that were removed earlier for any reason.
|
/// and Outdated parts covered by drop_range that were removed earlier for any reason.
|
||||||
DataPartsVector removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
|
PartsToRemoveFromZooKeeper removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
|
||||||
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
|
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
|
||||||
|
|
||||||
/// Restores Outdated part and adds it to working set
|
/// Restores Outdated part and adds it to working set
|
||||||
@ -640,6 +663,9 @@ public:
|
|||||||
/// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache.
|
/// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache.
|
||||||
void dropAllData();
|
void dropAllData();
|
||||||
|
|
||||||
|
/// This flag is for hardening and assertions.
|
||||||
|
bool all_data_dropped = false;
|
||||||
|
|
||||||
/// Drop data directories if they are empty. It is safe to call this method if table creation was unsuccessful.
|
/// Drop data directories if they are empty. It is safe to call this method if table creation was unsuccessful.
|
||||||
void dropIfEmpty();
|
void dropIfEmpty();
|
||||||
|
|
||||||
|
@ -1827,7 +1827,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
|
|||||||
/// Therefore, we use all data parts.
|
/// Therefore, we use all data parts.
|
||||||
|
|
||||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||||
DataPartsVector parts_to_remove;
|
PartsToRemoveFromZooKeeper parts_to_remove;
|
||||||
{
|
{
|
||||||
auto data_parts_lock = lockParts();
|
auto data_parts_lock = lockParts();
|
||||||
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range_info, data_parts_lock);
|
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range_info, data_parts_lock);
|
||||||
@ -1849,8 +1849,11 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
|
|||||||
/// If DETACH clone parts to detached/ directory
|
/// If DETACH clone parts to detached/ directory
|
||||||
for (const auto & part : parts_to_remove)
|
for (const auto & part : parts_to_remove)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
|
if (auto part_to_detach = part.getPartIfItWasActive())
|
||||||
part->makeCloneInDetached("", metadata_snapshot);
|
{
|
||||||
|
LOG_INFO(log, "Detaching {}", part_to_detach->getDataPartStorage().getPartDirectory());
|
||||||
|
part_to_detach->makeCloneInDetached("", metadata_snapshot);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1941,7 +1944,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
|
|
||||||
PartDescriptions all_parts;
|
PartDescriptions all_parts;
|
||||||
PartDescriptions parts_to_add;
|
PartDescriptions parts_to_add;
|
||||||
DataPartsVector parts_to_remove;
|
PartsToRemoveFromZooKeeper parts_to_remove;
|
||||||
|
|
||||||
auto table_lock_holder_dst_table = lockForShare(
|
auto table_lock_holder_dst_table = lockForShare(
|
||||||
RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
|
RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
|
||||||
@ -1972,7 +1975,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
String parts_to_remove_str;
|
String parts_to_remove_str;
|
||||||
for (const auto & part : parts_to_remove)
|
for (const auto & part : parts_to_remove)
|
||||||
{
|
{
|
||||||
parts_to_remove_str += part->name;
|
parts_to_remove_str += part.getPartName();
|
||||||
parts_to_remove_str += " ";
|
parts_to_remove_str += " ";
|
||||||
}
|
}
|
||||||
LOG_TRACE(log, "Replacing {} parts {}with empty set", parts_to_remove.size(), parts_to_remove_str);
|
LOG_TRACE(log, "Replacing {} parts {}with empty set", parts_to_remove.size(), parts_to_remove_str);
|
||||||
@ -2248,7 +2251,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
String parts_to_remove_str;
|
String parts_to_remove_str;
|
||||||
for (const auto & part : parts_to_remove)
|
for (const auto & part : parts_to_remove)
|
||||||
{
|
{
|
||||||
parts_to_remove_str += part->name;
|
parts_to_remove_str += part.getPartName();
|
||||||
parts_to_remove_str += " ";
|
parts_to_remove_str += " ";
|
||||||
}
|
}
|
||||||
LOG_TRACE(log, "Replacing {} parts {}with {} parts {}", parts_to_remove.size(), parts_to_remove_str,
|
LOG_TRACE(log, "Replacing {} parts {}with {} parts {}", parts_to_remove.size(), parts_to_remove_str,
|
||||||
@ -6230,11 +6233,11 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries)
|
void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries)
|
||||||
{
|
{
|
||||||
Strings part_names_to_remove;
|
Strings part_names_to_remove;
|
||||||
for (const auto & part : parts)
|
for (const auto & part : parts)
|
||||||
part_names_to_remove.emplace_back(part->name);
|
part_names_to_remove.emplace_back(part.getPartName());
|
||||||
|
|
||||||
return removePartsFromZooKeeperWithRetries(part_names_to_remove, max_retries);
|
return removePartsFromZooKeeperWithRetries(part_names_to_remove, max_retries);
|
||||||
}
|
}
|
||||||
@ -6561,7 +6564,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
|||||||
if (replace)
|
if (replace)
|
||||||
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
|
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
|
||||||
|
|
||||||
DataPartsVector parts_to_remove;
|
PartsToRemoveFromZooKeeper parts_to_remove;
|
||||||
Coordination::Responses op_results;
|
Coordination::Responses op_results;
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -6797,7 +6800,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
|||||||
|
|
||||||
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
|
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
|
||||||
|
|
||||||
DataPartsVector parts_to_remove;
|
PartsToRemoveFromZooKeeper parts_to_remove;
|
||||||
Coordination::Responses op_results;
|
Coordination::Responses op_results;
|
||||||
|
|
||||||
try
|
try
|
||||||
|
@ -549,7 +549,7 @@ private:
|
|||||||
|
|
||||||
/// Remove parts from ZooKeeper, throw exception if unable to do so after max_retries.
|
/// Remove parts from ZooKeeper, throw exception if unable to do so after max_retries.
|
||||||
void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
|
void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
|
||||||
void removePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries = 5);
|
void removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries = 5);
|
||||||
|
|
||||||
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
|
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
|
||||||
void removePartAndEnqueueFetch(const String & part_name);
|
void removePartAndEnqueueFetch(const String & part_name);
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
-- Tags: shard
|
-- Tags: shard
|
||||||
|
|
||||||
-- Limit to 10 MB/sec
|
-- Limit to 100 KB/sec
|
||||||
SET max_network_bandwidth = 10000000;
|
SET max_network_bandwidth = 100000;
|
||||||
|
|
||||||
-- Lower max_block_size, so we can start throttling sooner. Otherwise query will be executed too quickly.
|
-- Lower max_block_size, so we can start throttling sooner. Otherwise query will be executed too quickly.
|
||||||
SET max_block_size = 100;
|
SET max_block_size = 100;
|
||||||
@ -11,7 +11,7 @@ CREATE TEMPORARY TABLE times (t DateTime);
|
|||||||
-- rand64 is uncompressable data. Each number will take 8 bytes of bandwidth.
|
-- rand64 is uncompressable data. Each number will take 8 bytes of bandwidth.
|
||||||
-- This query should execute in no less than 1.6 seconds if throttled.
|
-- This query should execute in no less than 1.6 seconds if throttled.
|
||||||
INSERT INTO times SELECT now();
|
INSERT INTO times SELECT now();
|
||||||
SELECT sum(ignore(*)) FROM (SELECT rand64() FROM remote('127.0.0.{2,3}', numbers(2000000)));
|
SELECT sum(ignore(*)) FROM (SELECT rand64() FROM remote('127.0.0.{2,3}', numbers(20000)));
|
||||||
INSERT INTO times SELECT now();
|
INSERT INTO times SELECT now();
|
||||||
|
|
||||||
SELECT max(t) - min(t) >= 1 FROM times;
|
SELECT max(t) - min(t) >= 1 FROM times;
|
||||||
|
@ -0,0 +1,5 @@
|
|||||||
|
4
|
||||||
|
1
|
||||||
|
3
|
||||||
|
4
|
||||||
|
4
|
9
tests/queries/0_stateless/02462_match_regexp_pk.sql
Normal file
9
tests/queries/0_stateless/02462_match_regexp_pk.sql
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
CREATE TABLE mt_match_pk (v String) ENGINE = MergeTree ORDER BY v SETTINGS index_granularity = 1;
|
||||||
|
INSERT INTO mt_match_pk VALUES ('a'), ('aaa'), ('aba'), ('bac'), ('acccca');
|
||||||
|
|
||||||
|
SET force_primary_key = 1;
|
||||||
|
SELECT count() FROM mt_match_pk WHERE match(v, '^a');
|
||||||
|
SELECT count() FROM mt_match_pk WHERE match(v, '^ab');
|
||||||
|
SELECT count() FROM mt_match_pk WHERE match(v, '^a.');
|
||||||
|
SELECT count() FROM mt_match_pk WHERE match(v, '^ab*');
|
||||||
|
SELECT count() FROM mt_match_pk WHERE match(v, '^ac?');
|
@ -0,0 +1 @@
|
|||||||
|
EXPLAIN AST ALTER user WITH a; -- { clientError SYNTAX_ERROR }
|
Loading…
Reference in New Issue
Block a user