Per MergeTree table query limit

This commit is contained in:
Amos Bird 2021-01-25 13:01:39 +08:00
parent 8084e4d614
commit 66fe97d8bd
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
11 changed files with 193 additions and 15 deletions

View File

@ -105,6 +105,8 @@ Pipe::Holder & Pipe::Holder::operator=(Holder && rhs)
for (auto & plan : rhs.query_plans) for (auto & plan : rhs.query_plans)
query_plans.emplace_back(std::move(plan)); query_plans.emplace_back(std::move(plan));
query_id_holder = std::move(rhs.query_id_holder);
return *this; return *this;
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Processors/IProcessor.h> #include <Processors/IProcessor.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Processors/QueryPlan/QueryIdHolder.h>
#include <Processors/QueryPlan/QueryPlan.h> #include <Processors/QueryPlan/QueryPlan.h>
namespace DB namespace DB
@ -108,6 +109,7 @@ public:
/// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible. /// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); } void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); } void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
void addQueryIdHolder(std::shared_ptr<QueryIdHolder> query_id_holder) { holder.query_id_holder = std::move(query_id_holder); }
/// For queries with nested interpreters (i.e. StorageDistributed) /// For queries with nested interpreters (i.e. StorageDistributed)
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { holder.query_plans.emplace_back(std::move(plan)); } void addQueryPlan(std::unique_ptr<QueryPlan> plan) { holder.query_plans.emplace_back(std::move(plan)); }
@ -128,6 +130,7 @@ private:
std::vector<StoragePtr> storage_holders; std::vector<StoragePtr> storage_holders;
std::vector<TableLockHolder> table_locks; std::vector<TableLockHolder> table_locks;
std::vector<std::unique_ptr<QueryPlan>> query_plans; std::vector<std::unique_ptr<QueryPlan>> query_plans;
std::shared_ptr<QueryIdHolder> query_id_holder;
}; };
Holder holder; Holder holder;

View File

@ -0,0 +1,15 @@
#include <Processors/QueryPlan/QueryIdHolder.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
QueryIdHolder::QueryIdHolder(const String & query_id_, const MergeTreeData & data_) : query_id(query_id_), data(data_)
{
}
QueryIdHolder::~QueryIdHolder()
{
data.removeQueryId(query_id);
}
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <string>
namespace DB
{
class MergeTreeData;
/// Holds the current query id and do something meaningful in destructor.
/// Currently it's used for cleaning query id in the MergeTreeData query set.
struct QueryIdHolder
{
QueryIdHolder(const std::string & query_id_, const MergeTreeData & data_);
~QueryIdHolder();
std::string query_id;
const MergeTreeData & data;
};
}

View File

@ -114,6 +114,7 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_SPACE; extern const int NOT_ENOUGH_SPACE;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN; extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int SUPPORT_IS_DISABLED; extern const int SUPPORT_IS_DISABLED;
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
} }
@ -3988,4 +3989,24 @@ void MergeTreeData::setDataVolume(size_t bytes, size_t rows, size_t parts)
total_active_size_rows.store(rows, std::memory_order_release); total_active_size_rows.store(rows, std::memory_order_release);
total_active_size_parts.store(parts, std::memory_order_release); total_active_size_parts.store(parts, std::memory_order_release);
} }
void MergeTreeData::insertQueryIdOrThrow(const String & query_id, size_t max_queries) const
{
std::lock_guard lock(query_id_set_mutex);
if (query_id_set.find(query_id) != query_id_set.end())
return;
if (query_id_set.size() >= max_queries)
throw Exception(
ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, "Too many simultaneous queries for table {}. Maximum is: {}", log_name, max_queries);
query_id_set.insert(query_id);
}
void MergeTreeData::removeQueryId(const String & query_id) const
{
std::lock_guard lock(query_id_set_mutex);
if (query_id_set.find(query_id) == query_id_set.end())
LOG_WARNING(log, "We have query_id removed but it's not recorded. This is a bug");
else
query_id_set.erase(query_id);
}
} }

View File

