mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 02:21:59 +00:00
Merge pull request #46835 from ClickHouse/reduce_mem_in_mutation_with_subquery
Reduce memory consumption by mutations with big subqueries used with IN
This commit is contained in:
commit
ba5ca15c40
@ -5124,7 +5124,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
|
||||
/// Create constant set column for constant folding
|
||||
|
||||
auto column_set = ColumnSet::create(1, std::move(set));
|
||||
auto column_set = ColumnSet::create(1, FutureSet(std::move(set)));
|
||||
argument_columns[1].column = ColumnConst::create(std::move(column_set), 1);
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <Columns/IColumnDummy.h>
|
||||
#include <Core/Field.h>
|
||||
|
||||
@ -20,7 +21,7 @@ class ColumnSet final : public COWHelper<IColumnDummy, ColumnSet>
|
||||
private:
|
||||
friend class COWHelper<IColumnDummy, ColumnSet>;
|
||||
|
||||
ColumnSet(size_t s_, const ConstSetPtr & data_) : data(data_) { s = s_; }
|
||||
ColumnSet(size_t s_, FutureSet data_) : data(std::move(data_)) { s = s_; }
|
||||
ColumnSet(const ColumnSet &) = default;
|
||||
|
||||
public:
|
||||
@ -28,13 +29,13 @@ public:
|
||||
TypeIndex getDataType() const override { return TypeIndex::Set; }
|
||||
MutableColumnPtr cloneDummy(size_t s_) const override { return ColumnSet::create(s_, data); }
|
||||
|
||||
ConstSetPtr getData() const { return data; }
|
||||
ConstSetPtr getData() const { if (!data.isReady()) return nullptr; return data.get(); }
|
||||
|
||||
// Used only for debugging, making it DUMPABLE
|
||||
Field operator[](size_t) const override { return {}; }
|
||||
|
||||
private:
|
||||
ConstSetPtr data;
|
||||
FutureSet data;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -302,6 +302,7 @@ class IColumn;
|
||||
M(Bool, http_skip_not_found_url_for_globs, true, "Skip url's for globs with HTTP_NOT_FOUND error", 0) \
|
||||
M(Bool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown", 0) \
|
||||
M(Bool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.", 0) \
|
||||
M(UInt64, use_index_for_in_with_subqueries_max_values, 0, "The maximum size of set in the right hand side of the IN operator to use table index for filtering. It allows to avoid performance degradation and higher memory usage due to preparation of additional data structures for large queries. Zero means no limit.", 0) \
|
||||
M(Bool, joined_subquery_requires_alias, true, "Force joined subqueries and table functions to have aliases for correct name qualification.", 0) \
|
||||
M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
|
||||
M(Bool, empty_result_for_aggregation_by_constant_keys_on_empty_set, true, "Return empty result when aggregating by constant keys on empty set.", 0) \
|
||||
@ -569,6 +570,7 @@ class IColumn;
|
||||
M(Bool, query_cache_squash_partial_results, true, "Squash partial result blocks to blocks of size 'max_block_size'. Reduces performance of inserts into the query cache but improves the compressability of cache entries.", 0) \
|
||||
M(Seconds, query_cache_ttl, 60, "After this time in seconds entries in the query cache become stale", 0) \
|
||||
M(Bool, query_cache_share_between_users, false, "Allow other users to read entry in the query cache", 0) \
|
||||
M(Bool, enable_sharing_sets_for_mutations, true, "Allow sharing set objects build for IN subqueries between different tasks of the same mutation. This reduces memory usage and CPU consumption", 0) \
|
||||
\
|
||||
M(Bool, optimize_rewrite_sum_if_to_count_if, false, "Rewrite sumIf() and sum(if()) function countIf() function when logically equivalent", 0) \
|
||||
M(Bool, optimize_rewrite_aggregate_function_with_if, true, "Rewrite aggregate functions with if expression as argument when logically equivalent. For example, avg(if(cond, col, null)) can be rewritten to avgIf(cond, col)", 0) \
|
||||
|
@ -20,7 +20,7 @@ public:
|
||||
bool isParametric() const override { return true; }
|
||||
|
||||
// Used for expressions analysis.
|
||||
MutableColumnPtr createColumn() const override { return ColumnSet::create(0, nullptr); }
|
||||
MutableColumnPtr createColumn() const override { return ColumnSet::create(0, FutureSet{}); }
|
||||
|
||||
// Used only for debugging, making it DUMPABLE
|
||||
Field getDefault() const override { return Tuple(); }
|
||||
|
@ -952,14 +952,16 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
|
||||
return;
|
||||
}
|
||||
|
||||
SetPtr prepared_set;
|
||||
FutureSet prepared_set;
|
||||
if (checkFunctionIsInOrGlobalInOperator(node))
|
||||
{
|
||||
/// Let's find the type of the first argument (then getActionsImpl will be called again and will not affect anything).
|
||||
visit(node.arguments->children.at(0), data);
|
||||
|
||||
if (!data.no_makeset && !(data.is_create_parameterized_view && !analyzeReceiveQueryParams(ast).empty())
|
||||
&& (prepared_set = makeSet(node, data, data.no_subqueries)))
|
||||
if (!data.no_makeset && !(data.is_create_parameterized_view && !analyzeReceiveQueryParams(ast).empty()))
|
||||
prepared_set = makeSet(node, data, data.no_subqueries);
|
||||
|
||||
if (prepared_set.isValid())
|
||||
{
|
||||
/// Transform tuple or subquery into a set.
|
||||
}
|
||||
@ -1172,14 +1174,15 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
|
||||
num_arguments += columns.size() - 1;
|
||||
arg += columns.size() - 1;
|
||||
}
|
||||
else if (checkFunctionIsInOrGlobalInOperator(node) && arg == 1 && prepared_set)
|
||||
else if (checkFunctionIsInOrGlobalInOperator(node) && arg == 1 && prepared_set.isValid())
|
||||
{
|
||||
ColumnWithTypeAndName column;
|
||||
column.type = std::make_shared<DataTypeSet>();
|
||||
|
||||
/// If the argument is a set given by an enumeration of values (so, the set was already built), give it a unique name,
|
||||
/// so that sets with the same literal representation do not fuse together (they can have different types).
|
||||
if (!prepared_set->empty())
|
||||
const bool is_constant_set = prepared_set.isCreated();
|
||||
if (is_constant_set)
|
||||
column.name = data.getUniqueName("__set");
|
||||
else
|
||||
column.name = child->getColumnName();
|
||||
@ -1189,7 +1192,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
|
||||
auto column_set = ColumnSet::create(1, prepared_set);
|
||||
/// If prepared_set is not empty, we have a set made with literals.
|
||||
/// Create a const ColumnSet to make constant folding work
|
||||
if (!prepared_set->empty())
|
||||
if (is_constant_set)
|
||||
column.column = ColumnConst::create(std::move(column_set), 1);
|
||||
else
|
||||
column.column = std::move(column_set);
|
||||
@ -1370,10 +1373,10 @@ void ActionsMatcher::visit(const ASTLiteral & literal, const ASTPtr & /* ast */,
|
||||
data.addColumn(std::move(column));
|
||||
}
|
||||
|
||||
SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_subqueries)
|
||||
FutureSet ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_subqueries)
|
||||
{
|
||||
if (!data.prepared_sets)
|
||||
return nullptr;
|
||||
return {};
|
||||
|
||||
/** You need to convert the right argument to a set.
|
||||
* This can be a table name, a value, a value enumeration, or a subquery.
|
||||
@ -1390,8 +1393,12 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
|
||||
if (no_subqueries)
|
||||
return {};
|
||||
auto set_key = PreparedSetKey::forSubquery(*right_in_operand);
|
||||
if (SetPtr set = data.prepared_sets->get(set_key))
|
||||
return set;
|
||||
|
||||
{
|
||||
auto set = data.prepared_sets->getFuture(set_key);
|
||||
if (set.isValid())
|
||||
return set;
|
||||
}
|
||||
|
||||
/// A special case is if the name of the table is specified on the right side of the IN statement,
|
||||
/// and the table has the type Set (a previously prepared set).
|
||||
@ -1407,7 +1414,7 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
|
||||
{
|
||||
SetPtr set = storage_set->getSet();
|
||||
data.prepared_sets->set(set_key, set);
|
||||
return set;
|
||||
return FutureSet(set);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1439,7 +1446,8 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
|
||||
const auto & index = data.actions_stack.getLastActionsIndex();
|
||||
if (data.prepared_sets && index.contains(left_in_operand->getColumnName()))
|
||||
/// An explicit enumeration of values in parentheses.
|
||||
return makeExplicitSet(&node, last_actions, false, data.getContext(), data.set_size_limit, *data.prepared_sets);
|
||||
return FutureSet(
|
||||
makeExplicitSet(&node, last_actions, false, data.getContext(), data.set_size_limit, *data.prepared_sets));
|
||||
else
|
||||
return {};
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ private:
|
||||
static void visit(const ASTLiteral & literal, const ASTPtr & ast, Data & data);
|
||||
static void visit(ASTExpressionList & expression_list, const ASTPtr & ast, Data & data);
|
||||
|
||||
static SetPtr makeSet(const ASTFunction & node, Data & data, bool no_subqueries);
|
||||
static FutureSet makeSet(const ASTFunction & node, Data & data, bool no_subqueries);
|
||||
static ASTs doUntuple(const ASTFunction * function, ActionsMatcher::Data & data);
|
||||
static std::optional<NameAndTypePair> getNameAndTypeFromAST(const ASTPtr & ast, Data & data);
|
||||
};
|
||||
|
@ -43,6 +43,7 @@
|
||||
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
|
||||
#include <Interpreters/TemporaryDataOnDisk.h>
|
||||
#include <Interpreters/Cache/QueryCache.h>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <Core/Settings.h>
|
||||
#include <Core/SettingsQuirks.h>
|
||||
#include <Access/AccessControl.h>
|
||||
@ -4342,6 +4343,16 @@ bool Context::canUseParallelReplicasOnFollower() const
|
||||
&& getClientInfo().collaborate_with_initiator;
|
||||
}
|
||||
|
||||
void Context::setPreparedSetsCache(const PreparedSetsCachePtr & cache)
|
||||
{
|
||||
prepared_sets_cache = cache;
|
||||
}
|
||||
|
||||
PreparedSetsCachePtr Context::getPreparedSetsCache() const
|
||||
{
|
||||
return prepared_sets_cache;
|
||||
}
|
||||
|
||||
UInt64 Context::getClientProtocolVersion() const
|
||||
{
|
||||
return client_protocol_version;
|
||||
|
@ -193,6 +193,9 @@ class MergeTreeMetadataCache;
|
||||
using MergeTreeMetadataCachePtr = std::shared_ptr<MergeTreeMetadataCache>;
|
||||
#endif
|
||||
|
||||
class PreparedSetsCache;
|
||||
using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;
|
||||
|
||||
/// An empty interface for an arbitrary object that may be attached by a shared pointer
|
||||
/// to query context, when using ClickHouse as a library.
|
||||
struct IHostContext
|
||||
@ -399,6 +402,10 @@ private:
|
||||
/// Temporary data for query execution accounting.
|
||||
TemporaryDataOnDiskScopePtr temp_data_on_disk;
|
||||
|
||||
/// Prepared sets that can be shared between different queries. One use case is to share prepared sets between
|
||||
/// mutation tasks of one mutation executed against different parts of the same table.
|
||||
PreparedSetsCachePtr prepared_sets_cache;
|
||||
|
||||
public:
|
||||
/// Some counters for current query execution.
|
||||
/// Most of them are workarounds and should be removed in the future.
|
||||
@ -1128,6 +1135,9 @@ public:
|
||||
|
||||
ParallelReplicasMode getParallelReplicasMode() const;
|
||||
|
||||
void setPreparedSetsCache(const PreparedSetsCachePtr & cache);
|
||||
PreparedSetsCachePtr getPreparedSetsCache() const;
|
||||
|
||||
private:
|
||||
std::unique_lock<std::recursive_mutex> getLock() const;
|
||||
|
||||
|
@ -56,6 +56,7 @@
|
||||
#include <Core/Names.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <QueryPipeline/SizeLimits.h>
|
||||
|
||||
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
@ -68,6 +69,7 @@
|
||||
#include <Interpreters/interpretSubquery.h>
|
||||
#include <Interpreters/JoinUtils.h>
|
||||
#include <Interpreters/misc.h>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
@ -147,6 +149,11 @@ ExpressionAnalyzerData::~ExpressionAnalyzerData() = default;
|
||||
/// Copies the settings used by the analyzer into immutable members.
///
/// size_limits_for_set_used_with_index is the limit applied to sets built for use with the table index:
///  - if use_index_for_in_with_subqueries_max_values is non-zero and stricter than max_rows_in_set
///    (or max_rows_in_set is zero, i.e. unlimited), the index-specific row limit is used with
///    OverflowMode::BREAK;
///  - otherwise (including the default of zero, "no limit") the regular set limits apply unchanged,
///    so max_rows_in_set and set_overflow_mode are still honoured.
/// The previous condition had the two branches swapped: it chose the looser general limit exactly when
/// the index-specific limit was the stricter one, and dropped max_rows_in_set when the latter was zero.
ExpressionAnalyzer::ExtractedSettings::ExtractedSettings(const Settings & settings_)
    : use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries)
    , size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode)
    , size_limits_for_set_used_with_index(
        (settings_.use_index_for_in_with_subqueries_max_values &&
            (!settings_.max_rows_in_set ||
             settings_.use_index_for_in_with_subqueries_max_values < settings_.max_rows_in_set)) ?
        SizeLimits(settings_.use_index_for_in_with_subqueries_max_values, settings_.max_bytes_in_set, OverflowMode::BREAK) :
        size_limits_for_set)
    , distributed_group_by_no_merge(settings_.distributed_group_by_no_merge)
{}
|
||||
|
||||
@ -450,7 +457,7 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
|
||||
|
||||
auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
|
||||
|
||||
if (prepared_sets->get(set_key))
|
||||
if (prepared_sets->getFuture(set_key).isValid())
|
||||
return; /// Already prepared.
|
||||
|
||||
if (auto set_ptr_from_storage_set = isPlainStorageSetInSubquery(subquery_or_table_name))
|
||||
@ -459,25 +466,57 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
|
||||
return;
|
||||
}
|
||||
|
||||
auto interpreter_subquery = interpretSubquery(subquery_or_table_name, getContext(), {}, query_options);
|
||||
auto io = interpreter_subquery->execute();
|
||||
PullingAsyncPipelineExecutor executor(io.pipeline);
|
||||
|
||||
SetPtr set = std::make_shared<Set>(settings.size_limits_for_set, true, getContext()->getSettingsRef().transform_null_in);
|
||||
set->setHeader(executor.getHeader().getColumnsWithTypeAndName());
|
||||
|
||||
Block block;
|
||||
while (executor.pull(block))
|
||||
auto build_set = [&] () -> SetPtr
|
||||
{
|
||||
if (block.rows() == 0)
|
||||
continue;
|
||||
LOG_TRACE(getLogger(), "Building set, key: {}", set_key.toString());
|
||||
|
||||
/// If the limits have been exceeded, give up and let the default subquery processing actions take place.
|
||||
if (!set->insertFromBlock(block.getColumnsWithTypeAndName()))
|
||||
return;
|
||||
auto interpreter_subquery = interpretSubquery(subquery_or_table_name, getContext(), {}, query_options);
|
||||
auto io = interpreter_subquery->execute();
|
||||
PullingAsyncPipelineExecutor executor(io.pipeline);
|
||||
|
||||
SetPtr set = std::make_shared<Set>(settings.size_limits_for_set_used_with_index, true, getContext()->getSettingsRef().transform_null_in);
|
||||
set->setHeader(executor.getHeader().getColumnsWithTypeAndName());
|
||||
|
||||
Block block;
|
||||
while (executor.pull(block))
|
||||
{
|
||||
if (block.rows() == 0)
|
||||
continue;
|
||||
|
||||
/// If the limits have been exceeded, give up and let the default subquery processing actions take place.
|
||||
if (!set->insertFromBlock(block.getColumnsWithTypeAndName()))
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
set->finishInsert();
|
||||
|
||||
return set;
|
||||
};
|
||||
|
||||
SetPtr set;
|
||||
|
||||
auto set_cache = getContext()->getPreparedSetsCache();
|
||||
if (set_cache)
|
||||
{
|
||||
auto from_cache = set_cache->findOrPromiseToBuild(set_key.toString());
|
||||
if (from_cache.index() == 0)
|
||||
{
|
||||
set = build_set();
|
||||
std::get<0>(from_cache).set_value(set);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(getLogger(), "Waiting for set, key: {}", set_key.toString());
|
||||
set = std::get<1>(from_cache).get();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
set = build_set();
|
||||
}
|
||||
|
||||
set->finishInsert();
|
||||
if (!set)
|
||||
return;
|
||||
|
||||
prepared_sets->set(set_key, std::move(set));
|
||||
}
|
||||
|
@ -95,6 +95,7 @@ private:
|
||||
{
|
||||
const bool use_index_for_in_with_subqueries;
|
||||
const SizeLimits size_limits_for_set;
|
||||
const SizeLimits size_limits_for_set_used_with_index;
|
||||
const UInt64 distributed_group_by_no_merge;
|
||||
|
||||
explicit ExtractedSettings(const Settings & settings_);
|
||||
|
@ -1,7 +1,10 @@
|
||||
#include <chrono>
|
||||
#include <variant>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
|
||||
#include <Interpreters/Set.h>
|
||||
#include <IO/Operators.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -43,6 +46,26 @@ bool PreparedSetKey::operator==(const PreparedSetKey & other) const
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Renders the key as text: "__set_<hash.first>_<hash.second>", followed by the comma-separated
/// names of the key's types in parentheses when the key carries any types.
String PreparedSetKey::toString() const
{
    WriteBufferFromOwnString out;
    out << "__set_" << ast_hash.first << "_" << ast_hash.second;

    if (types.empty())
        return out.str();

    /// The opening parenthesis precedes the first type name, a comma precedes every following one.
    const char * separator = "(";
    for (const auto & element_type : types)
    {
        out << separator << element_type->getName();
        separator = ",";
    }
    out << ")";

    return out.str();
}
|
||||
|
||||
SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, const PreparedSetKey & key,
|
||||
SizeLimits set_size_limit, bool transform_null_in)
|
||||
{
|
||||
@ -51,10 +74,20 @@ SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, c
|
||||
/// If you already created a Set with the same subquery / table for another ast
|
||||
/// In that case several PreparedSetKey would share same subquery and set
|
||||
/// Not sure if it's really possible case (maybe for distributed query when set was filled by external table?)
|
||||
if (subquery.set)
|
||||
if (subquery.set.isValid())
|
||||
sets[key] = subquery.set;
|
||||
else
|
||||
sets[key] = subquery.set = std::make_shared<Set>(set_size_limit, false, transform_null_in);
|
||||
{
|
||||
subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
|
||||
sets[key] = FutureSet(subquery.promise_to_fill_set.get_future());
|
||||
}
|
||||
|
||||
if (!subquery.set_in_progress)
|
||||
{
|
||||
subquery.key = key.toString();
|
||||
subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
|
||||
}
|
||||
|
||||
return subquery;
|
||||
}
|
||||
|
||||
@ -62,13 +95,27 @@ SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, c
|
||||
/// It's aimed to fill external table passed to SubqueryForSet::createSource.
|
||||
SubqueryForSet & PreparedSets::getSubquery(const String & subquery_id) { return subqueries[subquery_id]; }
|
||||
|
||||
void PreparedSets::set(const PreparedSetKey & key, SetPtr set_) { sets[key] = set_; }
|
||||
void PreparedSets::set(const PreparedSetKey & key, SetPtr set_) { sets[key] = FutureSet(set_); }
|
||||
|
||||
SetPtr & PreparedSets::get(const PreparedSetKey & key) { return sets[key]; }
|
||||
|
||||
std::vector<SetPtr> PreparedSets::getByTreeHash(IAST::Hash ast_hash)
|
||||
FutureSet PreparedSets::getFuture(const PreparedSetKey & key) const
|
||||
{
|
||||
std::vector<SetPtr> res;
|
||||
auto it = sets.find(key);
|
||||
if (it == sets.end())
|
||||
return {};
|
||||
return it->second;
|
||||
}
|
||||
|
||||
/// Returns the set registered under `key` when its value is already available.
/// Returns nullptr if the key is unknown or the set's future is not ready yet.
SetPtr PreparedSets::get(const PreparedSetKey & key) const
{
    const auto found = sets.find(key);
    if (found != sets.end() && found->second.isReady())
        return found->second.get();

    return nullptr;
}
|
||||
|
||||
std::vector<FutureSet> PreparedSets::getByTreeHash(IAST::Hash ast_hash) const
|
||||
{
|
||||
std::vector<FutureSet> res;
|
||||
for (const auto & it : this->sets)
|
||||
{
|
||||
if (it.first.ast_hash == ast_hash)
|
||||
@ -106,4 +153,45 @@ QueryPlanPtr SubqueryForSet::detachSource()
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
/// Wraps an already available set: the underlying shared future is fulfilled immediately,
/// so the resulting FutureSet is valid and ready right after construction.
FutureSet::FutureSet(SetPtr set)
{
    std::promise<SetPtr> fulfilled;
    fulfilled.set_value(set);
    future_set = fulfilled.get_future().share();
}
|
||||
|
||||
|
||||
bool FutureSet::isReady() const
|
||||
{
|
||||
return future_set.valid() &&
|
||||
future_set.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
|
||||
}
|
||||
|
||||
bool FutureSet::isCreated() const
|
||||
{
|
||||
return isReady() && get() != nullptr && get()->isCreated();
|
||||
}
|
||||
|
||||
|
||||
std::variant<std::promise<SetPtr>, SharedSet> PreparedSetsCache::findOrPromiseToBuild(const String & key)
|
||||
{
|
||||
std::lock_guard lock(cache_mutex);
|
||||
|
||||
auto it = cache.find(key);
|
||||
if (it != cache.end())
|
||||
{
|
||||
/// If the set is being built, return its future, but if it's ready and is nullptr then we should retry building it.
|
||||
if (it->second.future.valid() &&
|
||||
(it->second.future.wait_for(std::chrono::seconds(0)) != std::future_status::ready || it->second.future.get() != nullptr))
|
||||
return it->second.future;
|
||||
}
|
||||
|
||||
/// Insert the entry into the cache so that other threads can find it and start waiting for the set.
|
||||
std::promise<SetPtr> promise_to_fill_set;
|
||||
Entry & entry = cache[key];
|
||||
entry.future = promise_to_fill_set.get_future();
|
||||
return promise_to_fill_set;
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Parsers/IAST.h>
|
||||
#include <DataTypes/IDataType.h>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
@ -19,6 +20,37 @@ class Set;
|
||||
using SetPtr = std::shared_ptr<Set>;
|
||||
class InterpreterSelectWithUnionQuery;
|
||||
|
||||
/// Represents a set in a query that might be referenced at analysis time and built later during execution.
|
||||
/// Also it can represent a constant set that is ready to use.
|
||||
/// At analysis stage the FutureSets are created but not necessarily filled. Then for non-constant sets there
|
||||
/// must be an explicit step to build them before they can be used.
|
||||
/// FutureSet objects can be stored in PreparedSets and are not intended to be used from multiple threads.
|
||||
class FutureSet final
|
||||
{
|
||||
public:
|
||||
FutureSet() = default;
|
||||
|
||||
/// Create FutureSet from an object that will be created in the future.
|
||||
explicit FutureSet(const std::shared_future<SetPtr> & future_set_) : future_set(future_set_) {}
|
||||
|
||||
/// Create FutureSet from a ready set.
|
||||
explicit FutureSet(SetPtr readySet);
|
||||
|
||||
/// The set object will be ready in the future, as opposed to 'null' object when FutureSet is default constructed.
|
||||
bool isValid() const { return future_set.valid(); }
|
||||
|
||||
/// The value of SetPtr is ready, but the set object might not have been filled yet.
|
||||
bool isReady() const;
|
||||
|
||||
/// The set object is ready and filled.
|
||||
bool isCreated() const;
|
||||
|
||||
SetPtr get() const { chassert(isReady()); return future_set.get(); }
|
||||
|
||||
private:
|
||||
std::shared_future<SetPtr> future_set;
|
||||
};
|
||||
|
||||
/// Information on how to build set for the [GLOBAL] IN section.
|
||||
class SubqueryForSet
|
||||
{
|
||||
@ -33,7 +65,12 @@ public:
|
||||
std::unique_ptr<QueryPlan> detachSource();
|
||||
|
||||
/// Build this set from the result of the subquery.
|
||||
SetPtr set;
|
||||
String key;
|
||||
SetPtr set_in_progress;
|
||||
/// After set_in_progress is finished it will be put into promise_to_fill_set and thus all FutureSet's
|
||||
/// that are referencing this set will be filled.
|
||||
std::promise<SetPtr> promise_to_fill_set;
|
||||
FutureSet set = FutureSet{promise_to_fill_set.get_future()};
|
||||
|
||||
/// If set, put the result into the table.
|
||||
/// This is a temporary table for transferring to remote servers for distributed query processing.
|
||||
@ -59,6 +96,8 @@ struct PreparedSetKey
|
||||
|
||||
bool operator==(const PreparedSetKey & other) const;
|
||||
|
||||
String toString() const;
|
||||
|
||||
struct Hash
|
||||
{
|
||||
UInt64 operator()(const PreparedSetKey & key) const { return key.ast_hash.first; }
|
||||
@ -75,7 +114,8 @@ public:
|
||||
SubqueryForSet & getSubquery(const String & subquery_id);
|
||||
|
||||
void set(const PreparedSetKey & key, SetPtr set_);
|
||||
SetPtr & get(const PreparedSetKey & key);
|
||||
FutureSet getFuture(const PreparedSetKey & key) const;
|
||||
SetPtr get(const PreparedSetKey & key) const;
|
||||
|
||||
/// Get subqueries and clear them.
|
||||
/// We need to build a plan for subqueries just once. That's why we can clear them after accessing them.
|
||||
@ -84,12 +124,12 @@ public:
|
||||
|
||||
/// Returns all sets that match the given ast hash not checking types
|
||||
/// Used in KeyCondition and MergeTreeIndexConditionBloomFilter to make non exact match for types in PreparedSetKey
|
||||
std::vector<SetPtr> getByTreeHash(IAST::Hash ast_hash);
|
||||
std::vector<FutureSet> getByTreeHash(IAST::Hash ast_hash) const;
|
||||
|
||||
bool empty() const;
|
||||
|
||||
private:
|
||||
std::unordered_map<PreparedSetKey, SetPtr, PreparedSetKey::Hash> sets;
|
||||
std::unordered_map<PreparedSetKey, FutureSet, PreparedSetKey::Hash> sets;
|
||||
|
||||
/// This is the information required for building sets
|
||||
SubqueriesForSets subqueries;
|
||||
@ -97,4 +137,31 @@ private:
|
||||
|
||||
using PreparedSetsPtr = std::shared_ptr<PreparedSets>;
|
||||
|
||||
/// A reference to a set that is being built by another task.
|
||||
/// The difference from FutureSet is that this object can be used to wait for the set to be built in another thread.
|
||||
using SharedSet = std::shared_future<SetPtr>;
|
||||
|
||||
/// This set cache is used to avoid building the same set multiple times. It is different from PreparedSets in the way that
|
||||
/// it can be used across multiple queries. One use case is when we execute the same mutation on multiple parts. In this
|
||||
/// case each part is processed by a separate mutation task but they can share the same set.
|
||||
class PreparedSetsCache
|
||||
{
|
||||
public:
|
||||
/// Lookup for set in the cache.
|
||||
/// If it is found, get the future to be able to wait for the set to be built.
|
||||
/// Otherwise create a promise, build the set and set the promise value.
|
||||
std::variant<std::promise<SetPtr>, SharedSet> findOrPromiseToBuild(const String & key);
|
||||
|
||||
private:
|
||||
struct Entry
|
||||
{
|
||||
SharedSet future; /// Other tasks can wait for the set to be built.
|
||||
};
|
||||
|
||||
std::mutex cache_mutex;
|
||||
std::unordered_map<String, Entry> cache;
|
||||
};
|
||||
|
||||
using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;
|
||||
|
||||
}
|
||||
|
@ -236,6 +236,11 @@ bool Set::insertFromBlock(const Columns & columns)
|
||||
return limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "IN-set", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
|
||||
}
|
||||
|
||||
void Set::checkIsCreated() const
|
||||
{
|
||||
if (!is_created.load())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: Trying to use set before it has been built.");
|
||||
}
|
||||
|
||||
ColumnPtr Set::execute(const ColumnsWithTypeAndName & columns, bool negative) const
|
||||
{
|
||||
|
@ -54,6 +54,8 @@ public:
|
||||
/// finishInsert and isCreated are thread-safe
|
||||
bool isCreated() const { return is_created.load(); }
|
||||
|
||||
void checkIsCreated() const;
|
||||
|
||||
/** For columns of 'block', check belonging of corresponding rows to the set.
|
||||
* Return UInt8 column with the result.
|
||||
*/
|
||||
@ -67,7 +69,7 @@ public:
|
||||
const DataTypes & getElementsTypes() const { return set_elements_types; }
|
||||
|
||||
bool hasExplicitSetElements() const { return fill_set_elements; }
|
||||
Columns getSetElements() const { return { set_elements.begin(), set_elements.end() }; }
|
||||
Columns getSetElements() const { checkIsCreated(); return { set_elements.begin(), set_elements.end() }; }
|
||||
|
||||
void checkColumnsNumber(size_t num_key_columns) const;
|
||||
bool areTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <Planner/CollectSets.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
|
||||
#include <Storages/StorageSet.h>
|
||||
|
||||
@ -52,7 +53,8 @@ public:
|
||||
|
||||
if (storage_set)
|
||||
{
|
||||
planner_context.registerSet(set_key, PlannerSet(storage_set->getSet()));
|
||||
/// Handle storage_set as ready set.
|
||||
planner_context.registerSet(set_key, PlannerSet(FutureSet(storage_set->getSet())));
|
||||
}
|
||||
else if (const auto * constant_node = in_second_argument->as<ConstantNode>())
|
||||
{
|
||||
@ -62,16 +64,12 @@ public:
|
||||
constant_node->getResultType(),
|
||||
settings);
|
||||
|
||||
planner_context.registerSet(set_key, PlannerSet(std::move(set)));
|
||||
planner_context.registerSet(set_key, PlannerSet(FutureSet(std::move(set))));
|
||||
}
|
||||
else if (in_second_argument_node_type == QueryTreeNodeType::QUERY ||
|
||||
in_second_argument_node_type == QueryTreeNodeType::UNION)
|
||||
{
|
||||
SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
|
||||
bool tranform_null_in = settings.transform_null_in;
|
||||
auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
|
||||
|
||||
planner_context.registerSet(set_key, PlannerSet(std::move(set), in_second_argument));
|
||||
planner_context.registerSet(set_key, PlannerSet(in_second_argument));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -890,11 +890,11 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
|
||||
for (const auto & node : actions_to_execute->getNodes())
|
||||
{
|
||||
const auto & set_key = node.result_name;
|
||||
const auto * planner_set = planner_context->getSetOrNull(set_key);
|
||||
auto * planner_set = planner_context->getSetOrNull(set_key);
|
||||
if (!planner_set)
|
||||
continue;
|
||||
|
||||
if (planner_set->getSet()->isCreated() || !planner_set->getSubqueryNode())
|
||||
if (planner_set->getSet().isCreated() || !planner_set->getSubqueryNode())
|
||||
continue;
|
||||
|
||||
auto subquery_options = select_query_options.subquery();
|
||||
@ -904,8 +904,16 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
|
||||
planner_context->getGlobalPlannerContext());
|
||||
subquery_planner.buildQueryPlanIfNeeded();
|
||||
|
||||
const auto & settings = planner_context->getQueryContext()->getSettingsRef();
|
||||
SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
|
||||
bool tranform_null_in = settings.transform_null_in;
|
||||
auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
|
||||
|
||||
SubqueryForSet subquery_for_set;
|
||||
subquery_for_set.key = set_key;
|
||||
subquery_for_set.set_in_progress = set;
|
||||
subquery_for_set.set = planner_set->getSet();
|
||||
subquery_for_set.promise_to_fill_set = planner_set->extractPromiseToBuildSet();
|
||||
subquery_for_set.source = std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan());
|
||||
|
||||
subqueries_for_sets.emplace(set_key, std::move(subquery_for_set));
|
||||
|
@ -632,7 +632,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
|
||||
column.name = set_key;
|
||||
column.type = std::make_shared<DataTypeSet>();
|
||||
|
||||
bool set_is_created = planner_set.getSet()->isCreated();
|
||||
bool set_is_created = planner_set.getSet().isCreated();
|
||||
auto column_set = ColumnSet::create(1, planner_set.getSet());
|
||||
|
||||
if (set_is_created)
|
||||
|
@ -128,7 +128,7 @@ PlannerContext::SetKey PlannerContext::createSetKey(const QueryTreeNodePtr & set
|
||||
|
||||
void PlannerContext::registerSet(const SetKey & key, PlannerSet planner_set)
|
||||
{
|
||||
if (!planner_set.getSet())
|
||||
if (!planner_set.getSet().isValid())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Set must be initialized");
|
||||
|
||||
const auto & subquery_node = planner_set.getSubqueryNode();
|
||||
@ -162,7 +162,7 @@ const PlannerSet & PlannerContext::getSetOrThrow(const SetKey & key) const
|
||||
return it->second;
|
||||
}
|
||||
|
||||
const PlannerSet * PlannerContext::getSetOrNull(const SetKey & key) const
|
||||
PlannerSet * PlannerContext::getSetOrNull(const SetKey & key)
|
||||
{
|
||||
auto it = set_key_to_set.find(key);
|
||||
if (it == set_key_to_set.end())
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Interpreters/Set.h>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
|
||||
#include <Analyzer/IQueryTreeNode.h>
|
||||
|
||||
@ -56,18 +57,18 @@ class PlannerSet
|
||||
{
|
||||
public:
|
||||
/// Construct planner set that is ready for execution
|
||||
explicit PlannerSet(SetPtr set_)
|
||||
explicit PlannerSet(FutureSet set_)
|
||||
: set(std::move(set_))
|
||||
{}
|
||||
|
||||
/// Construct planner set with set and subquery node
|
||||
explicit PlannerSet(SetPtr set_, QueryTreeNodePtr subquery_node_)
|
||||
: set(std::move(set_))
|
||||
explicit PlannerSet(QueryTreeNodePtr subquery_node_)
|
||||
: set(promise_to_build_set.get_future())
|
||||
, subquery_node(std::move(subquery_node_))
|
||||
{}
|
||||
|
||||
/// Get set
|
||||
const SetPtr & getSet() const
|
||||
/// Get a reference to a set that might not be built yet
|
||||
const FutureSet & getSet() const
|
||||
{
|
||||
return set;
|
||||
}
|
||||
@ -78,8 +79,15 @@ public:
|
||||
return subquery_node;
|
||||
}
|
||||
|
||||
/// This promise will be fulfilled when set is built and all FutureSet objects will become ready
|
||||
std::promise<SetPtr> extractPromiseToBuildSet()
|
||||
{
|
||||
return std::move(promise_to_build_set);
|
||||
}
|
||||
|
||||
private:
|
||||
SetPtr set;
|
||||
std::promise<SetPtr> promise_to_build_set;
|
||||
FutureSet set;
|
||||
|
||||
QueryTreeNodePtr subquery_node;
|
||||
};
|
||||
@ -186,7 +194,7 @@ public:
|
||||
const PlannerSet & getSetOrThrow(const SetKey & key) const;
|
||||
|
||||
/// Get set for key, if no set is registered null is returned
|
||||
const PlannerSet * getSetOrNull(const SetKey & key) const;
|
||||
PlannerSet * getSetOrNull(const SetKey & key);
|
||||
|
||||
/// Get registered sets
|
||||
const SetKeyToSet & getRegisteredSets() const
|
||||
|
@ -60,7 +60,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
|
||||
String prefix(settings.offset, ' ');
|
||||
|
||||
settings.out << prefix;
|
||||
if (subquery_for_set.set)
|
||||
if (subquery_for_set.set_in_progress)
|
||||
settings.out << "Set: ";
|
||||
|
||||
settings.out << description << '\n';
|
||||
@ -68,7 +68,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
|
||||
|
||||
void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const
|
||||
{
|
||||
if (subquery_for_set.set)
|
||||
if (subquery_for_set.set_in_progress)
|
||||
map.add("Set", description);
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
#include <Interpreters/Set.h>
|
||||
#include <Interpreters/IJoin.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/IStorage.h>
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
@ -39,13 +40,52 @@ void CreatingSetsTransform::work()
|
||||
if (!is_initialized)
|
||||
init();
|
||||
|
||||
if (done_with_set && done_with_table)
|
||||
{
|
||||
finishConsume();
|
||||
input.close();
|
||||
}
|
||||
|
||||
IAccumulatingTransform::work();
|
||||
}
|
||||
|
||||
void CreatingSetsTransform::startSubquery()
|
||||
{
|
||||
if (subquery.set)
|
||||
LOG_TRACE(log, "Creating set.");
|
||||
/// Look up the set in the cache if we don't need to build a table.
|
||||
auto ctx = context.lock();
|
||||
if (ctx && ctx->getPreparedSetsCache() && !subquery.table)
|
||||
{
|
||||
/// Try to find the set in the cache and wait for it to be built.
|
||||
/// Retry if the set from cache fails to be built.
|
||||
while (true)
|
||||
{
|
||||
auto from_cache = ctx->getPreparedSetsCache()->findOrPromiseToBuild(subquery.key);
|
||||
if (from_cache.index() == 0)
|
||||
{
|
||||
promise_to_build = std::move(std::get<0>(from_cache));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Waiting for set to be build by another thread, key: {}", subquery.key);
|
||||
SharedSet set_built_by_another_thread = std::move(std::get<1>(from_cache));
|
||||
const SetPtr & ready_set = set_built_by_another_thread.get();
|
||||
if (!ready_set)
|
||||
{
|
||||
LOG_TRACE(log, "Failed to use set from cache, key: {}", subquery.key);
|
||||
continue;
|
||||
}
|
||||
|
||||
subquery.promise_to_fill_set.set_value(ready_set);
|
||||
subquery.set_in_progress.reset();
|
||||
done_with_set = true;
|
||||
set_from_cache = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (subquery.set_in_progress)
|
||||
LOG_TRACE(log, "Creating set, key: {}", subquery.key);
|
||||
if (subquery.table)
|
||||
LOG_TRACE(log, "Filling temporary table.");
|
||||
|
||||
@ -53,10 +93,10 @@ void CreatingSetsTransform::startSubquery()
|
||||
/// TODO: make via port
|
||||
table_out = QueryPipeline(subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext()));
|
||||
|
||||
done_with_set = !subquery.set;
|
||||
done_with_set = !subquery.set_in_progress;
|
||||
done_with_table = !subquery.table;
|
||||
|
||||
if (done_with_set /*&& done_with_join*/ && done_with_table)
|
||||
if ((done_with_set && !set_from_cache) /*&& done_with_join*/ && done_with_table)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: nothing to do with subquery");
|
||||
|
||||
if (table_out.initialized())
|
||||
@ -68,12 +108,16 @@ void CreatingSetsTransform::startSubquery()
|
||||
|
||||
void CreatingSetsTransform::finishSubquery()
|
||||
{
|
||||
if (read_rows != 0)
|
||||
{
|
||||
auto seconds = watch.elapsedNanoseconds() / 1e9;
|
||||
auto seconds = watch.elapsedNanoseconds() / 1e9;
|
||||
|
||||
if (subquery.set)
|
||||
LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), read_rows, seconds);
|
||||
if (set_from_cache)
|
||||
{
|
||||
LOG_DEBUG(log, "Got set from cache in {} sec.", seconds);
|
||||
}
|
||||
else if (read_rows != 0)
|
||||
{
|
||||
if (subquery.set_in_progress)
|
||||
LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set_in_progress->getTotalRowCount(), read_rows, seconds);
|
||||
if (subquery.table)
|
||||
LOG_DEBUG(log, "Created Table with {} rows in {} sec.", read_rows, seconds);
|
||||
}
|
||||
@ -87,8 +131,10 @@ void CreatingSetsTransform::init()
|
||||
{
|
||||
is_initialized = true;
|
||||
|
||||
if (subquery.set)
|
||||
subquery.set->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
|
||||
if (subquery.set_in_progress)
|
||||
{
|
||||
subquery.set_in_progress->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
|
||||
}
|
||||
|
||||
watch.restart();
|
||||
startSubquery();
|
||||
@ -101,7 +147,7 @@ void CreatingSetsTransform::consume(Chunk chunk)
|
||||
|
||||
if (!done_with_set)
|
||||
{
|
||||
if (!subquery.set->insertFromBlock(block.getColumnsWithTypeAndName()))
|
||||
if (!subquery.set_in_progress->insertFromBlock(block.getColumnsWithTypeAndName()))
|
||||
done_with_set = true;
|
||||
}
|
||||
|
||||
@ -124,8 +170,13 @@ void CreatingSetsTransform::consume(Chunk chunk)
|
||||
|
||||
Chunk CreatingSetsTransform::generate()
|
||||
{
|
||||
if (subquery.set)
|
||||
subquery.set->finishInsert();
|
||||
if (subquery.set_in_progress)
|
||||
{
|
||||
subquery.set_in_progress->finishInsert();
|
||||
subquery.promise_to_fill_set.set_value(subquery.set_in_progress);
|
||||
if (promise_to_build)
|
||||
promise_to_build->set_value(subquery.set_in_progress);
|
||||
}
|
||||
|
||||
if (table_out.initialized())
|
||||
{
|
||||
|
@ -43,10 +43,12 @@ public:
|
||||
|
||||
private:
|
||||
SubqueryForSet subquery;
|
||||
std::optional<std::promise<SetPtr>> promise_to_build;
|
||||
|
||||
QueryPipeline table_out;
|
||||
std::unique_ptr<PushingPipelineExecutor> executor;
|
||||
UInt64 read_rows = 0;
|
||||
bool set_from_cache = false;
|
||||
Stopwatch watch;
|
||||
|
||||
bool done_with_set = true;
|
||||
|
@ -52,6 +52,14 @@ void MutatePlainMergeTreeTask::prepare()
|
||||
std::move(profile_counters_snapshot));
|
||||
};
|
||||
|
||||
if (task_context->getSettingsRef().enable_sharing_sets_for_mutations)
|
||||
{
|
||||
/// If we have a prepared sets cache for this mutation, we will use it.
|
||||
auto mutation_id = future_part->part_info.mutation;
|
||||
auto prepared_sets_cache_for_mutation = storage.getPreparedSetsCache(mutation_id);
|
||||
task_context->setPreparedSetsCache(prepared_sets_cache_for_mutation);
|
||||
}
|
||||
|
||||
mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
|
||||
future_part, metadata_snapshot, merge_mutate_entry->commands, merge_list_entry.get(),
|
||||
time(nullptr), task_context, merge_mutate_entry->txn, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
|
||||
|
@ -1724,6 +1724,8 @@ bool MutateTask::prepare()
|
||||
/// Allow mutations to work when force_index_by_date or force_primary_key is on.
|
||||
context_for_reading->setSetting("force_index_by_date", false);
|
||||
context_for_reading->setSetting("force_primary_key", false);
|
||||
/// Skip using large sets in KeyCondition
|
||||
context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);
|
||||
|
||||
for (const auto & command : *ctx->commands)
|
||||
if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))
|
||||
|
@ -288,7 +288,7 @@ ConstSetPtr tryGetSetFromDAGNode(const ActionsDAG::Node * dag_node)
|
||||
{
|
||||
auto set = column_set->getData();
|
||||
|
||||
if (set->isCreated())
|
||||
if (set && set->isCreated())
|
||||
return set;
|
||||
}
|
||||
|
||||
@ -305,8 +305,8 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet() const
|
||||
{
|
||||
auto prepared_sets_with_same_hash = prepared_sets->getByTreeHash(ast_node->getTreeHash());
|
||||
for (auto & set : prepared_sets_with_same_hash)
|
||||
if (set->isCreated())
|
||||
return set;
|
||||
if (set.isCreated())
|
||||
return set.get();
|
||||
}
|
||||
else if (dag_node)
|
||||
{
|
||||
@ -368,8 +368,8 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
|
||||
auto tree_hash = ast_node->getTreeHash();
|
||||
for (const auto & set : prepared_sets->getByTreeHash(tree_hash))
|
||||
{
|
||||
if (types_match(set))
|
||||
return set;
|
||||
if (set.isCreated() && types_match(set.get()))
|
||||
return set.get();
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -2181,6 +2181,35 @@ std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const
|
||||
return std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
|
||||
}
|
||||
|
||||
/// Returns the PreparedSetsCache registered for `mutation_id`, creating and registering a new one
/// if there is no live cache for that id.
/// The map holds only weak references, so a cache stays alive only while some caller keeps the
/// returned shared pointer. The whole lookup runs under `mutation_prepared_sets_cache_mutex`.
PreparedSetsCachePtr StorageMergeTree::getPreparedSetsCache(Int64 mutation_id)
|
||||
{
|
||||
auto l = std::lock_guard(mutation_prepared_sets_cache_mutex);
|
||||
|
||||
/// Clean up stale entries (expired weak pointers) from the front of the map.
/// NOTE: this stops at the first entry that is still alive, so expired entries behind it are kept for now.
|
||||
while (!mutation_prepared_sets_cache.empty())
|
||||
{
|
||||
auto it = mutation_prepared_sets_cache.begin();
|
||||
/// The first entry is still alive: stop trimming.
if (it->second.lock())
|
||||
break;
|
||||
mutation_prepared_sets_cache.erase(it);
|
||||
}
|
||||
|
||||
/// Look up an existing entry.
|
||||
auto it = mutation_prepared_sets_cache.find(mutation_id);
|
||||
if (it != mutation_prepared_sets_cache.end())
|
||||
{
|
||||
/// If the entry is still alive, return it.
|
||||
auto existing_set_cache = it->second.lock();
|
||||
if (existing_set_cache)
|
||||
return existing_set_cache;
|
||||
}
|
||||
|
||||
/// Create a new entry; an expired entry for the same mutation id is overwritten here.
|
||||
auto cache = std::make_shared<PreparedSetsCache>();
|
||||
mutation_prepared_sets_cache[mutation_id] = cache;
|
||||
return cache;
|
||||
}
|
||||
|
||||
void StorageMergeTree::fillNewPartName(MutableDataPartPtr & part, DataPartsLock &)
|
||||
{
|
||||
part->info.min_block = part->info.max_block = increment.get();
|
||||
|
@ -151,6 +151,13 @@ private:
|
||||
std::atomic<bool> shutdown_called {false};
|
||||
std::atomic<bool> flush_called {false};
|
||||
|
||||
/// PreparedSets cache for one executing mutation.
|
||||
/// NOTE: we only store weak_ptr to PreparedSetsCache, so that the cache is shared between mutation tasks that are executed in parallel.
|
||||
/// The goal is to avoid consuming a lot of memory when the same big sets are used by multiple tasks at the same time.
|
||||
/// If the tasks are executed without time overlap, we will destroy the cache to free memory, and the next task might rebuild the same sets.
|
||||
std::mutex mutation_prepared_sets_cache_mutex;
|
||||
std::map<Int64, PreparedSetsCachePtr::weak_type> mutation_prepared_sets_cache;
|
||||
|
||||
void loadMutations();
|
||||
|
||||
/// Load and initialize deduplication logs. Even if deduplication setting
|
||||
@ -259,6 +266,8 @@ private:
|
||||
|
||||
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
|
||||
|
||||
PreparedSetsCachePtr getPreparedSetsCache(Int64 mutation_id);
|
||||
|
||||
friend class MergeTreeSink;
|
||||
friend class MergeTreeData;
|
||||
friend class MergePlainMergeTreeTask;
|
||||
|
@ -0,0 +1,9 @@
|
||||
40000
|
||||
all_1_1_0
|
||||
all_2_2_0
|
||||
all_3_3_0
|
||||
all_4_4_0
|
||||
5000 all_1_1_0_9
|
||||
5000 all_2_2_0_9
|
||||
5000 all_3_3_0_9
|
||||
5000 all_4_4_0_9
|
@ -0,0 +1,27 @@
|
||||
-- Tags: long, no-debug, no-tsan, no-asan, no-ubsan, no-msan
|
||||
|
||||
DROP TABLE IF EXISTS 02581_trips;
|
||||
|
||||
CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
|
||||
|
||||
-- Make multiple parts
|
||||
INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+10000000, '', number FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+20000000, '', number FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+30000000, '', number FROM numbers(10000);
|
||||
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
|
||||
|
||||
-- Start multiple mutations simultaneously
|
||||
SYSTEM STOP MERGES 02581_trips;
|
||||
ALTER TABLE 02581_trips UPDATE description='5' WHERE id IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
|
||||
ALTER TABLE 02581_trips UPDATE description='6' WHERE id IN (SELECT (number*10 + 6)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
|
||||
ALTER TABLE 02581_trips DELETE WHERE id IN (SELECT (number*10 + 7)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
|
||||
ALTER TABLE 02581_trips UPDATE description='8' WHERE id IN (SELECT (number*10 + 8)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
|
||||
SYSTEM START MERGES 02581_trips;
|
||||
DELETE FROM 02581_trips WHERE id IN (SELECT (number*10 + 9)::UInt32 FROM numbers(200000000));
|
||||
SELECT count(), _part from 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
|
||||
|
||||
DROP TABLE 02581_trips;
|
@ -0,0 +1,19 @@
|
||||
-- { echoOn }
|
||||
SELECT count(), _part FROM 02581_trips GROUP BY _part ORDER BY _part;
|
||||
10000 all_1_1_0
|
||||
10000 all_2_2_0
|
||||
10000 all_3_3_0
|
||||
10000 all_4_4_0
|
||||
-- Run mutation with a 'IN big subquery'
|
||||
ALTER TABLE 02581_trips UPDATE description='1' WHERE id IN (SELECT (number*10+1)::UInt32 FROM numbers(10000000)) SETTINGS mutations_sync=2;
|
||||
SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
|
||||
9000 all_1_1_0_5
|
||||
9000 all_2_2_0_5
|
||||
9000 all_3_3_0_5
|
||||
9000 all_4_4_0_5
|
||||
ALTER TABLE 02581_trips UPDATE description='2' WHERE id IN (SELECT (number*10+2)::UInt32 FROM numbers(10000)) SETTINGS mutations_sync=2;
|
||||
SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
|
||||
8000 all_1_1_0_6
|
||||
8000 all_2_2_0_6
|
||||
8000 all_3_3_0_6
|
||||
8000 all_4_4_0_6
|
@ -0,0 +1,21 @@
|
||||
DROP TABLE IF EXISTS 02581_trips;
|
||||
|
||||
CREATE TABLE 02581_trips(id UInt32, description String) ENGINE=MergeTree ORDER BY id;
|
||||
|
||||
-- Make multiple parts
|
||||
INSERT INTO 02581_trips SELECT number, '' FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+10000, '' FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+20000, '' FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+30000, '' FROM numbers(10000);
|
||||
|
||||
-- { echoOn }
|
||||
SELECT count(), _part FROM 02581_trips GROUP BY _part ORDER BY _part;
|
||||
|
||||
-- Run mutation with a 'IN big subquery'
|
||||
ALTER TABLE 02581_trips UPDATE description='1' WHERE id IN (SELECT (number*10+1)::UInt32 FROM numbers(10000000)) SETTINGS mutations_sync=2;
|
||||
SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
|
||||
ALTER TABLE 02581_trips UPDATE description='2' WHERE id IN (SELECT (number*10+2)::UInt32 FROM numbers(10000)) SETTINGS mutations_sync=2;
|
||||
SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
|
||||
-- { echoOff }
|
||||
|
||||
DROP TABLE 02581_trips;
|
@ -0,0 +1,12 @@
|
||||
40000
|
||||
all_1_1_0
|
||||
all_2_2_0
|
||||
all_3_3_0
|
||||
all_4_4_0
|
||||
36000
|
||||
32000
|
||||
28000
|
||||
24000
|
||||
20000
|
||||
16000
|
||||
12000
|
@ -0,0 +1,57 @@
|
||||
-- Tags: long, no-tsan, no-asan, no-ubsan, no-msan
|
||||
|
||||
DROP TABLE IF EXISTS 02581_trips;
|
||||
|
||||
CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
|
||||
|
||||
-- Make multiple parts
|
||||
INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+10000000, '', number FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+20000000, '', number FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+30000000, '', number FROM numbers(10000);
|
||||
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
|
||||
SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
|
||||
|
||||
-- Run mutation with `id` IN a big subquery
|
||||
ALTER TABLE 02581_trips UPDATE description='a' WHERE id IN (SELECT (number*10)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
ALTER TABLE 02581_trips UPDATE description='a' WHERE id IN (SELECT (number*10 + 1)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2, max_rows_in_set=1000;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
-- Run mutation with func(`id`) IN big subquery
|
||||
ALTER TABLE 02581_trips UPDATE description='b' WHERE id::UInt64 IN (SELECT (number*10 + 2)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
-- Run mutation with non-PK `id2` IN big subquery
|
||||
ALTER TABLE 02581_trips UPDATE description='c' WHERE id2 IN (SELECT (number*10 + 3)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
-- Run mutation with PK and non-PK IN big subquery
|
||||
ALTER TABLE 02581_trips UPDATE description='c'
|
||||
WHERE
|
||||
(id IN (SELECT (number*10 + 4)::UInt32 FROM numbers(200000000))) OR
|
||||
(id2 IN (SELECT (number*10 + 4)::UInt32 FROM numbers(200000000)))
|
||||
SETTINGS mutations_sync=2;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
-- Run mutation with PK and non-PK IN big subquery
|
||||
ALTER TABLE 02581_trips UPDATE description='c'
|
||||
WHERE
|
||||
(id::UInt64 IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000))) OR
|
||||
(id2::UInt64 IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000)))
|
||||
SETTINGS mutations_sync=2;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
-- Run mutation with PK and non-PK IN big subquery
|
||||
ALTER TABLE 02581_trips UPDATE description='c'
|
||||
WHERE
|
||||
(id::UInt32 IN (SELECT (number*10 + 6)::UInt32 FROM numbers(200000000))) OR
|
||||
((id2+1)::String IN (SELECT (number*10 + 6)::UInt32 FROM numbers(200000000)))
|
||||
SETTINGS mutations_sync=2;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
DROP TABLE 02581_trips;
|
@ -0,0 +1,7 @@
|
||||
40000
|
||||
all_1_1_0
|
||||
all_2_2_0
|
||||
all_3_3_0
|
||||
all_4_4_0
|
||||
36000
|
||||
32000
|
@ -0,0 +1,32 @@
|
||||
DROP TABLE IF EXISTS 02581_trips;
|
||||
|
||||
CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
|
||||
|
||||
-- Make multiple parts
|
||||
INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+10000000, '', number FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+20000000, '', number FROM numbers(10000);
|
||||
INSERT INTO 02581_trips SELECT number+30000000, '', number FROM numbers(10000);
|
||||
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
|
||||
SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
|
||||
|
||||
CREATE TABLE 02581_set (id UInt32) ENGINE = Set;
|
||||
|
||||
INSERT INTO 02581_set SELECT number*10+7 FROM numbers(10000000);
|
||||
|
||||
-- Run mutation with PK `id` IN big set
|
||||
ALTER TABLE 02581_trips UPDATE description='d' WHERE id IN 02581_set SETTINGS mutations_sync=2;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
INSERT INTO 02581_set SELECT number*10+8 FROM numbers(10000000);
|
||||
|
||||
-- Run mutation with PK `id` IN big set after it is updated
|
||||
ALTER TABLE 02581_trips UPDATE description='d' WHERE id IN 02581_set SETTINGS mutations_sync=2;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
|
||||
DROP TABLE 02581_set;
|
||||
DROP TABLE 02581_trips;
|
Loading…
Reference in New Issue
Block a user