Merge branch 'arrayfunc_for_wideint' of github.com:infdahai/ClickHouse into arrayfunc_for_wideint

clundro 2023-04-18 12:16:38 +08:00
commit b9615ac75d
42 changed files with 682 additions and 96 deletions

@@ -1 +1 @@
-Subproject commit e0accd517933ebb44aff84bc8db448ffd8ef1929
+Subproject commit 2aedf7598a4040b23881dbe05b6afaca25a337ef

View File

@@ -5124,7 +5124,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
         /// Create constant set column for constant folding
-        auto column_set = ColumnSet::create(1, std::move(set));
+        auto column_set = ColumnSet::create(1, FutureSet(std::move(set)));
         argument_columns[1].column = ColumnConst::create(std::move(column_set), 1);
     }

View File

@@ -1,5 +1,6 @@
 #pragma once
+#include <Interpreters/PreparedSets.h>
 #include <Columns/IColumnDummy.h>
 #include <Core/Field.h>
@@ -20,7 +21,7 @@ class ColumnSet final : public COWHelper<IColumnDummy, ColumnSet>
 private:
     friend class COWHelper<IColumnDummy, ColumnSet>;

-    ColumnSet(size_t s_, const ConstSetPtr & data_) : data(data_) { s = s_; }
+    ColumnSet(size_t s_, FutureSet data_) : data(std::move(data_)) { s = s_; }
     ColumnSet(const ColumnSet &) = default;

 public:
@@ -28,13 +29,13 @@ public:
     TypeIndex getDataType() const override { return TypeIndex::Set; }
     MutableColumnPtr cloneDummy(size_t s_) const override { return ColumnSet::create(s_, data); }

-    ConstSetPtr getData() const { return data; }
+    ConstSetPtr getData() const { if (!data.isReady()) return nullptr; return data.get(); }

     // Used only for debugging, making it DUMPABLE
     Field operator[](size_t) const override { return {}; }

 private:
-    ConstSetPtr data;
+    FutureSet data;
 };

 }

View File

@@ -302,6 +302,7 @@ class IColumn;
     M(Bool, http_skip_not_found_url_for_globs, true, "Skip url's for globs with HTTP_NOT_FOUND error", 0) \
     M(Bool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown", 0) \
     M(Bool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.", 0) \
+    M(UInt64, use_index_for_in_with_subqueries_max_values, 0, "The maximum size of the set in the right-hand side of the IN operator for which the table index is used for filtering. It allows to avoid performance degradation and higher memory usage due to the preparation of additional data structures for large queries. Zero means no limit.", 0) \
     M(Bool, joined_subquery_requires_alias, true, "Force joined subqueries and table functions to have aliases for correct name qualification.", 0) \
     M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
     M(Bool, empty_result_for_aggregation_by_constant_keys_on_empty_set, true, "Return empty result when aggregating by constant keys on empty set.", 0) \
@@ -569,6 +570,7 @@ class IColumn;
     M(Bool, query_cache_squash_partial_results, true, "Squash partial result blocks to blocks of size 'max_block_size'. Reduces performance of inserts into the query cache but improves the compressability of cache entries.", 0) \
     M(Seconds, query_cache_ttl, 60, "After this time in seconds entries in the query cache become stale", 0) \
     M(Bool, query_cache_share_between_users, false, "Allow other users to read entry in the query cache", 0) \
+    M(Bool, enable_sharing_sets_for_mutations, true, "Allow sharing set objects built for IN subqueries between different tasks of the same mutation. This reduces memory usage and CPU consumption.", 0) \
     \
     M(Bool, optimize_rewrite_sum_if_to_count_if, false, "Rewrite sumIf() and sum(if()) functions to the countIf() function when logically equivalent", 0) \
     M(Bool, optimize_rewrite_aggregate_function_with_if, true, "Rewrite aggregate functions with if expression as argument when logically equivalent. For example, avg(if(cond, col, null)) can be rewritten to avgIf(cond, col)", 0) \

View File

@@ -20,7 +20,7 @@ public:
     bool isParametric() const override { return true; }

     // Used for expressions analysis.
-    MutableColumnPtr createColumn() const override { return ColumnSet::create(0, nullptr); }
+    MutableColumnPtr createColumn() const override { return ColumnSet::create(0, FutureSet{}); }

     // Used only for debugging, making it DUMPABLE
     Field getDefault() const override { return Tuple(); }

View File

@@ -42,6 +42,8 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check *
         if (file_cache_settings.base_path.empty())
             file_cache_settings.base_path = fs::path(context->getPath()) / "disks" / name / "cache/";
+        else if (fs::path(file_cache_settings.base_path).is_relative())
+            file_cache_settings.base_path = fs::path(context->getPath()) / "caches" / file_cache_settings.base_path;

         auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings);
         auto disk = disk_it->second;

View File

@@ -0,0 +1,37 @@
+#include <gtest/gtest.h>
+
+#include <sstream>
+#include <string>
+
+// There are a few places where std::stringstream is used to pass data to some
+// third-party code.
+//
+// There were problems with feeding more than INT_MAX bytes to std::stringstream
+// in libc++, and this is the regression test for it.
+//
+// Since those places in ClickHouse can operate on buffers larger than INT_MAX
+// (e.g. WriteBufferFromS3), it is better to have a test for this in ClickHouse
+// too.
+TEST(stringstream, INTMAX)
+{
+    std::stringstream ss;
+    ss.exceptions(std::ios::badbit);
+
+    std::string payload(1<<20, 'A');
+    // write up to INT_MAX-1MiB
+    for (size_t i = 0; i < (2ULL<<30) - payload.size(); i += payload.size())
+    {
+        ASSERT_NE(ss.tellp(), -1);
+        ss.write(payload.data(), payload.size());
+        // std::cerr << "i: " << ss.tellp()/1024/1024 << " MB\n";
+    }
+
+    ASSERT_NE(ss.tellp(), -1);
+    // write up to INT_MAX
+    ss.write(payload.data(), payload.size());
+
+    ASSERT_NE(ss.tellp(), -1);
+    // write one more 1MiB chunk
+    ss.write(payload.data(), payload.size());
+}

View File

@@ -952,14 +952,16 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
         return;
     }

-    SetPtr prepared_set;
+    FutureSet prepared_set;
     if (checkFunctionIsInOrGlobalInOperator(node))
     {
         /// Let's find the type of the first argument (then getActionsImpl will be called again and will not affect anything).
         visit(node.arguments->children.at(0), data);

-        if (!data.no_makeset && !(data.is_create_parameterized_view && !analyzeReceiveQueryParams(ast).empty())
-            && (prepared_set = makeSet(node, data, data.no_subqueries)))
+        if (!data.no_makeset && !(data.is_create_parameterized_view && !analyzeReceiveQueryParams(ast).empty()))
+            prepared_set = makeSet(node, data, data.no_subqueries);
+
+        if (prepared_set.isValid())
         {
             /// Transform tuple or subquery into a set.
         }
@@ -1172,14 +1174,15 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
                 num_arguments += columns.size() - 1;
                 arg += columns.size() - 1;
             }
-            else if (checkFunctionIsInOrGlobalInOperator(node) && arg == 1 && prepared_set)
+            else if (checkFunctionIsInOrGlobalInOperator(node) && arg == 1 && prepared_set.isValid())
             {
                 ColumnWithTypeAndName column;
                 column.type = std::make_shared<DataTypeSet>();

                 /// If the argument is a set given by an enumeration of values (so, the set was already built), give it a unique name,
                 /// so that sets with the same literal representation do not fuse together (they can have different types).
-                if (!prepared_set->empty())
+                const bool is_constant_set = prepared_set.isCreated();
+                if (is_constant_set)
                     column.name = data.getUniqueName("__set");
                 else
                     column.name = child->getColumnName();
@@ -1189,7 +1192,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
                     auto column_set = ColumnSet::create(1, prepared_set);
                     /// If prepared_set is not empty, we have a set made with literals.
                     /// Create a const ColumnSet to make constant folding work
-                    if (!prepared_set->empty())
+                    if (is_constant_set)
                         column.column = ColumnConst::create(std::move(column_set), 1);
                     else
                         column.column = std::move(column_set);
@@ -1370,10 +1373,10 @@ void ActionsMatcher::visit(const ASTLiteral & literal, const ASTPtr & /* ast */,
     data.addColumn(std::move(column));
 }

-SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_subqueries)
+FutureSet ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_subqueries)
 {
     if (!data.prepared_sets)
-        return nullptr;
+        return {};

     /** You need to convert the right argument to a set.
       * This can be a table name, a value, a value enumeration, or a subquery.
@@ -1390,8 +1393,12 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
         if (no_subqueries)
             return {};
         auto set_key = PreparedSetKey::forSubquery(*right_in_operand);
-        if (SetPtr set = data.prepared_sets->get(set_key))
-        {
+
+        auto set = data.prepared_sets->getFuture(set_key);
+        if (set.isValid())
             return set;
-        }

         /// A special case is if the name of the table is specified on the right side of the IN statement,
         ///  and the table has the type Set (a previously prepared set).
@@ -1407,7 +1414,7 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
             {
                 SetPtr set = storage_set->getSet();
                 data.prepared_sets->set(set_key, set);
-                return set;
+                return FutureSet(set);
             }
         }
     }
@@ -1439,7 +1446,8 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
     const auto & index = data.actions_stack.getLastActionsIndex();
     if (data.prepared_sets && index.contains(left_in_operand->getColumnName()))
         /// An explicit enumeration of values in parentheses.
-        return makeExplicitSet(&node, last_actions, false, data.getContext(), data.set_size_limit, *data.prepared_sets);
+        return FutureSet(
+            makeExplicitSet(&node, last_actions, false, data.getContext(), data.set_size_limit, *data.prepared_sets));
     else
         return {};
 }

View File

@@ -219,7 +219,7 @@ private:
     static void visit(const ASTLiteral & literal, const ASTPtr & ast, Data & data);
     static void visit(ASTExpressionList & expression_list, const ASTPtr & ast, Data & data);

-    static SetPtr makeSet(const ASTFunction & node, Data & data, bool no_subqueries);
+    static FutureSet makeSet(const ASTFunction & node, Data & data, bool no_subqueries);
     static ASTs doUntuple(const ASTFunction * function, ActionsMatcher::Data & data);
     static std::optional<NameAndTypePair> getNameAndTypeFromAST(const ASTPtr & ast, Data & data);
 };

View File

@@ -43,6 +43,7 @@
 #include <Interpreters/ExternalLoaderXMLConfigRepository.h>
 #include <Interpreters/TemporaryDataOnDisk.h>
 #include <Interpreters/Cache/QueryCache.h>
+#include <Interpreters/PreparedSets.h>
 #include <Core/Settings.h>
 #include <Core/SettingsQuirks.h>
 #include <Access/AccessControl.h>
@@ -4342,6 +4343,16 @@ bool Context::canUseParallelReplicasOnFollower() const
         && getClientInfo().collaborate_with_initiator;
 }

+void Context::setPreparedSetsCache(const PreparedSetsCachePtr & cache)
+{
+    prepared_sets_cache = cache;
+}
+
+PreparedSetsCachePtr Context::getPreparedSetsCache() const
+{
+    return prepared_sets_cache;
+}
+
 UInt64 Context::getClientProtocolVersion() const
 {
     return client_protocol_version;

View File

@@ -193,6 +193,9 @@ class MergeTreeMetadataCache;
 using MergeTreeMetadataCachePtr = std::shared_ptr<MergeTreeMetadataCache>;
 #endif

+class PreparedSetsCache;
+using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;
+
 /// An empty interface for an arbitrary object that may be attached by a shared pointer
 /// to query context, when using ClickHouse as a library.
 struct IHostContext
@@ -399,6 +402,10 @@ private:
     /// Temporary data for query execution accounting.
     TemporaryDataOnDiskScopePtr temp_data_on_disk;

+    /// Prepared sets that can be shared between different queries. One use case is to share prepared sets between
+    /// mutation tasks of one mutation executed against different parts of the same table.
+    PreparedSetsCachePtr prepared_sets_cache;
+
 public:
     /// Some counters for current query execution.
     /// Most of them are workarounds and should be removed in the future.
@@ -1128,6 +1135,9 @@ public:
     ParallelReplicasMode getParallelReplicasMode() const;

+    void setPreparedSetsCache(const PreparedSetsCachePtr & cache);
+    PreparedSetsCachePtr getPreparedSetsCache() const;
+
 private:
     std::unique_lock<std::recursive_mutex> getLock() const;

View File

@@ -56,6 +56,7 @@
 #include <Core/Names.h>
 #include <Core/NamesAndTypes.h>
 #include <Common/logger_useful.h>
+#include <QueryPipeline/SizeLimits.h>
 #include <DataTypes/DataTypesNumber.h>
@@ -68,6 +69,7 @@
 #include <Interpreters/interpretSubquery.h>
 #include <Interpreters/JoinUtils.h>
 #include <Interpreters/misc.h>
+#include <Interpreters/PreparedSets.h>
 #include <IO/Operators.h>
 #include <IO/WriteBufferFromString.h>
@@ -147,6 +149,11 @@ ExpressionAnalyzerData::~ExpressionAnalyzerData() = default;
 ExpressionAnalyzer::ExtractedSettings::ExtractedSettings(const Settings & settings_)
     : use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries)
     , size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode)
+    , size_limits_for_set_used_with_index(
+        (settings_.use_index_for_in_with_subqueries_max_values &&
+         settings_.use_index_for_in_with_subqueries_max_values < settings_.max_rows_in_set) ?
+            size_limits_for_set :
+            SizeLimits(settings_.use_index_for_in_with_subqueries_max_values, settings_.max_bytes_in_set, OverflowMode::BREAK))
     , distributed_group_by_no_merge(settings_.distributed_group_by_no_merge)
 {}
@@ -450,7 +457,7 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
     auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);

-    if (prepared_sets->get(set_key))
+    if (prepared_sets->getFuture(set_key).isValid())
         return; /// Already prepared.

     if (auto set_ptr_from_storage_set = isPlainStorageSetInSubquery(subquery_or_table_name))
@@ -459,11 +466,15 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
         return;
     }

+    auto build_set = [&] () -> SetPtr
+    {
+        LOG_TRACE(getLogger(), "Building set, key: {}", set_key.toString());
+
         auto interpreter_subquery = interpretSubquery(subquery_or_table_name, getContext(), {}, query_options);
         auto io = interpreter_subquery->execute();
         PullingAsyncPipelineExecutor executor(io.pipeline);

-    SetPtr set = std::make_shared<Set>(settings.size_limits_for_set, true, getContext()->getSettingsRef().transform_null_in);
+        SetPtr set = std::make_shared<Set>(settings.size_limits_for_set_used_with_index, true, getContext()->getSettingsRef().transform_null_in);
         set->setHeader(executor.getHeader().getColumnsWithTypeAndName());

         Block block;
@@ -474,11 +485,39 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
             /// If the limits have been exceeded, give up and let the default subquery processing actions take place.
             if (!set->insertFromBlock(block.getColumnsWithTypeAndName()))
-                return;
+                return nullptr;
         }
         set->finishInsert();
+        return set;
+    };
+
+    SetPtr set;
+
+    auto set_cache = getContext()->getPreparedSetsCache();
+    if (set_cache)
+    {
+        auto from_cache = set_cache->findOrPromiseToBuild(set_key.toString());
+        if (from_cache.index() == 0)
+        {
+            set = build_set();
+            std::get<0>(from_cache).set_value(set);
+        }
+        else
+        {
+            LOG_TRACE(getLogger(), "Waiting for set, key: {}", set_key.toString());
+            set = std::get<1>(from_cache).get();
+        }
+    }
+    else
+    {
+        set = build_set();
+    }
+
+    if (!set)
+        return;
+
     prepared_sets->set(set_key, std::move(set));
 }

View File

@@ -95,6 +95,7 @@ private:
     {
         const bool use_index_for_in_with_subqueries;
         const SizeLimits size_limits_for_set;
+        const SizeLimits size_limits_for_set_used_with_index;
         const UInt64 distributed_group_by_no_merge;

         explicit ExtractedSettings(const Settings & settings_);

View File

@@ -1,7 +1,10 @@
+#include <chrono>
+#include <variant>
 #include <Interpreters/PreparedSets.h>
 #include <Processors/QueryPlan/QueryPlan.h>
 #include <Interpreters/InterpreterSelectWithUnionQuery.h>
 #include <Interpreters/Set.h>
+#include <IO/Operators.h>

 namespace DB
 {
@@ -43,6 +46,26 @@ bool PreparedSetKey::operator==(const PreparedSetKey & other) const
     return true;
 }

+String PreparedSetKey::toString() const
+{
+    WriteBufferFromOwnString buf;
+    buf << "__set_" << ast_hash.first << "_" << ast_hash.second;
+    if (!types.empty())
+    {
+        buf << "(";
+        bool first = true;
+        for (const auto & type : types)
+        {
+            if (!first)
+                buf << ",";
+            first = false;
+            buf << type->getName();
+        }
+        buf << ")";
+    }
+    return buf.str();
+}
+
 SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, const PreparedSetKey & key,
                                                    SizeLimits set_size_limit, bool transform_null_in)
 {
@@ -51,10 +74,20 @@ SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, c
     /// If you already created a Set with the same subquery / table for another ast
     /// In that case several PreparedSetKey would share same subquery and set
     /// Not sure if it's really possible case (maybe for distributed query when set was filled by external table?)
-    if (subquery.set)
+    if (subquery.set.isValid())
         sets[key] = subquery.set;
     else
-        sets[key] = subquery.set = std::make_shared<Set>(set_size_limit, false, transform_null_in);
+    {
+        subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
+        sets[key] = FutureSet(subquery.promise_to_fill_set.get_future());
+    }
+
+    if (!subquery.set_in_progress)
+    {
+        subquery.key = key.toString();
+        subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
+    }
+
     return subquery;
 }
@@ -62,13 +95,27 @@ SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, c
 /// It's aimed to fill external table passed to SubqueryForSet::createSource.
 SubqueryForSet & PreparedSets::getSubquery(const String & subquery_id) { return subqueries[subquery_id]; }

-void PreparedSets::set(const PreparedSetKey & key, SetPtr set_) { sets[key] = set_; }
+void PreparedSets::set(const PreparedSetKey & key, SetPtr set_) { sets[key] = FutureSet(set_); }

-SetPtr & PreparedSets::get(const PreparedSetKey & key) { return sets[key]; }
+FutureSet PreparedSets::getFuture(const PreparedSetKey & key) const
+{
+    auto it = sets.find(key);
+    if (it == sets.end())
+        return {};
+    return it->second;
+}

-std::vector<SetPtr> PreparedSets::getByTreeHash(IAST::Hash ast_hash)
+SetPtr PreparedSets::get(const PreparedSetKey & key) const
+{
+    auto it = sets.find(key);
+    if (it == sets.end() || !it->second.isReady())
+        return nullptr;
+    return it->second.get();
+}
+
+std::vector<FutureSet> PreparedSets::getByTreeHash(IAST::Hash ast_hash) const
 {
-    std::vector<SetPtr> res;
+    std::vector<FutureSet> res;
     for (const auto & it : this->sets)
     {
         if (it.first.ast_hash == ast_hash)
@@ -106,4 +153,45 @@ QueryPlanPtr SubqueryForSet::detachSource()
     return res;
 }

+FutureSet::FutureSet(SetPtr set)
+{
+    std::promise<SetPtr> promise;
+    promise.set_value(set);
+    *this = FutureSet(promise.get_future());
+}
+
+bool FutureSet::isReady() const
+{
+    return future_set.valid() &&
+        future_set.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
+}
+
+bool FutureSet::isCreated() const
+{
+    return isReady() && get() != nullptr && get()->isCreated();
+}
+
+std::variant<std::promise<SetPtr>, SharedSet> PreparedSetsCache::findOrPromiseToBuild(const String & key)
+{
+    std::lock_guard lock(cache_mutex);
+
+    auto it = cache.find(key);
+    if (it != cache.end())
+    {
+        /// If the set is being built, return its future, but if it's ready and is nullptr then we should retry building it.
+        if (it->second.future.valid() &&
+            (it->second.future.wait_for(std::chrono::seconds(0)) != std::future_status::ready || it->second.future.get() != nullptr))
+            return it->second.future;
+    }
+
+    /// Insert the entry into the cache so that other threads can find it and start waiting for the set.
+    std::promise<SetPtr> promise_to_fill_set;
+    Entry & entry = cache[key];
+    entry.future = promise_to_fill_set.get_future();
+    return promise_to_fill_set;
+}
+
 };

View File

@@ -2,6 +2,7 @@
 #include <Parsers/IAST.h>
 #include <DataTypes/IDataType.h>
+#include <future>
 #include <memory>
 #include <unordered_map>
 #include <vector>
@@ -19,6 +20,37 @@ class Set;
 using SetPtr = std::shared_ptr<Set>;
 class InterpreterSelectWithUnionQuery;

+/// Represents a set in a query that might be referenced at analysis time and built later during execution.
+/// Also it can represent a constant set that is ready to use.
+/// At the analysis stage the FutureSets are created but not necessarily filled. Then for non-constant sets there
+/// must be an explicit step to build them before they can be used.
+/// FutureSet objects can be stored in PreparedSets and are not intended to be used from multiple threads.
+class FutureSet final
+{
+public:
+    FutureSet() = default;
+
+    /// Create FutureSet from an object that will be created in the future.
+    explicit FutureSet(const std::shared_future<SetPtr> & future_set_) : future_set(future_set_) {}
+
+    /// Create FutureSet from a ready set.
+    explicit FutureSet(SetPtr readySet);
+
+    /// The set object will be ready in the future, as opposed to the 'null' object when FutureSet is default-constructed.
+    bool isValid() const { return future_set.valid(); }
+
+    /// The value of SetPtr is ready, but the set object itself might not have been filled yet.
+    bool isReady() const;
+
+    /// The set object is ready and filled.
+    bool isCreated() const;
+
+    SetPtr get() const { chassert(isReady()); return future_set.get(); }
+
+private:
+    std::shared_future<SetPtr> future_set;
+};
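For readers unfamiliar with the std::promise/std::shared_future mechanics that FutureSet wraps, here is a minimal standalone sketch (not part of the commit; Set is stubbed as an empty placeholder struct) of the states that isValid() and isReady() distinguish:

    #include <cassert>
    #include <chrono>
    #include <future>
    #include <memory>

    struct Set {};                        // placeholder for DB::Set
    using SetPtr = std::shared_ptr<Set>;

    int main()
    {
        std::shared_future<SetPtr> no_set;  // default-constructed: not valid
        assert(!no_set.valid());

        std::promise<SetPtr> promise;
        std::shared_future<SetPtr> future = promise.get_future();

        // Analysis time: valid (a set will exist), but not ready yet.
        assert(future.valid());
        assert(future.wait_for(std::chrono::seconds(0)) != std::future_status::ready);

        // Execution time: the building task fulfills the promise...
        promise.set_value(std::make_shared<Set>());

        // ...and every copy of the shared_future now yields the same SetPtr.
        assert(future.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
        assert(future.get() != nullptr);
    }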
 /// Information on how to build set for the [GLOBAL] IN section.
 class SubqueryForSet
 {
@@ -33,7 +65,12 @@ public:
     std::unique_ptr<QueryPlan> detachSource();

     /// Build this set from the result of the subquery.
-    SetPtr set;
+    String key;
+    SetPtr set_in_progress;
+    /// After set_in_progress is finished, it will be put into promise_to_fill_set, and thus all FutureSets
+    /// referencing this set will be filled.
+    std::promise<SetPtr> promise_to_fill_set;
+    FutureSet set = FutureSet{promise_to_fill_set.get_future()};

     /// If set, put the result into the table.
     /// This is a temporary table for transferring to remote servers for distributed query processing.
@@ -59,6 +96,8 @@ struct PreparedSetKey
     bool operator==(const PreparedSetKey & other) const;

+    String toString() const;
+
     struct Hash
     {
         UInt64 operator()(const PreparedSetKey & key) const { return key.ast_hash.first; }
@@ -75,7 +114,8 @@ public:
     SubqueryForSet & getSubquery(const String & subquery_id);

     void set(const PreparedSetKey & key, SetPtr set_);
-    SetPtr & get(const PreparedSetKey & key);
+    FutureSet getFuture(const PreparedSetKey & key) const;
+    SetPtr get(const PreparedSetKey & key) const;

     /// Get subqueries and clear them.
     /// We need to build a plan for subqueries just once. That's why we can clear them after accessing them.
@@ -84,12 +124,12 @@ public:
     /// Returns all sets that match the given ast hash not checking types
     /// Used in KeyCondition and MergeTreeIndexConditionBloomFilter to make non exact match for types in PreparedSetKey
-    std::vector<SetPtr> getByTreeHash(IAST::Hash ast_hash);
+    std::vector<FutureSet> getByTreeHash(IAST::Hash ast_hash) const;

     bool empty() const;

 private:
-    std::unordered_map<PreparedSetKey, SetPtr, PreparedSetKey::Hash> sets;
+    std::unordered_map<PreparedSetKey, FutureSet, PreparedSetKey::Hash> sets;

     /// This is the information required for building sets
     SubqueriesForSets subqueries;
@@ -97,4 +137,31 @@ private:
 using PreparedSetsPtr = std::shared_ptr<PreparedSets>;

+/// A reference to a set that is being built by another task.
+/// The difference from FutureSet is that this object can be used to wait for the set to be built in another thread.
+using SharedSet = std::shared_future<SetPtr>;
+
+/// This set cache is used to avoid building the same set multiple times. It is different from PreparedSets in that
+/// it can be used across multiple queries. One use case is when we execute the same mutation on multiple parts. In this
+/// case each part is processed by a separate mutation task, but they can share the same set.
+class PreparedSetsCache
+{
+public:
+    /// Look up the set in the cache.
+    /// If it is found, get the future to be able to wait for the set to be built.
+    /// Otherwise create a promise, build the set and set the promise value.
+    std::variant<std::promise<SetPtr>, SharedSet> findOrPromiseToBuild(const String & key);
+
+private:
+    struct Entry
+    {
+        SharedSet future; /// Other tasks can wait for the set to be built.
+    };
+
+    std::mutex cache_mutex;
+    std::unordered_map<String, Entry> cache;
+};
+
+using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;

 }
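To make the findOrPromiseToBuild contract concrete — exactly one caller receives the promise and must build the set, every other caller receives a future to wait on — here is a small standalone sketch of the consuming pattern, mirroring the callers in ExpressionAnalyzer and CreatingSetsTransform elsewhere in this commit (hypothetical buildOrWait helper, Set stubbed):

    #include <future>
    #include <memory>
    #include <variant>

    struct Set {};                        // placeholder for DB::Set
    using SetPtr = std::shared_ptr<Set>;
    using SharedSet = std::shared_future<SetPtr>;

    // Hypothetical helper: consume the variant returned by a
    // findOrPromiseToBuild-style lookup.
    SetPtr buildOrWait(std::variant<std::promise<SetPtr>, SharedSet> from_cache)
    {
        if (from_cache.index() == 0)
        {
            // This task won the race: build the set and publish it for waiters.
            SetPtr set = std::make_shared<Set>();
            std::get<0>(from_cache).set_value(set);
            return set;
        }
        // Another task is building: block until it publishes the result.
        return std::get<1>(from_cache).get();
    }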

View File

@@ -236,6 +236,11 @@ bool Set::insertFromBlock(const Columns & columns)
     return limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "IN-set", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
 }

+void Set::checkIsCreated() const
+{
+    if (!is_created.load())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: Trying to use set before it has been built.");
+}
+
 ColumnPtr Set::execute(const ColumnsWithTypeAndName & columns, bool negative) const
 {

View File

@@ -54,6 +54,8 @@ public:
     /// finishInsert and isCreated are thread-safe
     bool isCreated() const { return is_created.load(); }

+    void checkIsCreated() const;
+
     /** For columns of 'block', check belonging of corresponding rows to the set.
       * Return UInt8 column with the result.
       */
@@ -67,7 +69,7 @@ public:
     const DataTypes & getElementsTypes() const { return set_elements_types; }
     bool hasExplicitSetElements() const { return fill_set_elements; }
-    Columns getSetElements() const { return { set_elements.begin(), set_elements.end() }; }
+    Columns getSetElements() const { checkIsCreated(); return { set_elements.begin(), set_elements.end() }; }

     void checkColumnsNumber(size_t num_key_columns) const;
     bool areTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const;

View File

@@ -1,6 +1,7 @@
 #include <Planner/CollectSets.h>

 #include <Interpreters/Context.h>
+#include <Interpreters/PreparedSets.h>
 #include <Storages/StorageSet.h>
@@ -52,7 +53,8 @@ public:
         if (storage_set)
         {
-            planner_context.registerSet(set_key, PlannerSet(storage_set->getSet()));
+            /// Handle storage_set as a ready set.
+            planner_context.registerSet(set_key, PlannerSet(FutureSet(storage_set->getSet())));
         }
         else if (const auto * constant_node = in_second_argument->as<ConstantNode>())
         {
@@ -62,16 +64,12 @@ public:
                 constant_node->getResultType(),
                 settings);

-            planner_context.registerSet(set_key, PlannerSet(std::move(set)));
+            planner_context.registerSet(set_key, PlannerSet(FutureSet(std::move(set))));
         }
         else if (in_second_argument_node_type == QueryTreeNodeType::QUERY ||
                  in_second_argument_node_type == QueryTreeNodeType::UNION)
         {
-            SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
-            bool tranform_null_in = settings.transform_null_in;
-            auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
-
-            planner_context.registerSet(set_key, PlannerSet(std::move(set), in_second_argument));
+            planner_context.registerSet(set_key, PlannerSet(in_second_argument));
         }
         else
         {

View File

@@ -890,11 +890,11 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
     for (const auto & node : actions_to_execute->getNodes())
     {
         const auto & set_key = node.result_name;
-        const auto * planner_set = planner_context->getSetOrNull(set_key);
+        auto * planner_set = planner_context->getSetOrNull(set_key);
         if (!planner_set)
             continue;

-        if (planner_set->getSet()->isCreated() || !planner_set->getSubqueryNode())
+        if (planner_set->getSet().isCreated() || !planner_set->getSubqueryNode())
             continue;

         auto subquery_options = select_query_options.subquery();
@@ -904,8 +904,16 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
             planner_context->getGlobalPlannerContext());
         subquery_planner.buildQueryPlanIfNeeded();

+        const auto & settings = planner_context->getQueryContext()->getSettingsRef();
+        SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
+        bool tranform_null_in = settings.transform_null_in;
+        auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
+
         SubqueryForSet subquery_for_set;
+        subquery_for_set.key = set_key;
+        subquery_for_set.set_in_progress = set;
         subquery_for_set.set = planner_set->getSet();
+        subquery_for_set.promise_to_fill_set = planner_set->extractPromiseToBuildSet();
         subquery_for_set.source = std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan());

         subqueries_for_sets.emplace(set_key, std::move(subquery_for_set));

View File

@@ -632,7 +632,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
     column.name = set_key;
     column.type = std::make_shared<DataTypeSet>();

-    bool set_is_created = planner_set.getSet()->isCreated();
+    bool set_is_created = planner_set.getSet().isCreated();
     auto column_set = ColumnSet::create(1, planner_set.getSet());

     if (set_is_created)

View File

@@ -128,7 +128,7 @@ PlannerContext::SetKey PlannerContext::createSetKey(const QueryTreeNodePtr & set
 void PlannerContext::registerSet(const SetKey & key, PlannerSet planner_set)
 {
-    if (!planner_set.getSet())
+    if (!planner_set.getSet().isValid())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Set must be initialized");

     const auto & subquery_node = planner_set.getSubqueryNode();
@@ -162,7 +162,7 @@ const PlannerSet & PlannerContext::getSetOrThrow(const SetKey & key) const
     return it->second;
 }

-const PlannerSet * PlannerContext::getSetOrNull(const SetKey & key) const
+PlannerSet * PlannerContext::getSetOrNull(const SetKey & key)
 {
     auto it = set_key_to_set.find(key);
     if (it == set_key_to_set.end())

View File

@@ -7,6 +7,7 @@
 #include <Interpreters/Context_fwd.h>
 #include <Interpreters/Set.h>
+#include <Interpreters/PreparedSets.h>

 #include <Analyzer/IQueryTreeNode.h>
@@ -56,18 +57,18 @@ class PlannerSet
 {
 public:
     /// Construct planner set that is ready for execution
-    explicit PlannerSet(SetPtr set_)
+    explicit PlannerSet(FutureSet set_)
         : set(std::move(set_))
     {}

     /// Construct planner set with set and subquery node
-    explicit PlannerSet(SetPtr set_, QueryTreeNodePtr subquery_node_)
-        : set(std::move(set_))
+    explicit PlannerSet(QueryTreeNodePtr subquery_node_)
+        : set(promise_to_build_set.get_future())
         , subquery_node(std::move(subquery_node_))
     {}

-    /// Get set
-    const SetPtr & getSet() const
+    /// Get a reference to a set that might not be built yet
+    const FutureSet & getSet() const
     {
         return set;
     }
@@ -78,8 +79,15 @@ public:
         return subquery_node;
     }

+    /// This promise will be fulfilled when the set is built and all FutureSet objects will become ready
+    std::promise<SetPtr> extractPromiseToBuildSet()
+    {
+        return std::move(promise_to_build_set);
+    }
+
 private:
-    SetPtr set;
+    std::promise<SetPtr> promise_to_build_set;
+    FutureSet set;

     QueryTreeNodePtr subquery_node;
 };
@@ -186,7 +194,7 @@ public:
     const PlannerSet & getSetOrThrow(const SetKey & key) const;

     /// Get set for key, if no set is registered null is returned
-    const PlannerSet * getSetOrNull(const SetKey & key) const;
+    PlannerSet * getSetOrNull(const SetKey & key);

     /// Get registered sets
     const SetKeyToSet & getRegisteredSets() const

View File

@@ -60,7 +60,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
     String prefix(settings.offset, ' ');
     settings.out << prefix;

-    if (subquery_for_set.set)
+    if (subquery_for_set.set_in_progress)
         settings.out << "Set: ";

     settings.out << description << '\n';
@@ -68,7 +68,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
 void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const
 {
-    if (subquery_for_set.set)
+    if (subquery_for_set.set_in_progress)
         map.add("Set", description);
 }

View File

@@ -4,6 +4,7 @@
 #include <Interpreters/Set.h>
 #include <Interpreters/IJoin.h>
+#include <Interpreters/Context.h>
 #include <Storages/IStorage.h>
 #include <Common/logger_useful.h>
@@ -39,13 +40,52 @@ void CreatingSetsTransform::work()
     if (!is_initialized)
         init();

+    if (done_with_set && done_with_table)
+    {
+        finishConsume();
+        input.close();
+    }
+
     IAccumulatingTransform::work();
 }

 void CreatingSetsTransform::startSubquery()
 {
-    if (subquery.set)
-        LOG_TRACE(log, "Creating set.");
+    /// Look up the set in the cache if we don't need to build a table.
+    auto ctx = context.lock();
+    if (ctx && ctx->getPreparedSetsCache() && !subquery.table)
+    {
+        /// Try to find the set in the cache and wait for it to be built.
+        /// Retry if the set from the cache fails to be built.
+        while (true)
+        {
+            auto from_cache = ctx->getPreparedSetsCache()->findOrPromiseToBuild(subquery.key);
+            if (from_cache.index() == 0)
+            {
+                promise_to_build = std::move(std::get<0>(from_cache));
+            }
+            else
+            {
+                LOG_TRACE(log, "Waiting for set to be built by another thread, key: {}", subquery.key);
+                SharedSet set_built_by_another_thread = std::move(std::get<1>(from_cache));
+                const SetPtr & ready_set = set_built_by_another_thread.get();
+                if (!ready_set)
+                {
+                    LOG_TRACE(log, "Failed to use set from cache, key: {}", subquery.key);
+                    continue;
+                }
+
+                subquery.promise_to_fill_set.set_value(ready_set);
+                subquery.set_in_progress.reset();
+                done_with_set = true;
+                set_from_cache = true;
+            }
+            break;
+        }
+    }
+
+    if (subquery.set_in_progress)
+        LOG_TRACE(log, "Creating set, key: {}", subquery.key);
     if (subquery.table)
         LOG_TRACE(log, "Filling temporary table.");
@@ -53,10 +93,10 @@ void CreatingSetsTransform::startSubquery()
         /// TODO: make via port
         table_out = QueryPipeline(subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext()));

-    done_with_set = !subquery.set;
+    done_with_set = !subquery.set_in_progress;
     done_with_table = !subquery.table;

-    if (done_with_set /*&& done_with_join*/ && done_with_table)
+    if ((done_with_set && !set_from_cache) /*&& done_with_join*/ && done_with_table)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: nothing to do with subquery");

     if (table_out.initialized())
@@ -68,12 +108,16 @@ void CreatingSetsTransform::finishSubquery()
 {
-    if (read_rows != 0)
-    {
     auto seconds = watch.elapsedNanoseconds() / 1e9;
-        if (subquery.set)
-            LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), read_rows, seconds);
+
+    if (set_from_cache)
+    {
+        LOG_DEBUG(log, "Got set from cache in {} sec.", seconds);
+    }
+    else if (read_rows != 0)
+    {
+        if (subquery.set_in_progress)
+            LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set_in_progress->getTotalRowCount(), read_rows, seconds);
         if (subquery.table)
             LOG_DEBUG(log, "Created Table with {} rows in {} sec.", read_rows, seconds);
     }
@@ -87,8 +131,10 @@ void CreatingSetsTransform::init()
 {
     is_initialized = true;

-    if (subquery.set)
-        subquery.set->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
+    if (subquery.set_in_progress)
+    {
+        subquery.set_in_progress->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
+    }

     watch.restart();
     startSubquery();
@@ -101,7 +147,7 @@ void CreatingSetsTransform::consume(Chunk chunk)
     if (!done_with_set)
     {
-        if (!subquery.set->insertFromBlock(block.getColumnsWithTypeAndName()))
+        if (!subquery.set_in_progress->insertFromBlock(block.getColumnsWithTypeAndName()))
             done_with_set = true;
     }
@@ -124,8 +170,13 @@ void CreatingSetsTransform::consume(Chunk chunk)
 Chunk CreatingSetsTransform::generate()
 {
-    if (subquery.set)
-        subquery.set->finishInsert();
+    if (subquery.set_in_progress)
+    {
+        subquery.set_in_progress->finishInsert();
+        subquery.promise_to_fill_set.set_value(subquery.set_in_progress);
+        if (promise_to_build)
+            promise_to_build->set_value(subquery.set_in_progress);
+    }

     if (table_out.initialized())
     {

View File

@@ -43,10 +43,12 @@ public:
 private:
     SubqueryForSet subquery;
+    std::optional<std::promise<SetPtr>> promise_to_build;

     QueryPipeline table_out;
     std::unique_ptr<PushingPipelineExecutor> executor;
     UInt64 read_rows = 0;
+    bool set_from_cache = false;
     Stopwatch watch;

     bool done_with_set = true;

View File

@@ -52,6 +52,14 @@ void MutatePlainMergeTreeTask::prepare()
             std::move(profile_counters_snapshot));
     };

+    if (task_context->getSettingsRef().enable_sharing_sets_for_mutations)
+    {
+        /// If we have a prepared sets cache for this mutation, we will use it.
+        auto mutation_id = future_part->part_info.mutation;
+        auto prepared_sets_cache_for_mutation = storage.getPreparedSetsCache(mutation_id);
+        task_context->setPreparedSetsCache(prepared_sets_cache_for_mutation);
+    }
+
     mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
         future_part, metadata_snapshot, merge_mutate_entry->commands, merge_list_entry.get(),
         time(nullptr), task_context, merge_mutate_entry->txn, merge_mutate_entry->tagger->reserved_space, table_lock_holder);

View File

@@ -1724,6 +1724,8 @@ bool MutateTask::prepare()
     /// Allow mutations to work when force_index_by_date or force_primary_key is on.
     context_for_reading->setSetting("force_index_by_date", false);
     context_for_reading->setSetting("force_primary_key", false);
+    /// Skip using large sets in KeyCondition
+    context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);

     for (const auto & command : *ctx->commands)
         if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))

View File

@@ -288,7 +288,7 @@ ConstSetPtr tryGetSetFromDAGNode(const ActionsDAG::Node * dag_node)
     {
         auto set = column_set->getData();

-        if (set->isCreated())
+        if (set && set->isCreated())
             return set;
     }
@@ -305,8 +305,8 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet() const
     {
         auto prepared_sets_with_same_hash = prepared_sets->getByTreeHash(ast_node->getTreeHash());
         for (auto & set : prepared_sets_with_same_hash)
-            if (set->isCreated())
-                return set;
+            if (set.isCreated())
+                return set.get();
     }
     else if (dag_node)
     {
@@ -368,8 +368,8 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
         auto tree_hash = ast_node->getTreeHash();
         for (const auto & set : prepared_sets->getByTreeHash(tree_hash))
         {
-            if (types_match(set))
-                return set;
+            if (set.isCreated() && types_match(set.get()))
+                return set.get();
         }
     }
     else

View File

@@ -2181,6 +2181,35 @@ std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const
     return std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
 }

+PreparedSetsCachePtr StorageMergeTree::getPreparedSetsCache(Int64 mutation_id)
+{
+    auto l = std::lock_guard(mutation_prepared_sets_cache_mutex);
+
+    /// Cleanup stale entries where the shared_ptr is expired.
+    while (!mutation_prepared_sets_cache.empty())
+    {
+        auto it = mutation_prepared_sets_cache.begin();
+        if (it->second.lock())
+            break;
+        mutation_prepared_sets_cache.erase(it);
+    }
+
+    /// Look up an existing entry.
+    auto it = mutation_prepared_sets_cache.find(mutation_id);
+    if (it != mutation_prepared_sets_cache.end())
+    {
+        /// If the entry is still alive, return it.
+        auto existing_set_cache = it->second.lock();
+        if (existing_set_cache)
+            return existing_set_cache;
+    }
+
+    /// Create a new entry.
+    auto cache = std::make_shared<PreparedSetsCache>();
+    mutation_prepared_sets_cache[mutation_id] = cache;
+    return cache;
+}
+
 void StorageMergeTree::fillNewPartName(MutableDataPartPtr & part, DataPartsLock &)
 {
     part->info.min_block = part->info.max_block = increment.get();
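The weak_ptr map above keeps a mutation's cache alive only while at least one task still holds the shared_ptr; a standalone sketch of that lifetime rule (placeholder cache type, not part of the commit):

    #include <cassert>
    #include <map>
    #include <memory>

    struct PreparedSetsCache {};          // placeholder

    int main()
    {
        std::map<long, std::weak_ptr<PreparedSetsCache>> cache_by_mutation;

        {
            // Tasks of mutation 42 running at the same time share one instance.
            auto cache = std::make_shared<PreparedSetsCache>();
            cache_by_mutation[42] = cache;
            assert(cache_by_mutation[42].lock() == cache);
        } // the last task drops its shared_ptr here

        // Without overlap the entry expires: it will be cleaned up, and a later
        // task for the same mutation gets a fresh cache and rebuilds its sets.
        assert(cache_by_mutation[42].lock() == nullptr);
    }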

View File

@@ -151,6 +151,13 @@ private:
     std::atomic<bool> shutdown_called {false};
     std::atomic<bool> flush_called {false};

+    /// PreparedSets cache for one executing mutation.
+    /// NOTE: we only store a weak_ptr to the PreparedSetsCache, so that the cache is shared between mutation tasks that are executed in parallel.
+    /// The goal is to avoid consuming a lot of memory when the same big sets are used by multiple tasks at the same time.
+    /// If the tasks are executed without time overlap, we will destroy the cache to free memory, and the next task might rebuild the same sets.
+    std::mutex mutation_prepared_sets_cache_mutex;
+    std::map<Int64, PreparedSetsCachePtr::weak_type> mutation_prepared_sets_cache;
+
     void loadMutations();

     /// Load and initialize deduplication logs. Even if deduplication setting
@@ -259,6 +266,8 @@ private:
     std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;

+    PreparedSetsCachePtr getPreparedSetsCache(Int64 mutation_id);
+
     friend class MergeTreeSink;
     friend class MergeTreeData;
     friend class MergePlainMergeTreeTask;

View File

@@ -77,12 +77,6 @@ void S3Settings::RequestSettings::PartUploadSettings::updateFromSettingsImpl(con
     if (!if_changed || settings.s3_max_single_part_upload_size.changed)
         max_single_part_upload_size = settings.s3_max_single_part_upload_size;
-
-    /// AWS S3 SDK library has a bug. It is using std::*stream (which is a major offense).
-    /// LLVM libc++ has a bug. It does not allow std::*stream to work for large strings.
-    if (max_single_part_upload_size >= 2_GiB)
-        throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Uploading parts of 2 GiB or larger is not supported due to a bug in AWS and LLVM. Lower the value of `s3_max_single_part_upload_size` setting.");
 }

 void S3Settings::RequestSettings::PartUploadSettings::validate()
void S3Settings::RequestSettings::PartUploadSettings::validate() void S3Settings::RequestSettings::PartUploadSettings::validate()

View File

@@ -1,2 +1,2 @@
-2147483648 1048576 104857600 1 0 0 0 s3_cache/ 0
-2147483648 1048576 104857600 0 0 0 0 s3_cache_2/ 0
+2147483648 1048576 104857600 1 0 0 0 /var/lib/clickhouse/caches/s3_cache/ 0
+2147483648 1048576 104857600 0 0 0 0 /var/lib/clickhouse/caches/s3_cache_2/ 0

View File

@@ -0,0 +1,9 @@
+40000
+all_1_1_0
+all_2_2_0
+all_3_3_0
+all_4_4_0
+5000 all_1_1_0_9
+5000 all_2_2_0_9
+5000 all_3_3_0_9
+5000 all_4_4_0_9

View File

@ -0,0 +1,27 @@
-- Tags: long, no-debug, no-tsan, no-asan, no-ubsan, no-msan
DROP TABLE IF EXISTS 02581_trips;
CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
-- Make multiple parts
INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+10000000, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+20000000, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+30000000, '', number FROM numbers(10000);
SELECT count() from 02581_trips WHERE description = '';
SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
-- Start multiple mutations simultaneously
SYSTEM STOP MERGES 02581_trips;
ALTER TABLE 02581_trips UPDATE description='5' WHERE id IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
ALTER TABLE 02581_trips UPDATE description='6' WHERE id IN (SELECT (number*10 + 6)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
ALTER TABLE 02581_trips DELETE WHERE id IN (SELECT (number*10 + 7)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
ALTER TABLE 02581_trips UPDATE description='8' WHERE id IN (SELECT (number*10 + 8)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
SYSTEM START MERGES 02581_trips;
DELETE FROM 02581_trips WHERE id IN (SELECT (number*10 + 9)::UInt32 FROM numbers(200000000));
SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
DROP TABLE 02581_trips;
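Taken together with the StorageMergeTree changes above: each of the four data parts is mutated by its own task, and since the new cache is keyed by mutation id, each 200M-value set should be built once per mutation and shared by the concurrently running per-part tasks, rather than being rebuilt for every part.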

View File

@ -0,0 +1,19 @@
-- { echoOn }
SELECT count(), _part FROM 02581_trips GROUP BY _part ORDER BY _part;
10000 all_1_1_0
10000 all_2_2_0
10000 all_3_3_0
10000 all_4_4_0
-- Run a mutation with a big IN subquery
ALTER TABLE 02581_trips UPDATE description='1' WHERE id IN (SELECT (number*10+1)::UInt32 FROM numbers(10000000)) SETTINGS mutations_sync=2;
SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
9000 all_1_1_0_5
9000 all_2_2_0_5
9000 all_3_3_0_5
9000 all_4_4_0_5
ALTER TABLE 02581_trips UPDATE description='2' WHERE id IN (SELECT (number*10+2)::UInt32 FROM numbers(10000)) SETTINGS mutations_sync=2;
SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
8000 all_1_1_0_6
8000 all_2_2_0_6
8000 all_3_3_0_6
8000 all_4_4_0_6

View File

@ -0,0 +1,21 @@
DROP TABLE IF EXISTS 02581_trips;
CREATE TABLE 02581_trips(id UInt32, description String) ENGINE=MergeTree ORDER BY id;
-- Make multiple parts
INSERT INTO 02581_trips SELECT number, '' FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+10000, '' FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+20000, '' FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+30000, '' FROM numbers(10000);
-- { echoOn }
SELECT count(), _part FROM 02581_trips GROUP BY _part ORDER BY _part;
-- Run a mutation with a big IN subquery
ALTER TABLE 02581_trips UPDATE description='1' WHERE id IN (SELECT (number*10+1)::UInt32 FROM numbers(10000000)) SETTINGS mutations_sync=2;
SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
ALTER TABLE 02581_trips UPDATE description='2' WHERE id IN (SELECT (number*10+2)::UInt32 FROM numbers(10000)) SETTINGS mutations_sync=2;
SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
-- { echoOff }
DROP TABLE 02581_trips;

View File

@ -0,0 +1,12 @@
40000
all_1_1_0
all_2_2_0
all_3_3_0
all_4_4_0
36000
32000
28000
24000
20000
16000
12000

View File

@ -0,0 +1,57 @@
-- Tags: long, no-tsan, no-asan, no-ubsan, no-msan
DROP TABLE IF EXISTS 02581_trips;
CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
-- Make multiple parts
INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+10000000, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+20000000, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+30000000, '', number FROM numbers(10000);
SELECT count() FROM 02581_trips WHERE description = '';
SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
-- Run a mutation with PK `id` IN a big subquery
ALTER TABLE 02581_trips UPDATE description='a' WHERE id IN (SELECT (number*10)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
SELECT count() FROM 02581_trips WHERE description = '';
ALTER TABLE 02581_trips UPDATE description='a' WHERE id IN (SELECT (number*10 + 1)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2, max_rows_in_set=1000;
SELECT count() FROM 02581_trips WHERE description = '';
-- Run a mutation with func(`id`) IN a big subquery
ALTER TABLE 02581_trips UPDATE description='b' WHERE id::UInt64 IN (SELECT (number*10 + 2)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
SELECT count() FROM 02581_trips WHERE description = '';
-- Run a mutation with non-PK `id2` IN a big subquery
ALTER TABLE 02581_trips UPDATE description='c' WHERE id2 IN (SELECT (number*10 + 3)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
SELECT count() FROM 02581_trips WHERE description = '';
-- Run a mutation with both PK and non-PK IN a big subquery
ALTER TABLE 02581_trips UPDATE description='c'
WHERE
(id IN (SELECT (number*10 + 4)::UInt32 FROM numbers(200000000))) OR
(id2 IN (SELECT (number*10 + 4)::UInt32 FROM numbers(200000000)))
SETTINGS mutations_sync=2;
SELECT count() FROM 02581_trips WHERE description = '';
-- Run a mutation with casted PK and non-PK IN a big subquery
ALTER TABLE 02581_trips UPDATE description='c'
WHERE
(id::UInt64 IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000))) OR
(id2::UInt64 IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000)))
SETTINGS mutations_sync=2;
SELECT count() FROM 02581_trips WHERE description = '';
-- Run a mutation with a casted PK and a non-PK expression IN a big subquery
ALTER TABLE 02581_trips UPDATE description='c'
WHERE
(id::UInt32 IN (SELECT (number*10 + 6)::UInt32 FROM numbers(200000000))) OR
((id2+1)::String IN (SELECT (number*10 + 6)::UInt32 FROM numbers(200000000)))
SETTINGS mutations_sync=2;
SELECT count() FROM 02581_trips WHERE description = '';
DROP TABLE 02581_trips;

View File

@ -0,0 +1,7 @@
40000
all_1_1_0
all_2_2_0
all_3_3_0
all_4_4_0
36000
32000

View File

@ -0,0 +1,32 @@
DROP TABLE IF EXISTS 02581_trips;
CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
-- Make multiple parts
INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+10000000, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+20000000, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+30000000, '', number FROM numbers(10000);
SELECT count() FROM 02581_trips WHERE description = '';
SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
CREATE TABLE 02581_set (id UInt32) ENGINE = Set;
INSERT INTO 02581_set SELECT number*10+7 FROM numbers(10000000);
-- Run a mutation with PK `id` IN a big Set table
ALTER TABLE 02581_trips UPDATE description='d' WHERE id IN 02581_set SETTINGS mutations_sync=2;
SELECT count() FROM 02581_trips WHERE description = '';
INSERT INTO 02581_set SELECT number*10+8 FROM numbers(10000000);
-- Run the mutation again after the Set table has been updated
ALTER TABLE 02581_trips UPDATE description='d' WHERE id IN 02581_set SETTINGS mutations_sync=2;
SELECT count() FROM 02581_trips WHERE description = '';
DROP TABLE 02581_set;
DROP TABLE 02581_trips;
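The expected counts in the reference file (40000, then 36000, then 32000) show that the second mutation updates another 4000 rows: the set read from 02581_set after the extra INSERT reflects the newly added values rather than a stale cached copy.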

View File

@ -0,0 +1 @@
2097152

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
# Tags: no-parallel, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Regression test for a crash when a part exceeds INT_MAX
#
# NOTE: an .sh test is used instead of .sql because the S3 path needs $CLICKHOUSE_DATABASE
# to avoid truncating an existing object, since the version of MinIO used on CI
# seems to be too slow with truncation.
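# The INSERT below writes (2^30 * 2) / 1024 = 2^21 = 2097152 rows of 1024 bytes each,
# i.e. at least 2^31 bytes (2 GiB) in a single upload part, just past INT_MAX (2^31 - 1).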
$CLICKHOUSE_CLIENT -nm -q "
INSERT INTO FUNCTION s3('http://localhost:11111/test/$CLICKHOUSE_DATABASE/test_INT_MAX.tsv', '', '', 'TSV')
SELECT repeat('a', 1024) FROM numbers((pow(2, 30) * 2) / 1024)
SETTINGS s3_max_single_part_upload_size = '10Gi';
SELECT count() FROM s3('http://localhost:11111/test/$CLICKHOUSE_DATABASE/test_INT_MAX.tsv');
"