@ -702,6 +702,12 @@ public:
/// section from config.xml. /// section from config.xml.
CompressionCodecPtr getCompressionCodecForPart(size_t part_size_compressed, const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t current_time) const; CompressionCodecPtr getCompressionCodecForPart(size_t part_size_compressed, const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t current_time) const;
/// Record current query id where querying the table. Throw if there are already `max_queries` queries accessing the same table.
void insertQueryIdOrThrow(const String & query_id, size_t max_queries) const;
/// Remove current query id after query finished.
void removeQueryId(const String & query_id) const;
/// Limiting parallel sends per one table, used in DataPartsExchange /// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0}; std::atomic_uint current_table_sends {0};
@ -958,6 +964,10 @@ private:
std::atomic<size_t> total_active_size_bytes = 0; std::atomic<size_t> total_active_size_bytes = 0;
std::atomic<size_t> total_active_size_rows = 0; std::atomic<size_t> total_active_size_rows = 0;
std::atomic<size_t> total_active_size_parts = 0; std::atomic<size_t> total_active_size_parts = 0;
// Record all query ids which access the table. It's guarded by `query_id_set_mutex` and is always mutable.
mutable std::set<String> query_id_set;
mutable std::mutex query_id_set_mutex;
}; };
} }

View File

@ -33,6 +33,7 @@
#include <Processors/QueryPlan/MergingSortedStep.h> #include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/UnionStep.h> #include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/MergingFinal.h> #include <Processors/QueryPlan/MergingFinal.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <DataTypes/DataTypeDate.h> #include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h> #include <DataTypes/DataTypeEnum.h>
@ -707,8 +708,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
if (parts_with_ranges.empty()) if (parts_with_ranges.empty())
return std::make_unique<QueryPlan>(); return std::make_unique<QueryPlan>();
const auto data_settings = data.getSettings();
auto max_partitions_to_read auto max_partitions_to_read
= settings.max_partitions_to_read.changed ? settings.max_partitions_to_read : data.getSettings()->max_partitions_to_read; = settings.max_partitions_to_read.changed ? settings.max_partitions_to_read : data_settings->max_partitions_to_read;
if (max_partitions_to_read > 0) if (max_partitions_to_read > 0)
{ {
std::set<String> partitions; std::set<String> partitions;
@ -722,6 +724,18 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
max_partitions_to_read); max_partitions_to_read);
} }
String query_id;
if (data_settings->max_concurrent_queries > 0)
{
if (data_settings->min_marks_to_honor_max_concurrent_queries > 0
&& sum_marks >= data_settings->min_marks_to_honor_max_concurrent_queries)
{
query_id = context.getCurrentQueryId();
if (!query_id.empty())
data.insertQueryIdOrThrow(query_id, data_settings->max_concurrent_queries);
}
}
ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size()); ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size());
ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges); ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks); ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);
@ -758,7 +772,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
virt_column_names, virt_column_names,
settings, settings,
reader_settings, reader_settings,
result_projection); result_projection,
query_id);
} }
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && query_info.input_order_info) else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && query_info.input_order_info)
{ {
@ -781,7 +796,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
virt_column_names, virt_column_names,
settings, settings,
reader_settings, reader_settings,
result_projection); result_projection,
query_id);
} }
else else
{ {
@ -795,7 +811,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
query_info, query_info,
virt_column_names, virt_column_names,
settings, settings,
reader_settings); reader_settings,
query_id);
} }
if (!plan) if (!plan)
@ -895,7 +912,7 @@ size_t minMarksForConcurrentRead(
} }
static QueryPlanPtr createPlanFromPipe(Pipe pipe, const std::string & description = "") static QueryPlanPtr createPlanFromPipe(Pipe pipe, const String & query_id, const MergeTreeData & data, const std::string & description = "")
{ {
auto plan = std::make_unique<QueryPlan>(); auto plan = std::make_unique<QueryPlan>();
@ -903,6 +920,10 @@ static QueryPlanPtr createPlanFromPipe(Pipe pipe, const std::string & descriptio
if (!description.empty()) if (!description.empty())
storage_name += ' ' + description; storage_name += ' ' + description;
// Attach QueryIdHolder if needed
if (!query_id.empty())
pipe.addQueryIdHolder(std::make_shared<QueryIdHolder>(query_id, data));
auto step = std::make_unique<ReadFromStorageStep>(std::move(pipe), storage_name); auto step = std::make_unique<ReadFromStorageStep>(std::move(pipe), storage_name);
plan->addStep(std::move(step)); plan->addStep(std::move(step));
return plan; return plan;
@ -918,7 +939,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const SelectQueryInfo & query_info, const SelectQueryInfo & query_info,
const Names & virt_columns, const Names & virt_columns,
const Settings & settings, const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const const MergeTreeReaderSettings & reader_settings,
const String & query_id) const
{ {
/// Count marks for each part. /// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts.size()); std::vector<size_t> sum_marks_in_parts(parts.size());
@ -1003,7 +1025,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
res.emplace_back(std::move(source)); res.emplace_back(std::move(source));
} }
return createPlanFromPipe(Pipe::unitePipes(std::move(res))); return createPlanFromPipe(Pipe::unitePipes(std::move(res)), query_id, data);
} }
else else
{ {
@ -1027,7 +1049,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
if (pipe.numOutputPorts() > 1) if (pipe.numOutputPorts() > 1)
pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts())); pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
return createPlanFromPipe(std::move(pipe)); return createPlanFromPipe(std::move(pipe), query_id, data);
} }
} }
@ -1051,7 +1073,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
const Names & virt_columns, const Names & virt_columns,
const Settings & settings, const Settings & settings,
const MergeTreeReaderSettings & reader_settings, const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection) const ActionsDAGPtr & out_projection,
const String & query_id) const
{ {
size_t sum_marks = 0; size_t sum_marks = 0;
const InputOrderInfoPtr & input_order_info = query_info.input_order_info; const InputOrderInfoPtr & input_order_info = query_info.input_order_info;
@ -1242,7 +1265,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
} }
} }
auto plan = createPlanFromPipe(Pipe::unitePipes(std::move(pipes)), " with order"); auto plan = createPlanFromPipe(Pipe::unitePipes(std::move(pipes)), query_id, data, " with order");
if (input_order_info->direction != 1) if (input_order_info->direction != 1)
{ {
@ -1310,7 +1333,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
const Names & virt_columns, const Names & virt_columns,
const Settings & settings, const Settings & settings,
const MergeTreeReaderSettings & reader_settings, const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection) const ActionsDAGPtr & out_projection,
const String & query_id) const
{ {
const auto data_settings = data.getSettings(); const auto data_settings = data.getSettings();
size_t sum_marks = 0; size_t sum_marks = 0;
@ -1406,7 +1430,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (!out_projection) if (!out_projection)
out_projection = createProjection(pipe.getHeader()); out_projection = createProjection(pipe.getHeader());
plan = createPlanFromPipe(std::move(pipe), "with final"); plan = createPlanFromPipe(std::move(pipe), query_id, data, "with final");
} }
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition

