Merged with master

Vxider 2022-05-12 23:04:07 +08:00
commit dc33c3a9bb
19 changed files with 290 additions and 62 deletions


@@ -2,7 +2,7 @@
 [SSL 'strict' option](../server-configuration-parameters/settings.md#server_configuration_parameters-openssl) enables mandatory certificate validation for the incoming connections. In this case, only connections with trusted certificates can be established. Connections with untrusted certificates will be rejected. Thus, certificate validation allows to uniquely authenticate an incoming connection. `Common Name` field of the certificate is used to identify connected user. This allows to associate multiple certificates with the same user. Additionally, reissuing and revoking of the certificates does not affect the ClickHouse configuration.
-To enable SSL certificate authentication, a list of `Common Name`'s for each ClickHouse user must be sspecified in the settings file `config.xml`:
+To enable SSL certificate authentication, a list of `Common Name`'s for each ClickHouse user must be specified in the settings file `users.xml`:
 **Example**
 ```xml
@@ -10,11 +10,11 @@ To enable SSL certificate authentication, a list of `Common Name`'s for each Cli
 <!- ... -->
 <users>
     <user_name>
-        <certificates>
+        <ssl_certificates>
             <common_name>host.domain.com:example_user</common_name>
             <common_name>host.domain.com:example_user_dev</common_name>
             <!-- More names -->
-        </certificates>
+        </ssl_certificates>
         <!-- Other settings -->
     </user_name>
 </users>


@@ -1745,3 +1745,13 @@ Possible values:
 - Positive integer.
 Default value: `10000`.
+
+## global_memory_usage_overcommit_max_wait_microseconds {#global_memory_usage_overcommit_max_wait_microseconds}
+
+Sets maximum waiting time for global overcommit tracker.
+
+Possible values:
+
+- Positive integer.
+
+Default value: `200`.


@@ -0,0 +1,37 @@
+# Memory overcommit
+
+Memory overcommit is an experimental technique intended to allow setting more flexible memory limits for queries.
+
+The idea of this technique is to introduce settings which represent the guaranteed amount of memory a query can use.
+When memory overcommit is enabled and the memory limit is reached, ClickHouse will select the most overcommitted query and try to free memory by killing it.
+
+When the memory limit is reached, any query will wait for some time while attempting to allocate new memory.
+If the timeout passes and memory is freed, the query continues execution.
+Otherwise an exception is thrown and the query is killed.
+
+Selection of the query to stop or kill is performed by either the global or the user overcommit tracker, depending on which memory limit was reached.
+If the overcommit tracker can't choose a query to stop, the MEMORY_LIMIT_EXCEEDED exception is thrown.
+
+## User overcommit tracker
+
+The user overcommit tracker finds the query with the biggest overcommit ratio in the user's query list.
+The overcommit ratio for a query is computed as the number of allocated bytes divided by the value of the `memory_overcommit_ratio_denominator` setting.
+If `memory_overcommit_ratio_denominator` for the query equals zero, the overcommit tracker won't choose this query.
+
+The waiting timeout is set by the `memory_usage_overcommit_max_wait_microseconds` setting.
+
+**Example**
+
+```sql
+SELECT number FROM numbers(1000) GROUP BY number SETTINGS memory_overcommit_ratio_denominator=4000, memory_usage_overcommit_max_wait_microseconds=500
+```
+
+## Global overcommit tracker
+
+The global overcommit tracker finds the query with the biggest overcommit ratio in the list of all queries.
+In this case the overcommit ratio is computed as the number of allocated bytes divided by the value of the `memory_overcommit_ratio_denominator_for_user` setting.
+If `memory_overcommit_ratio_denominator_for_user` for the query equals zero, the overcommit tracker won't choose this query.
+
+The waiting timeout is set by the `global_memory_usage_overcommit_max_wait_microseconds` parameter in the configuration file.
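The user-level waiting timeout above is an ordinary query setting, but the global one is a server-level parameter. A minimal sketch of how it might be set, assuming the usual top-level placement in `config.xml` (the placement and value here are illustrative, not part of this commit):

```xml
<clickhouse>
    <!-- Assumed placement: the global overcommit tracker waits up to this many
         microseconds for memory to be freed before the chosen query is killed. -->
    <global_memory_usage_overcommit_max_wait_microseconds>200</global_memory_usage_overcommit_max_wait_microseconds>
</clickhouse>
```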


@@ -4263,3 +4263,29 @@ Possible values:
 - 1 — Enabled.
 Default value: 1.
+
+## memory_overcommit_ratio_denominator
+
+It represents the soft memory limit when the hard limit is reached on the user level.
+This value is used to compute the overcommit ratio for the query.
+Zero means skip the query.
+
+Read more about [memory overcommit](memory-overcommit.md).
+
+Default value: `1GiB`.
+
+## memory_usage_overcommit_max_wait_microseconds
+
+Maximum time a thread will wait for memory to be freed in the case of memory overcommit on the user level.
+If the timeout is reached and memory is not freed, an exception is thrown.
+
+Read more about [memory overcommit](memory-overcommit.md).
+
+Default value: `200`.
+
+## memory_overcommit_ratio_denominator_for_user
+
+It represents the soft memory limit when the hard limit is reached on the global level.
+This value is used to compute the overcommit ratio for the query.
+Zero means skip the query.
+
+Read more about [memory overcommit](memory-overcommit.md).
+
+Default value: `1GiB`.
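Because `memory_overcommit_ratio_denominator` and `memory_usage_overcommit_max_wait_microseconds` are regular query-level settings, they can also be pinned in a settings profile rather than repeated in every `SETTINGS` clause. A minimal sketch assuming the usual `users.xml` profile layout (profile name and values are illustrative):

```xml
<profiles>
    <default>
        <!-- Soft limit used as the denominator of the overcommit ratio, in bytes (illustrative value: 1 GiB). -->
        <memory_overcommit_ratio_denominator>1073741824</memory_overcommit_ratio_denominator>
        <!-- How long an allocating thread waits for memory to be freed, in microseconds (illustrative). -->
        <memory_usage_overcommit_max_wait_microseconds>500</memory_usage_overcommit_max_wait_microseconds>
    </default>
</profiles>
```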


@@ -67,11 +67,11 @@ namespace
         size_t num_password_fields = has_no_password + has_password_plaintext + has_password_sha256_hex + has_password_double_sha1_hex + has_ldap + has_kerberos + has_certificates;
         if (num_password_fields > 1)
-            throw Exception("More than one field of 'password', 'password_sha256_hex', 'password_double_sha1_hex', 'no_password', 'ldap', 'kerberos', 'certificates' are used to specify authentication info for user " + user_name + ". Must be only one of them.",
+            throw Exception("More than one field of 'password', 'password_sha256_hex', 'password_double_sha1_hex', 'no_password', 'ldap', 'kerberos', 'ssl_certificates' are used to specify authentication info for user " + user_name + ". Must be only one of them.",
                 ErrorCodes::BAD_ARGUMENTS);
         if (num_password_fields < 1)
-            throw Exception("Either 'password' or 'password_sha256_hex' or 'password_double_sha1_hex' or 'no_password' or 'ldap' or 'kerberos' or 'certificates' must be specified for user " + user_name + ".", ErrorCodes::BAD_ARGUMENTS);
+            throw Exception("Either 'password' or 'password_sha256_hex' or 'password_double_sha1_hex' or 'no_password' or 'ldap' or 'kerberos' or 'ssl_certificates' must be specified for user " + user_name + ".", ErrorCodes::BAD_ARGUMENTS);
         if (has_password_plaintext)
         {


@@ -22,7 +22,7 @@ namespace DB
 {
 class IColumn;
 
-static constexpr UInt64 operator""_Gb(unsigned long long value)
+static constexpr UInt64 operator""_GiB(unsigned long long value)
 {
     return value * 1024 * 1024 * 1024;
 }
@@ -362,14 +362,14 @@ static constexpr UInt64 operator""_Gb(unsigned long long value)
     M(OverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
     \
     M(UInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.", 0) \
-    M(UInt64, max_guaranteed_memory_usage, 10_Gb, "Maximum guaranteed memory usage for processing of single query. It represents soft limit. Zero means unlimited.", 0) \
+    M(UInt64, memory_overcommit_ratio_denominator, 1_GiB, "It represents soft memory limit on the user level. This value is used to compute query overcommit ratio.", 0) \
     M(UInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.", 0) \
-    M(UInt64, max_guaranteed_memory_usage_for_user, 10_Gb, "Maximum guaranteed memory usage for processing all concurrently running queries for the user. It represents soft limit. Zero means unlimited.", 0) \
+    M(UInt64, memory_overcommit_ratio_denominator_for_user, 1_GiB, "It represents soft memory limit on the global level. This value is used to compute query overcommit ratio.", 0) \
     M(UInt64, max_untracked_memory, (4 * 1024 * 1024), "Small allocations and deallocations are grouped in thread local variable and tracked or profiled only when amount (in absolute value) becomes larger than specified value. If the value is higher than 'memory_profiler_step' it will be effectively lowered to 'memory_profiler_step'.", 0) \
     M(UInt64, memory_profiler_step, (4 * 1024 * 1024), "Whenever query memory usage becomes larger than every next step in number of bytes the memory profiler will collect the allocating stack trace. Zero means disabled memory profiler. Values lower than a few megabytes will slow down query processing.", 0) \
     M(Float, memory_profiler_sample_probability, 0., "Collect random allocations and deallocations and write them into system.trace_log with 'MemorySample' trace_type. The probability is for every alloc/free regardless to the size of the allocation. Note that sampling happens only when the amount of untracked memory exceeds 'max_untracked_memory'. You may want to set 'max_untracked_memory' to 0 for extra fine grained sampling.", 0) \
     \
-    M(UInt64, memory_usage_overcommit_max_wait_microseconds, 0, "Maximum time thread will wait for memory to be freed in the case of memory overcommit. If timeout is reached and memory is not freed, exception is thrown", 0) \
+    M(UInt64, memory_usage_overcommit_max_wait_microseconds, 200, "Maximum time thread will wait for memory to be freed in the case of memory overcommit on user level. If timeout is reached and memory is not freed, exception is thrown.", 0) \
     \
     M(UInt64, max_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.", 0) \
     M(UInt64, max_network_bytes, 0, "The maximum number of bytes (compressed) to receive or transmit over the network for execution of the query.", 0) \


@@ -16,25 +16,27 @@
 #include <Parsers/IAST_fwd.h>
 #include <Parsers/parseQuery.h>
 #include <Common/Exception.h>
+#include <Common/WeakHash.h>
 #include <Common/typeid_cast.h>
 
 namespace DB
 {
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
     extern const int SET_SIZE_LIMIT_EXCEEDED;
     extern const int BAD_ARGUMENTS;
 }
 
-namespace JoinStuff
-{
 ConcurrentHashJoin::ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<TableJoin> table_join_, size_t slots_, const Block & right_sample_block, bool any_take_last_row_)
     : context(context_)
     , table_join(table_join_)
     , slots(slots_)
 {
-    if (!slots_ || slots_ >= 256)
+    if (slots < 1 || 255 < slots)
     {
-        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid argument slot : {}", slots_);
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Number of slots should be [1, 255], got {}", slots);
     }
 
     for (size_t i = 0; i < slots; ++i)
@@ -43,36 +45,44 @@ ConcurrentHashJoin::ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<Tabl
         inner_hash_join->data = std::make_unique<HashJoin>(table_join_, right_sample_block, any_take_last_row_);
         hash_joins.emplace_back(std::move(inner_hash_join));
     }
 }
 
-bool ConcurrentHashJoin::addJoinedBlock(const Block & block, bool check_limits)
+bool ConcurrentHashJoin::addJoinedBlock(const Block & right_block, bool check_limits)
 {
-    Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_right, block);
-    std::list<size_t> pending_blocks;
+    Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_right, right_block);
+    size_t blocks_left = 0;
+    for (const auto & block : dispatched_blocks)
+    {
+        if (block)
+        {
+            ++blocks_left;
+        }
+    }
+
+    while (blocks_left > 0)
+    {
+        /// insert blocks into corresponding HashJoin instances
         for (size_t i = 0; i < dispatched_blocks.size(); ++i)
-            pending_blocks.emplace_back(i);
-    while (!pending_blocks.empty())
         {
-        for (auto iter = pending_blocks.begin(); iter != pending_blocks.end();)
-        {
-            auto & i = *iter;
             auto & hash_join = hash_joins[i];
             auto & dispatched_block = dispatched_blocks[i];
-            if (hash_join->mutex.try_lock())
+
+            if (dispatched_block)
             {
-                if (!hash_join->data->addJoinedBlock(dispatched_block, check_limits))
-                {
-                    hash_join->mutex.unlock();
-                    return false;
-                }
-                hash_join->mutex.unlock();
-                iter = pending_blocks.erase(iter);
-            }
-            else
-                iter++;
+                /// if current hash_join is already processed by another thread, skip it and try later
+                std::unique_lock<std::mutex> lock(hash_join->mutex, std::try_to_lock);
+                if (!lock.owns_lock())
+                    continue;
+
+                bool limit_exceeded = !hash_join->data->addJoinedBlock(dispatched_block, check_limits);
+
+                dispatched_block = {};
+                blocks_left--;
+
+                if (limit_exceeded)
+                    return false;
+            }
         }
     }
@@ -161,30 +171,32 @@ std::shared_ptr<NotJoinedBlocks> ConcurrentHashJoin::getNonJoinedBlocks(
     throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid join type. join kind: {}, strictness: {}", table_join->kind(), table_join->strictness());
 }
 
+static IColumn::Selector hashToSelector(const WeakHash32 & hash, size_t num_shards)
+{
+    const auto & data = hash.getData();
+    size_t num_rows = data.size();
+
+    IColumn::Selector selector(num_rows);
+    for (size_t i = 0; i < num_rows; ++i)
+        selector[i] = data[i] % num_shards;
+    return selector;
+}
+
 Blocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, const Block & from_block)
 {
-    Blocks result;
     size_t num_shards = hash_joins.size();
     size_t num_rows = from_block.rows();
     size_t num_cols = from_block.columns();
 
-    ColumnRawPtrs key_cols;
+    WeakHash32 hash(num_rows);
     for (const auto & key_name : key_columns_names)
     {
-        key_cols.push_back(from_block.getByName(key_name).column.get());
-    }
-    IColumn::Selector selector(num_rows);
-    for (size_t i = 0; i < num_rows; ++i)
-    {
-        SipHash hash;
-        for (const auto & key_col : key_cols)
-        {
-            key_col->updateHashWithValue(i, hash);
-        }
-        selector[i] = hash.get64() % num_shards;
+        const auto & key_col = from_block.getByName(key_name).column;
+        key_col->updateWeakHash32(hash);
     }
+    auto selector = hashToSelector(hash, num_shards);
 
+    Blocks result;
     for (size_t i = 0; i < num_shards; ++i)
     {
         result.emplace_back(from_block.cloneEmpty());
@@ -203,4 +215,3 @@ Blocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, cons
 }
 
 }
-}


@@ -15,8 +15,7 @@
 namespace DB
 {
 
-namespace JoinStuff
-{
 /**
  * Can run addJoinedBlock() parallelly to speedup the join process. On test, it almose linear speedup by
  * the degree of parallelism.
@@ -33,6 +32,7 @@ namespace JoinStuff
  */
 class ConcurrentHashJoin : public IJoin
 {
 public:
     explicit ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<TableJoin> table_join_, size_t slots_, const Block & right_sample_block, bool any_take_last_row_ = false);
     ~ConcurrentHashJoin() override = default;
@@ -49,6 +49,7 @@ public:
     bool supportParallelJoin() const override { return true; }
     std::shared_ptr<NotJoinedBlocks>
     getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
 private:
     struct InternalHashJoin
     {
@@ -71,5 +72,5 @@ private:
     Blocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
 };
 
-}
 }


@@ -1020,7 +1020,7 @@ static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> ana
 {
     if (analyzed_join->allowParallelHashJoin())
     {
-        return std::make_shared<JoinStuff::ConcurrentHashJoin>(context, analyzed_join, context->getSettings().max_threads, sample_block);
+        return std::make_shared<ConcurrentHashJoin>(context, analyzed_join, context->getSettings().max_threads, sample_block);
     }
     return std::make_shared<HashJoin>(analyzed_join, sample_block);
 }


@@ -212,7 +212,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
     /// Set query-level memory trackers
     thread_group->memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage);
-    thread_group->memory_tracker.setSoftLimit(settings.max_guaranteed_memory_usage);
+    thread_group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator);
 
     if (query_context->hasTraceCollector())
     {
@@ -242,7 +242,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
     /// Track memory usage for all simultaneously running queries from single user.
     user_process_list.user_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_user);
-    user_process_list.user_memory_tracker.setSoftLimit(settings.max_guaranteed_memory_usage_for_user);
+    user_process_list.user_memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator_for_user);
     user_process_list.user_memory_tracker.setDescription("(for user)");
     user_process_list.user_overcommit_tracker.setMaxWaitTime(settings.memory_usage_overcommit_max_wait_microseconds);


@@ -347,7 +347,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
     /// ╞> FillingJoin ─> Resize ╣ ╞> Joining ─> (totals)
     /// (totals) ─────────┘ ╙─────┘
 
-    auto num_streams = left->getNumStreams();
+    size_t num_streams = left->getNumStreams();
 
     if (join->supportParallelJoin() && !right->hasTotals())
     {


@@ -87,7 +87,7 @@ MergeListElement::MergeListElement(
     /// thread_group::memory_tracker, but MemoryTrackerThreadSwitcher will reset parent).
     memory_tracker.setProfilerStep(settings.memory_profiler_step);
     memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
-    memory_tracker.setSoftLimit(settings.max_guaranteed_memory_usage);
+    memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator);
 
     if (settings.memory_tracker_fault_probability)
         memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);


@@ -40,7 +40,11 @@
 #include <Processors/Transforms/WatermarkTransform.h>
 #include <Processors/Transforms/SquashingChunksTransform.h>
 #include <Processors/Transforms/MaterializingTransform.h>
-#include <Processors/Transforms/MergeSortingTransform.h>
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
+#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
+#include <Processors/QueryPlan/ExpressionStep.h>
+#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
 #include <Processors/Executors/PipelineExecutor.h>
 #include <Processors/Sinks/EmptySink.h>
 #include <Storages/StorageFactory.h>
@@ -977,6 +981,76 @@ void StorageWindowView::threadFuncFireEvent()
     }
 }
 
+Pipe StorageWindowView::read(
+    const Names & column_names,
+    const StorageSnapshotPtr & storage_snapshot,
+    SelectQueryInfo & query_info,
+    ContextPtr local_context,
+    QueryProcessingStage::Enum processed_stage,
+    const size_t max_block_size,
+    const unsigned num_streams)
+{
+    QueryPlan plan;
+    read(plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
+    return plan.convertToPipe(
+        QueryPlanOptimizationSettings::fromContext(local_context), BuildQueryPipelineSettings::fromContext(local_context));
+}
+
+void StorageWindowView::read(
+    QueryPlan & query_plan,
+    const Names & column_names,
+    const StorageSnapshotPtr & storage_snapshot,
+    SelectQueryInfo & query_info,
+    ContextPtr local_context,
+    QueryProcessingStage::Enum processed_stage,
+    const size_t max_block_size,
+    const unsigned num_streams)
+{
+    if (target_table_id.empty())
+        return;
+
+    auto storage = getTargetStorage();
+    auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
+    auto target_metadata_snapshot = storage->getInMemoryMetadataPtr();
+    auto target_storage_snapshot = storage->getStorageSnapshot(target_metadata_snapshot, local_context);
+
+    if (query_info.order_optimizer)
+        query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, local_context);
+
+    storage->read(query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
+
+    if (query_plan.isInitialized())
+    {
+        auto wv_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);
+        auto target_header = query_plan.getCurrentDataStream().header;
+
+        if (!blocksHaveEqualStructure(wv_header, target_header))
+        {
+            auto converting_actions = ActionsDAG::makeConvertingActions(
+                target_header.getColumnsWithTypeAndName(), wv_header.getColumnsWithTypeAndName(), ActionsDAG::MatchColumnsMode::Name);
+            auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);
+            converting_step->setStepDescription("Convert Target table structure to WindowView structure");
+            query_plan.addStep(std::move(converting_step));
+        }
+
+        StreamLocalLimits limits;
+        SizeLimits leaf_limits;
+
+        /// Add table lock for target table.
+        auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
+            query_plan.getCurrentDataStream(),
+            storage,
+            std::move(lock),
+            limits,
+            leaf_limits,
+            nullptr,
+            nullptr);
+
+        adding_limits_and_quota->setStepDescription("Lock target table for WindowView");
+        query_plan.addStep(std::move(adding_limits_and_quota));
+    }
+}
+
 Pipe StorageWindowView::watch(
     const Names & /*column_names*/,
     const SelectQueryInfo & query_info,


@@ -137,6 +137,25 @@ public:
     void startup() override;
     void shutdown() override;
 
+    Pipe read(
+        const Names & column_names,
+        const StorageSnapshotPtr & storage_snapshot,
+        SelectQueryInfo & query_info,
+        ContextPtr context,
+        QueryProcessingStage::Enum processed_stage,
+        size_t max_block_size,
+        unsigned num_streams) override;
+
+    void read(
+        QueryPlan & query_plan,
+        const Names & column_names,
+        const StorageSnapshotPtr & storage_snapshot,
+        SelectQueryInfo & query_info,
+        ContextPtr context,
+        QueryProcessingStage::Enum processed_stage,
+        size_t max_block_size,
+        unsigned num_streams) override;
+
     Pipe watch(
         const Names & column_names,
         const SelectQueryInfo & query_info,


@@ -18,8 +18,8 @@ def start_cluster():
         cluster.shutdown()
 
-TEST_QUERY_A = "SELECT number FROM numbers(1000) GROUP BY number SETTINGS max_guaranteed_memory_usage_for_user=1"
-TEST_QUERY_B = "SELECT number FROM numbers(1000) GROUP BY number SETTINGS max_guaranteed_memory_usage_for_user=2"
+TEST_QUERY_A = "SELECT number FROM numbers(1000) GROUP BY number SETTINGS memory_overcommit_ratio_denominator_for_user=1"
+TEST_QUERY_B = "SELECT number FROM numbers(1000) GROUP BY number SETTINGS memory_overcommit_ratio_denominator_for_user=2"
 
 def test_overcommited_is_killed():


@@ -0,0 +1,14 @@
+1 1 1990-01-01 12:00:05
+1 2 1990-01-01 12:00:05
+1 3 1990-01-01 12:00:05
+1 4 1990-01-01 12:00:10
+1 5 1990-01-01 12:00:10
+1 6 1990-01-01 12:00:15
+1 7 1990-01-01 12:00:15
+1
+2
+3
+4
+5
+6
+7


@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+$CLICKHOUSE_CLIENT --multiquery <<EOF
+SET allow_experimental_window_view = 1;
+DROP TABLE IF EXISTS mt;
+DROP TABLE IF EXISTS dst;
+DROP TABLE IF EXISTS wv;
+
+CREATE TABLE dst(count UInt64, market Int32, w_end DateTime) Engine=MergeTree ORDER BY tuple();
+CREATE TABLE mt(a Int32, market Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
+CREATE WINDOW VIEW wv TO dst WATERMARK=ASCENDING AS SELECT count(a) AS count, market, tumbleEnd(wid) AS w_end FROM mt GROUP BY tumble(timestamp, INTERVAL '5' SECOND, 'US/Samoa') AS wid, market;
+
+INSERT INTO mt VALUES (1, 1, '1990/01/01 12:00:00');
+INSERT INTO mt VALUES (1, 2, '1990/01/01 12:00:01');
+INSERT INTO mt VALUES (1, 3, '1990/01/01 12:00:02');
+INSERT INTO mt VALUES (1, 4, '1990/01/01 12:00:05');
+INSERT INTO mt VALUES (1, 5, '1990/01/01 12:00:06');
+INSERT INTO mt VALUES (1, 6, '1990/01/01 12:00:10');
+INSERT INTO mt VALUES (1, 7, '1990/01/01 12:00:11');
+INSERT INTO mt VALUES (1, 8, '1990/01/01 12:00:30');
+EOF
+
+while true; do
+    $CLICKHOUSE_CLIENT --query="SELECT count(*) FROM wv" | grep -q "7" && break || sleep .5 ||:
+done
+
+$CLICKHOUSE_CLIENT --query="SELECT * FROM wv ORDER BY market, w_end;"
+$CLICKHOUSE_CLIENT --query="SELECT market FROM wv ORDER BY market, w_end;"
+$CLICKHOUSE_CLIENT --query="DROP TABLE wv"
+$CLICKHOUSE_CLIENT --query="DROP TABLE mt"
+$CLICKHOUSE_CLIENT --query="DROP TABLE dst"


@@ -11,13 +11,13 @@ $CLICKHOUSE_CLIENT -q 'GRANT ALL ON *.* TO u02104'
 
 function overcommited()
 {
-    $CLICKHOUSE_CLIENT -u u02104 -q 'SELECT number FROM numbers(130000) GROUP BY number SETTINGS max_guaranteed_memory_usage=1,memory_usage_overcommit_max_wait_microseconds=500' 2>&1 \
+    $CLICKHOUSE_CLIENT -u u02104 -q 'SELECT number FROM numbers(130000) GROUP BY number SETTINGS memory_overcommit_ratio_denominator=1,memory_usage_overcommit_max_wait_microseconds=500' 2>&1 \
         | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo "OVERCOMMITED WITH USER LIMIT IS KILLED"
 }
 
 function expect_execution()
 {
-    $CLICKHOUSE_CLIENT -u u02104 -q 'SELECT number FROM numbers(130000) GROUP BY number SETTINGS max_memory_usage_for_user=5000000,max_guaranteed_memory_usage=2,memory_usage_overcommit_max_wait_microseconds=500' >/dev/null 2>/dev/null
+    $CLICKHOUSE_CLIENT -u u02104 -q 'SELECT number FROM numbers(130000) GROUP BY number SETTINGS max_memory_usage_for_user=5000000,memory_overcommit_ratio_denominator=2,memory_usage_overcommit_max_wait_microseconds=500' >/dev/null 2>/dev/null
 }
 
 export -f overcommited


@@ -1,4 +1,5 @@
-set join_algorithm='parallel_hash';
+SET join_algorithm='parallel_hash';
+
 SELECT
     EventDate,
     hits,