View File

@ -58,7 +58,8 @@ private:
const SelectQueryInfo & query_info, const SelectQueryInfo & query_info,
const Names & virt_columns, const Names & virt_columns,
const Settings & settings, const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const; const MergeTreeReaderSettings & reader_settings,
const String & query_id) const;
/// out_projection - save projection only with columns, requested to read /// out_projection - save projection only with columns, requested to read
QueryPlanPtr spreadMarkRangesAmongStreamsWithOrder( QueryPlanPtr spreadMarkRangesAmongStreamsWithOrder(
@ -73,7 +74,8 @@ private:
const Names & virt_columns, const Names & virt_columns,
const Settings & settings, const Settings & settings,
const MergeTreeReaderSettings & reader_settings, const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection) const; ActionsDAGPtr & out_projection,
const String & query_id) const;
QueryPlanPtr spreadMarkRangesAmongStreamsFinal( QueryPlanPtr spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts, RangesInDataParts && parts,
@ -86,7 +88,8 @@ private:
const Names & virt_columns, const Names & virt_columns,
const Settings & settings, const Settings & settings,
const MergeTreeReaderSettings & reader_settings, const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection) const; ActionsDAGPtr & out_projection,
const String & query_id) const;
/// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index. /// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
size_t getApproximateTotalRowsToRead( size_t getApproximateTotalRowsToRead(

View File

@ -111,6 +111,8 @@ struct Settings;
M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm", 0) \ M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm", 0) \
M(Bool, assign_part_uuids, false, "Generate UUIDs for parts. Before enabling check that all replicas support new format.", 0) \ M(Bool, assign_part_uuids, false, "Generate UUIDs for parts. Before enabling check that all replicas support new format.", 0) \
M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited. This setting is the default that can be overridden by the query-level setting with the same name.", 0) \ M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited. This setting is the default that can be overridden by the query-level setting with the same name.", 0) \
M(UInt64, max_concurrent_queries, 0, "Max number of concurrently executed queries related to the MergeTree table (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \
M(UInt64, min_marks_to_honor_max_concurrent_queries, 0, "Minimal number of marks to honor the MergeTree-level's max_concurrent_queries (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \
\ \
/** Obsolete settings. Kept for backward compatibility only. */ \ /** Obsolete settings. Kept for backward compatibility only. */ \
M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \ M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \

View File

@ -0,0 +1,14 @@
Spin up a long running query
Check if another query with some marks to read is throttled
yes
Check if another query with less marks to read is passed
0 100
Modify min_marks_to_honor_max_concurrent_queries to 1
Check if another query with less marks to read is throttled
yes
Modify max_concurrent_queries to 2
Check if another query is passed
0 100
Modify max_concurrent_queries back to 1
Check if another query with less marks to read is throttled
yes

View File

@ -0,0 +1,63 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --multiline --multiquery --query "
drop table if exists simple;
create table simple (i int, j int) engine = MergeTree order by i
settings index_granularity = 1, max_concurrent_queries = 1, min_marks_to_honor_max_concurrent_queries = 2;
insert into simple select number, number + 100 from numbers(10);
"
echo "Spin up a long running query"
${CLICKHOUSE_CLIENT} --query "select sleepEachRow(1) from simple settings max_block_size = 1 format Null" --query_id "long_running_query" &
sleep 3
# query which reads marks >= min_marks_to_honor_max_concurrent_queries is throttled
echo "Check if another query with some marks to read is throttled"
${CLICKHOUSE_CLIENT} --query "select * from simple" 2> /dev/null;
CODE=$?
[ "$CODE" -ne "202" ] && echo "Expected error code: 202 but got: $CODE" && exit 1;
echo "yes"
# query which reads marks less than min_marks_to_honor_max_concurrent_queries is allowed
echo "Check if another query with less marks to read is passed"
${CLICKHOUSE_CLIENT} --query "select * from simple where i = 0"
# We can modify the settings to take effect for future queries
echo "Modify min_marks_to_honor_max_concurrent_queries to 1"
${CLICKHOUSE_CLIENT} --query "alter table simple modify setting min_marks_to_honor_max_concurrent_queries = 1"
# Now smaller queries are also throttled
echo "Check if another query with less marks to read is throttled"
${CLICKHOUSE_CLIENT} --query "select * from simple where i = 0" 2> /dev/null;
CODE=$?
[ "$CODE" -ne "202" ] && echo "Expected error code: 202 but got: $CODE" && exit 1;
echo "yes"
echo "Modify max_concurrent_queries to 2"
${CLICKHOUSE_CLIENT} --query "alter table simple modify setting max_concurrent_queries = 2"
# Now more queries are accepted
echo "Check if another query is passed"
${CLICKHOUSE_CLIENT} --query "select * from simple where i = 0"
echo "Modify max_concurrent_queries back to 1"
${CLICKHOUSE_CLIENT} --query "alter table simple modify setting max_concurrent_queries = 1"
# Now queries are throttled again
echo "Check if another query with less marks to read is throttled"
${CLICKHOUSE_CLIENT} --query "select * from simple where i = 0" 2> /dev/null;
CODE=$?
[ "$CODE" -ne "202" ] && echo "Expected error code: 202 but got: $CODE" && exit 1;
echo "yes"
wait
${CLICKHOUSE_CLIENT} --multiline --multiquery --query "
drop table simple
"