diff --git a/contrib/llvm-project b/contrib/llvm-project
index e0accd51793..2aedf7598a4 160000
--- a/contrib/llvm-project
+++ b/contrib/llvm-project
@@ -1 +1 @@
-Subproject commit e0accd517933ebb44aff84bc8db448ffd8ef1929
+Subproject commit 2aedf7598a4040b23881dbe05b6afaca25a337ef
diff --git a/src/Analyzer/Passes/QueryAnalysisPass.cpp b/src/Analyzer/Passes/QueryAnalysisPass.cpp
index cff8c2a97bb..8afb9078fae 100644
--- a/src/Analyzer/Passes/QueryAnalysisPass.cpp
+++ b/src/Analyzer/Passes/QueryAnalysisPass.cpp
@@ -5124,7 +5124,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
 
                 /// Create constant set column for constant folding
-                auto column_set = ColumnSet::create(1, std::move(set));
+                auto column_set = ColumnSet::create(1, FutureSet(std::move(set)));
                 argument_columns[1].column = ColumnConst::create(std::move(column_set), 1);
             }
diff --git a/src/Columns/ColumnSet.h b/src/Columns/ColumnSet.h
index 316f8196e5a..3f5cf4ad280 100644
--- a/src/Columns/ColumnSet.h
+++ b/src/Columns/ColumnSet.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <Interpreters/PreparedSets.h>
 #include
 #include
 
@@ -20,7 +21,7 @@ class ColumnSet final : public COWHelper<IColumnDummy, ColumnSet>
 private:
     friend class COWHelper<IColumnDummy, ColumnSet>;
 
-    ColumnSet(size_t s_, const ConstSetPtr & data_) : data(data_) { s = s_; }
+    ColumnSet(size_t s_, FutureSet data_) : data(std::move(data_)) { s = s_; }
     ColumnSet(const ColumnSet &) = default;
 
 public:
@@ -28,13 +29,13 @@ public:
     TypeIndex getDataType() const override { return TypeIndex::Set; }
     MutableColumnPtr cloneDummy(size_t s_) const override { return ColumnSet::create(s_, data); }
 
-    ConstSetPtr getData() const { return data; }
+    ConstSetPtr getData() const { if (!data.isReady()) return nullptr; return data.get(); }
 
     // Used only for debugging, making it DUMPABLE
     Field operator[](size_t) const override { return {}; }
 
 private:
-    ConstSetPtr data;
+    FutureSet data;
 };
 
 }
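Since `ColumnSet` now wraps a `FutureSet`, `getData()` may legitimately return `nullptr` while the set is still being built, so every caller has to null-check before dereferencing — the `RPNBuilder.cpp` hunk further down makes exactly this change. A minimal consumer-side sketch (the helper name is ours, not part of the patch):

```cpp
#include <Columns/ColumnSet.h>
#include <Interpreters/Set.h>

using namespace DB;

/// Hypothetical helper: returns the set only if it is both ready and filled.
ConstSetPtr tryGetReadySet(const ColumnSet & column_set)
{
    ConstSetPtr set = column_set.getData(); /// nullptr while the FutureSet is pending
    if (set && set->isCreated())
        return set;
    return nullptr; /// caller must fall back to a non-indexed code path
}
```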
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 199b093360f..4d21960906d 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -302,6 +302,7 @@ class IColumn;
     M(Bool, http_skip_not_found_url_for_globs, true, "Skip url's for globs with HTTP_NOT_FOUND error", 0) \
     M(Bool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown", 0) \
     M(Bool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.", 0) \
+    M(UInt64, use_index_for_in_with_subqueries_max_values, 0, "The maximum size of the set in the right-hand side of the IN operator to use table index for filtering. It allows avoiding performance degradation and higher memory usage due to the preparation of additional data structures for large queries. Zero means no limit.", 0) \
     M(Bool, joined_subquery_requires_alias, true, "Force joined subqueries and table functions to have aliases for correct name qualification.", 0) \
     M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
     M(Bool, empty_result_for_aggregation_by_constant_keys_on_empty_set, true, "Return empty result when aggregating by constant keys on empty set.", 0) \
@@ -569,6 +570,7 @@ class IColumn;
     M(Bool, query_cache_squash_partial_results, true, "Squash partial result blocks to blocks of size 'max_block_size'. Reduces performance of inserts into the query cache but improves the compressability of cache entries.", 0) \
     M(Seconds, query_cache_ttl, 60, "After this time in seconds entries in the query cache become stale", 0) \
     M(Bool, query_cache_share_between_users, false, "Allow other users to read entry in the query cache", 0) \
+    M(Bool, enable_sharing_sets_for_mutations, true, "Allow sharing set objects built for IN subqueries between different tasks of the same mutation. This reduces memory usage and CPU consumption.", 0) \
     \
     M(Bool, optimize_rewrite_sum_if_to_count_if, false, "Rewrite sumIf() and sum(if()) function countIf() function when logically equivalent", 0) \
     M(Bool, optimize_rewrite_aggregate_function_with_if, true, "Rewrite aggregate functions with if expression as argument when logically equivalent. For example, avg(if(cond, col, null)) can be rewritten to avgIf(cond, col)", 0) \
diff --git a/src/DataTypes/DataTypeSet.h b/src/DataTypes/DataTypeSet.h
index 7ef0d931279..7ddfeb9fe30 100644
--- a/src/DataTypes/DataTypeSet.h
+++ b/src/DataTypes/DataTypeSet.h
@@ -20,7 +20,7 @@ public:
     bool isParametric() const override { return true; }
 
     // Used for expressions analysis.
-    MutableColumnPtr createColumn() const override { return ColumnSet::create(0, nullptr); }
+    MutableColumnPtr createColumn() const override { return ColumnSet::create(0, FutureSet{}); }
 
     // Used only for debugging, making it DUMPABLE
     Field getDefault() const override { return Tuple(); }
diff --git a/src/Disks/ObjectStorages/Cached/registerDiskCache.cpp b/src/Disks/ObjectStorages/Cached/registerDiskCache.cpp
index 13e1056e047..3a624d8a18d 100644
--- a/src/Disks/ObjectStorages/Cached/registerDiskCache.cpp
+++ b/src/Disks/ObjectStorages/Cached/registerDiskCache.cpp
@@ -41,7 +41,9 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check *
     file_cache_settings.loadFromConfig(config, config_prefix);
 
     if (file_cache_settings.base_path.empty())
-        file_cache_settings.base_path = fs::path(context->getPath()) / "disks" / name / "cache/";
+        file_cache_settings.base_path = fs::path(context->getPath()) / "disks" / name / "cache/";
+    else if (fs::path(file_cache_settings.base_path).is_relative())
+        file_cache_settings.base_path = fs::path(context->getPath()) / "caches" / file_cache_settings.base_path;
 
     auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings);
     auto disk = disk_it->second;
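The new `else if` branch anchors relative cache paths under `<server path>/caches/`, which is why the `02344_describe_cache` reference below changes to `/var/lib/clickhouse/caches/s3_cache/`. A self-contained sketch of the same resolution rule (the helper and example paths are illustrative):

```cpp
#include <filesystem>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

/// Mirrors the rule above: empty -> per-disk default, relative -> under "caches/".
std::string resolveCachePath(const std::string & server_path, const std::string & disk_name, const std::string & base_path)
{
    if (base_path.empty())
        return fs::path(server_path) / "disks" / disk_name / "cache/";
    if (fs::path(base_path).is_relative())
        return fs::path(server_path) / "caches" / base_path;
    return base_path; /// absolute paths are used as-is
}

int main()
{
    std::cout << resolveCachePath("/var/lib/clickhouse/", "s3_disk", "s3_cache/") << '\n';
    /// Prints: /var/lib/clickhouse/caches/s3_cache/
}
```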
diff --git a/src/IO/tests/gtest_stringstream.cpp b/src/IO/tests/gtest_stringstream.cpp
new file mode 100644
index 00000000000..c4352740695
--- /dev/null
+++ b/src/IO/tests/gtest_stringstream.cpp
@@ -0,0 +1,37 @@
+#include <gtest/gtest.h>
+
+#include <sstream>
+#include <string>
+
+// There are a few places where stringstream is used to pass data to some
+// 3rd-party code.
+//
+// And there were problems with feeding > INT_MAX to stringstream in libc++;
+// this is the regression test for it.
+//
+// Since some places in ClickHouse can operate on buffers > INT_MAX (e.g.
+// WriteBufferFromS3), it is better to have a test for this in ClickHouse
+// too.
+TEST(stringstream, INTMAX)
+{
+    std::stringstream ss;
+    ss.exceptions(std::ios::badbit);
+
+    std::string payload(1<<20, 'A');
+
+    // write up to INT_MAX-1MiB
+    for (size_t i = 0; i < (2ULL<<30) - payload.size(); i += payload.size())
+    {
+        ASSERT_NE(ss.tellp(), -1);
+        ss.write(payload.data(), payload.size());
+        // std::cerr << "i: " << ss.tellp()/1024/1024 << " MB\n";
+    }
+
+    ASSERT_NE(ss.tellp(), -1);
+    // write up to INT_MAX
+    ss.write(payload.data(), payload.size());
+
+    ASSERT_NE(ss.tellp(), -1);
+    // write one more 1MiB chunk
+    ss.write(payload.data(), payload.size());
+}
diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp
index 1f4969a7f9a..3bb3ea67e29 100644
--- a/src/Interpreters/ActionsVisitor.cpp
+++ b/src/Interpreters/ActionsVisitor.cpp
@@ -952,14 +952,16 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
         return;
     }
 
-    SetPtr prepared_set;
+    FutureSet prepared_set;
     if (checkFunctionIsInOrGlobalInOperator(node))
     {
         /// Let's find the type of the first argument (then getActionsImpl will be called again and will not affect anything).
         visit(node.arguments->children.at(0), data);
 
-        if (!data.no_makeset && !(data.is_create_parameterized_view && !analyzeReceiveQueryParams(ast).empty())
-            && (prepared_set = makeSet(node, data, data.no_subqueries)))
+        if (!data.no_makeset && !(data.is_create_parameterized_view && !analyzeReceiveQueryParams(ast).empty()))
+            prepared_set = makeSet(node, data, data.no_subqueries);
+
+        if (prepared_set.isValid())
         {
             /// Transform tuple or subquery into a set.
         }
@@ -1172,14 +1174,15 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
                 num_arguments += columns.size() - 1;
                 arg += columns.size() - 1;
             }
-            else if (checkFunctionIsInOrGlobalInOperator(node) && arg == 1 && prepared_set)
+            else if (checkFunctionIsInOrGlobalInOperator(node) && arg == 1 && prepared_set.isValid())
             {
                 ColumnWithTypeAndName column;
                 column.type = std::make_shared<DataTypeSet>();
 
                 /// If the argument is a set given by an enumeration of values (so, the set was already built), give it a unique name,
                 /// so that sets with the same literal representation do not fuse together (they can have different types).
-                if (!prepared_set->empty())
+                const bool is_constant_set = prepared_set.isCreated();
+                if (is_constant_set)
                     column.name = data.getUniqueName("__set");
                 else
                     column.name = child->getColumnName();
@@ -1189,7 +1192,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
                 auto column_set = ColumnSet::create(1, prepared_set);
 
                 /// If prepared_set is not empty, we have a set made with literals.
                 /// Create a const ColumnSet to make constant folding work
-                if (!prepared_set->empty())
+                if (is_constant_set)
                     column.column = ColumnConst::create(std::move(column_set), 1);
                 else
                     column.column = std::move(column_set);
@@ -1370,10 +1373,10 @@ void ActionsMatcher::visit(const ASTLiteral & literal, const ASTPtr & /* ast */,
     data.addColumn(std::move(column));
 }
 
-SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_subqueries)
+FutureSet ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_subqueries)
 {
     if (!data.prepared_sets)
-        return nullptr;
+        return {};
 
     /** You need to convert the right argument to a set.
       * This can be a table name, a value, a value enumeration, or a subquery.
@@ -1390,8 +1393,12 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
         if (no_subqueries)
             return {};
 
         auto set_key = PreparedSetKey::forSubquery(*right_in_operand);
-        if (SetPtr set = data.prepared_sets->get(set_key))
-            return set;
+
+        {
+            auto set = data.prepared_sets->getFuture(set_key);
+            if (set.isValid())
+                return set;
+        }
 
         /// A special case is if the name of the table is specified on the right side of the IN statement,
         /// and the table has the type Set (a previously prepared set).
@@ -1407,7 +1414,7 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
                 {
                     SetPtr set = storage_set->getSet();
                     data.prepared_sets->set(set_key, set);
-                    return set;
+                    return FutureSet(set);
                 }
             }
         }
@@ -1439,7 +1446,8 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
     const auto & index = data.actions_stack.getLastActionsIndex();
     if (data.prepared_sets && index.contains(left_in_operand->getColumnName()))
         /// An explicit enumeration of values in parentheses.
-        return makeExplicitSet(&node, last_actions, false, data.getContext(), data.set_size_limit, *data.prepared_sets);
+        return FutureSet(
+            makeExplicitSet(&node, last_actions, false, data.getContext(), data.set_size_limit, *data.prepared_sets));
     else
         return {};
 }
diff --git a/src/Interpreters/ActionsVisitor.h b/src/Interpreters/ActionsVisitor.h
index 0269371b46e..260fd5ab2c0 100644
--- a/src/Interpreters/ActionsVisitor.h
+++ b/src/Interpreters/ActionsVisitor.h
@@ -219,7 +219,7 @@ private:
     static void visit(const ASTLiteral & literal, const ASTPtr & ast, Data & data);
     static void visit(ASTExpressionList & expression_list, const ASTPtr & ast, Data & data);
 
-    static SetPtr makeSet(const ASTFunction & node, Data & data, bool no_subqueries);
+    static FutureSet makeSet(const ASTFunction & node, Data & data, bool no_subqueries);
     static ASTs doUntuple(const ASTFunction * function, ActionsMatcher::Data & data);
     static std::optional<NameAndTypePair> getNameAndTypeFromAST(const ASTPtr & ast, Data & data);
 };
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 559be27a133..73d78e84198 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -43,6 +43,7 @@
 #include
 #include
 #include
+#include <Interpreters/PreparedSets.h>
 #include
 #include
 #include
@@ -4342,6 +4343,16 @@ bool Context::canUseParallelReplicasOnFollower() const
         && getClientInfo().collaborate_with_initiator;
 }
 
+void Context::setPreparedSetsCache(const PreparedSetsCachePtr & cache)
+{
+    prepared_sets_cache = cache;
+}
+
+PreparedSetsCachePtr Context::getPreparedSetsCache() const
+{
+    return prepared_sets_cache;
+}
+
 UInt64 Context::getClientProtocolVersion() const
 {
     return client_protocol_version;
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index 1988b9d82fe..95611e99d51 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -193,6 +193,9 @@ class MergeTreeMetadataCache;
 using MergeTreeMetadataCachePtr = std::shared_ptr<MergeTreeMetadataCache>;
 #endif
 
+class PreparedSetsCache;
+using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;
+
 /// An empty interface for an arbitrary object that may be attached by a shared pointer
 /// to query context, when using ClickHouse as a library.
 struct IHostContext
@@ -399,6 +402,10 @@ private:
     /// Temporary data for query execution accounting.
     TemporaryDataOnDiskScopePtr temp_data_on_disk;
 
+    /// Prepared sets that can be shared between different queries. One use case is to share prepared sets between
+    /// mutation tasks of one mutation executed against different parts of the same table.
+    PreparedSetsCachePtr prepared_sets_cache;
+
 public:
     /// Some counters for current query execution.
     /// Most of them are workarounds and should be removed in the future.
@@ -1128,6 +1135,9 @@ public:
 
     ParallelReplicasMode getParallelReplicasMode() const;
 
+    void setPreparedSetsCache(const PreparedSetsCachePtr & cache);
+    PreparedSetsCachePtr getPreparedSetsCache() const;
+
 private:
     std::unique_lock<std::recursive_mutex> getLock() const;
diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp
index ade393a8c0e..cc54e7620f6 100644
--- a/src/Interpreters/ExpressionAnalyzer.cpp
+++ b/src/Interpreters/ExpressionAnalyzer.cpp
@@ -56,6 +56,7 @@
 #include
 #include
 #include
+#include
 #include
 
@@ -68,6 +69,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -147,6 +149,11 @@ ExpressionAnalyzerData::~ExpressionAnalyzerData() = default;
 ExpressionAnalyzer::ExtractedSettings::ExtractedSettings(const Settings & settings_)
     : use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries)
     , size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode)
+    , size_limits_for_set_used_with_index(
+        (settings_.use_index_for_in_with_subqueries_max_values &&
+         settings_.use_index_for_in_with_subqueries_max_values < settings_.max_rows_in_set) ?
+        size_limits_for_set :
+        SizeLimits(settings_.use_index_for_in_with_subqueries_max_values, settings_.max_bytes_in_set, OverflowMode::BREAK))
     , distributed_group_by_no_merge(settings_.distributed_group_by_no_merge)
 {}
 
@@ -450,7 +457,7 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
 
     auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
 
-    if (prepared_sets->get(set_key))
+    if (prepared_sets->getFuture(set_key).isValid())
         return; /// Already prepared.
 
     if (auto set_ptr_from_storage_set = isPlainStorageSetInSubquery(subquery_or_table_name))
@@ -459,25 +466,57 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
         return;
     }
 
-    auto interpreter_subquery = interpretSubquery(subquery_or_table_name, getContext(), {}, query_options);
-    auto io = interpreter_subquery->execute();
-    PullingAsyncPipelineExecutor executor(io.pipeline);
-
-    SetPtr set = std::make_shared<Set>(settings.size_limits_for_set, true, getContext()->getSettingsRef().transform_null_in);
-    set->setHeader(executor.getHeader().getColumnsWithTypeAndName());
-
-    Block block;
-    while (executor.pull(block))
+    auto build_set = [&] () -> SetPtr
     {
-        if (block.rows() == 0)
-            continue;
+        LOG_TRACE(getLogger(), "Building set, key: {}", set_key.toString());
 
-        /// If the limits have been exceeded, give up and let the default subquery processing actions take place.
-        if (!set->insertFromBlock(block.getColumnsWithTypeAndName()))
-            return;
+        auto interpreter_subquery = interpretSubquery(subquery_or_table_name, getContext(), {}, query_options);
+        auto io = interpreter_subquery->execute();
+        PullingAsyncPipelineExecutor executor(io.pipeline);
+
+        SetPtr set = std::make_shared<Set>(settings.size_limits_for_set_used_with_index, true, getContext()->getSettingsRef().transform_null_in);
+        set->setHeader(executor.getHeader().getColumnsWithTypeAndName());
+
+        Block block;
+        while (executor.pull(block))
+        {
+            if (block.rows() == 0)
+                continue;
+
+            /// If the limits have been exceeded, give up and let the default subquery processing actions take place.
+            if (!set->insertFromBlock(block.getColumnsWithTypeAndName()))
+                return nullptr;
+        }
+
+        set->finishInsert();
+
+        return set;
+    };
+
+    SetPtr set;
+
+    auto set_cache = getContext()->getPreparedSetsCache();
+    if (set_cache)
+    {
+        auto from_cache = set_cache->findOrPromiseToBuild(set_key.toString());
+        if (from_cache.index() == 0)
+        {
+            set = build_set();
+            std::get<0>(from_cache).set_value(set);
+        }
+        else
+        {
+            LOG_TRACE(getLogger(), "Waiting for set, key: {}", set_key.toString());
+            set = std::get<1>(from_cache).get();
+        }
+    }
+    else
+    {
+        set = build_set();
     }
 
-    set->finishInsert();
+    if (!set)
+        return;
 
     prepared_sets->set(set_key, std::move(set));
 }
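`findOrPromiseToBuild` returns a `std::variant`: alternative 0 is a `std::promise<SetPtr>` — the caller won the race and must build the set, then publish it; alternative 1 is a `SharedSet` (`std::shared_future<SetPtr>`) to wait on. A minimal standalone sketch of the consumer protocol, with stand-in types; note that a waiter can legitimately receive `nullptr` if the builder gave up (e.g. size limits exceeded):

```cpp
#include <future>
#include <memory>
#include <variant>

struct Set {};                                /// stand-in for DB::Set
using SetPtr = std::shared_ptr<Set>;
using SharedSet = std::shared_future<SetPtr>;

/// Consumer side of PreparedSetsCache::findOrPromiseToBuild.
SetPtr buildOrWait(std::variant<std::promise<SetPtr>, SharedSet> from_cache)
{
    if (from_cache.index() == 0)
    {
        /// We won the race: build the set and publish it for all waiters.
        SetPtr set = std::make_shared<Set>(); /// stands in for executing the subquery
        std::get<0>(from_cache).set_value(set);
        return set;
    }

    /// Another task is already building this set: block until it is published.
    /// The result may be nullptr if the builder gave up (e.g. size limits exceeded).
    return std::get<1>(from_cache).get();
}
```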
diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h
index 1676cb506c0..1b6e8e24091 100644
--- a/src/Interpreters/ExpressionAnalyzer.h
+++ b/src/Interpreters/ExpressionAnalyzer.h
@@ -95,6 +95,7 @@ private:
     {
         const bool use_index_for_in_with_subqueries;
         const SizeLimits size_limits_for_set;
+        const SizeLimits size_limits_for_set_used_with_index;
         const UInt64 distributed_group_by_no_merge;
 
         explicit ExtractedSettings(const Settings & settings_);
diff --git a/src/Interpreters/PreparedSets.cpp b/src/Interpreters/PreparedSets.cpp
index 79cfb8b688a..7b0efddae87 100644
--- a/src/Interpreters/PreparedSets.cpp
+++ b/src/Interpreters/PreparedSets.cpp
@@ -1,7 +1,10 @@
+#include
+#include
 #include
 #include
 #include
 #include
+#include
 
 namespace DB
 {
@@ -43,6 +46,26 @@ bool PreparedSetKey::operator==(const PreparedSetKey & other) const
     return true;
 }
 
+String PreparedSetKey::toString() const
+{
+    WriteBufferFromOwnString buf;
+    buf << "__set_" << ast_hash.first << "_" << ast_hash.second;
+    if (!types.empty())
+    {
+        buf << "(";
+        bool first = true;
+        for (const auto & type : types)
+        {
+            if (!first)
+                buf << ",";
+            first = false;
+            buf << type->getName();
+        }
+        buf << ")";
+    }
+    return buf.str();
+}
+
 SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, const PreparedSetKey & key,
                                                    SizeLimits set_size_limit, bool transform_null_in)
 {
@@ -51,10 +74,20 @@
     /// If you already created a Set with the same subquery / table for another ast
     /// In that case several PreparedSetKey would share same subquery and set
     /// Not sure if it's really possible case (maybe for distributed query when set was filled by external table?)
-    if (subquery.set)
+    if (subquery.set.isValid())
         sets[key] = subquery.set;
     else
-        sets[key] = subquery.set = std::make_shared<Set>(set_size_limit, false, transform_null_in);
+    {
+        subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
+        sets[key] = FutureSet(subquery.promise_to_fill_set.get_future());
+    }
+
+    if (!subquery.set_in_progress)
+    {
+        subquery.key = key.toString();
+        subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
+    }
+
     return subquery;
 }
 
@@ -62,13 +95,27 @@
 /// It's aimed to fill external table passed to SubqueryForSet::createSource.
 SubqueryForSet & PreparedSets::getSubquery(const String & subquery_id) { return subqueries[subquery_id]; }
 
-void PreparedSets::set(const PreparedSetKey & key, SetPtr set_) { sets[key] = set_; }
+void PreparedSets::set(const PreparedSetKey & key, SetPtr set_) { sets[key] = FutureSet(set_); }
 
-SetPtr & PreparedSets::get(const PreparedSetKey & key) { return sets[key]; }
-
-std::vector<SetPtr> PreparedSets::getByTreeHash(IAST::Hash ast_hash)
+FutureSet PreparedSets::getFuture(const PreparedSetKey & key) const
 {
-    std::vector<SetPtr> res;
+    auto it = sets.find(key);
+    if (it == sets.end())
+        return {};
+    return it->second;
+}
+
+SetPtr PreparedSets::get(const PreparedSetKey & key) const
+{
+    auto it = sets.find(key);
+    if (it == sets.end() || !it->second.isReady())
+        return nullptr;
+    return it->second.get();
+}
+
+std::vector<FutureSet> PreparedSets::getByTreeHash(IAST::Hash ast_hash) const
+{
+    std::vector<FutureSet> res;
     for (const auto & it : this->sets)
     {
         if (it.first.ast_hash == ast_hash)
@@ -106,4 +153,45 @@ QueryPlanPtr SubqueryForSet::detachSource()
     return res;
 }
 
+
+FutureSet::FutureSet(SetPtr set)
+{
+    std::promise<SetPtr> promise;
+    promise.set_value(set);
+    *this = FutureSet(promise.get_future());
+}
+
+
+bool FutureSet::isReady() const
+{
+    return future_set.valid() &&
+        future_set.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
+}
+
+bool FutureSet::isCreated() const
+{
+    return isReady() && get() != nullptr && get()->isCreated();
+}
+
+
+std::variant<std::promise<SetPtr>, SharedSet> PreparedSetsCache::findOrPromiseToBuild(const String & key)
+{
+    std::lock_guard lock(cache_mutex);
+
+    auto it = cache.find(key);
+    if (it != cache.end())
+    {
+        /// If the set is being built, return its future, but if it's ready and is nullptr then we should retry building it.
+        if (it->second.future.valid() &&
+            (it->second.future.wait_for(std::chrono::seconds(0)) != std::future_status::ready || it->second.future.get() != nullptr))
+            return it->second.future;
+    }
+
+    /// Insert the entry into the cache so that other threads can find it and start waiting for the set.
+    std::promise<SetPtr> promise_to_fill_set;
+    Entry & entry = cache[key];
+    entry.future = promise_to_fill_set.get_future();
+    return promise_to_fill_set;
+}
+
 };
diff --git a/src/Interpreters/PreparedSets.h b/src/Interpreters/PreparedSets.h
index a50e390ee5a..4a7d1c3de46 100644
--- a/src/Interpreters/PreparedSets.h
+++ b/src/Interpreters/PreparedSets.h
@@ -2,6 +2,7 @@
 
 #include
 #include
+#include <future>
 #include
 #include
 #include
@@ -19,6 +20,37 @@ class Set;
 using SetPtr = std::shared_ptr<Set>;
 class InterpreterSelectWithUnionQuery;
 
+/// Represents a set in a query that might be referenced at analysis time and built later during execution.
+/// Also it can represent a constant set that is ready to use.
+/// At analysis stage the FutureSets are created but not necessarily filled. Then for non-constant sets there
+/// must be an explicit step to build them before they can be used.
+/// FutureSet objects can be stored in PreparedSets and are not intended to be used from multiple threads.
+class FutureSet final
+{
+public:
+    FutureSet() = default;
+
+    /// Create FutureSet from an object that will be created in the future.
+    explicit FutureSet(const std::shared_future<SetPtr> & future_set_) : future_set(future_set_) {}
+
+    /// Create FutureSet from a ready set.
+    explicit FutureSet(SetPtr readySet);
+
+    /// The set object will be ready in the future, as opposed to a 'null' object when FutureSet is default-constructed.
+    bool isValid() const { return future_set.valid(); }
+
+    /// The value of SetPtr is ready, but the set object might not have been filled yet.
+    bool isReady() const;
+
+    /// The set object is ready and filled.
+    bool isCreated() const;
+
+    SetPtr get() const { chassert(isReady()); return future_set.get(); }
+
+private:
+    std::shared_future<SetPtr> future_set;
+};
+
 /// Information on how to build set for the [GLOBAL] IN section.
 class SubqueryForSet
 {
@@ -33,7 +65,12 @@ public:
     std::unique_ptr<QueryPlan> detachSource();
 
     /// Build this set from the result of the subquery.
-    SetPtr set;
+    String key;
+    SetPtr set_in_progress;
+    /// After set_in_progress is finished it will be put into promise_to_fill_set, and thus all FutureSets
+    /// that are referencing this set will be filled.
+    std::promise<SetPtr> promise_to_fill_set;
+    FutureSet set = FutureSet{promise_to_fill_set.get_future()};
 
     /// If set, put the result into the table.
     /// This is a temporary table for transferring to remote servers for distributed query processing.
@@ -59,6 +96,8 @@ struct PreparedSetKey
 
     bool operator==(const PreparedSetKey & other) const;
 
+    String toString() const;
+
     struct Hash
     {
         UInt64 operator()(const PreparedSetKey & key) const { return key.ast_hash.first; }
@@ -75,7 +114,8 @@ public:
     SubqueryForSet & getSubquery(const String & subquery_id);
 
     void set(const PreparedSetKey & key, SetPtr set_);
-    SetPtr & get(const PreparedSetKey & key);
+    FutureSet getFuture(const PreparedSetKey & key) const;
+    SetPtr get(const PreparedSetKey & key) const;
 
     /// Get subqueries and clear them.
     /// We need to build a plan for subqueries just once. That's why we can clear them after accessing them.
@@ -84,12 +124,12 @@ public:
 
     /// Returns all sets that match the given ast hash not checking types
     /// Used in KeyCondition and MergeTreeIndexConditionBloomFilter to make non exact match for types in PreparedSetKey
-    std::vector<SetPtr> getByTreeHash(IAST::Hash ast_hash);
+    std::vector<FutureSet> getByTreeHash(IAST::Hash ast_hash) const;
 
     bool empty() const;
 
 private:
-    std::unordered_map<PreparedSetKey, SetPtr, PreparedSetKey::Hash> sets;
+    std::unordered_map<PreparedSetKey, FutureSet, PreparedSetKey::Hash> sets;
 
     /// This is the information required for building sets
     SubqueriesForSets subqueries;
@@ -97,4 +137,31 @@ private:
 
 using PreparedSetsPtr = std::shared_ptr<PreparedSets>;
 
+/// A reference to a set that is being built by another task.
+/// The difference from FutureSet is that this object can be used to wait for the set to be built in another thread.
+using SharedSet = std::shared_future<SetPtr>;
+
+/// This set cache is used to avoid building the same set multiple times. It is different from PreparedSets in that
+/// it can be used across multiple queries. One use case is when we execute the same mutation on multiple parts. In this
+/// case each part is processed by a separate mutation task but they can share the same set.
+class PreparedSetsCache
+{
+public:
+    /// Lookup for set in the cache.
+    /// If it is found, get the future to be able to wait for the set to be built.
+    /// Otherwise create a promise, build the set and set the promise value.
+    std::variant<std::promise<SetPtr>, SharedSet> findOrPromiseToBuild(const String & key);
+
+private:
+    struct Entry
+    {
+        SharedSet future; /// Other tasks can wait for the set to be built.
+    };
+
+    std::mutex cache_mutex;
+    std::unordered_map<String, Entry> cache;
+};
+
+using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;
+
 }
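`FutureSet` distinguishes three states: invalid (default-constructed), valid but pending (future not ready), and created (set built and filled). The following standalone sketch replays those transitions with plain `std::promise`/`std::shared_future` and a stand-in `Set`, mirroring `isValid()`/`isReady()`/`isCreated()` above:

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <memory>

/// Stand-ins for DB::Set / DB::FutureSet, just to replay the state machine.
struct Set
{
    bool is_created = false;
    bool isCreated() const { return is_created; }
};
using SetPtr = std::shared_ptr<Set>;

int main()
{
    std::promise<SetPtr> promise;
    std::shared_future<SetPtr> future = promise.get_future();

    /// Analysis time: valid, but nobody has built the set yet -> isReady() == false.
    assert(future.valid());
    assert(future.wait_for(std::chrono::seconds(0)) != std::future_status::ready);

    /// Execution time: the building task fills the set and fulfills the promise.
    auto set = std::make_shared<Set>();
    set->is_created = true; /// corresponds to Set::finishInsert()
    promise.set_value(set);

    /// Every FutureSet holding this shared_future now observes a created set.
    assert(future.wait_for(std::chrono::seconds(0)) == std::future_status::ready); /// isReady()
    assert(future.get()->isCreated());                                             /// isCreated()
}
```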
diff --git a/src/Interpreters/Set.cpp b/src/Interpreters/Set.cpp
index 75bb05f8346..a7bea63bd99 100644
--- a/src/Interpreters/Set.cpp
+++ b/src/Interpreters/Set.cpp
@@ -236,6 +236,11 @@ bool Set::insertFromBlock(const Columns & columns)
     return limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "IN-set", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
 }
 
+void Set::checkIsCreated() const
+{
+    if (!is_created.load())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: Trying to use set before it has been built.");
+}
 
 ColumnPtr Set::execute(const ColumnsWithTypeAndName & columns, bool negative) const
 {
diff --git a/src/Interpreters/Set.h b/src/Interpreters/Set.h
index e593f71cfbe..fff5fa4e1b1 100644
--- a/src/Interpreters/Set.h
+++ b/src/Interpreters/Set.h
@@ -54,6 +54,8 @@ public:
     /// finishInsert and isCreated are thread-safe
     bool isCreated() const { return is_created.load(); }
 
+    void checkIsCreated() const;
+
     /** For columns of 'block', check belonging of corresponding rows to the set.
       * Return UInt8 column with the result.
       */
@@ -67,7 +69,7 @@ public:
     const DataTypes & getElementsTypes() const { return set_elements_types; }
 
     bool hasExplicitSetElements() const { return fill_set_elements; }
-    Columns getSetElements() const { return { set_elements.begin(), set_elements.end() }; }
+    Columns getSetElements() const { checkIsCreated(); return { set_elements.begin(), set_elements.end() }; }
 
     void checkColumnsNumber(size_t num_key_columns) const;
     bool areTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const;
diff --git a/src/Planner/CollectSets.cpp b/src/Planner/CollectSets.cpp
index bc4b0dd09f3..02069aad292 100644
--- a/src/Planner/CollectSets.cpp
+++ b/src/Planner/CollectSets.cpp
@@ -1,6 +1,7 @@
 #include
 
 #include
+#include
 #include
 
@@ -52,7 +53,8 @@ public:
 
         if (storage_set)
         {
-            planner_context.registerSet(set_key, PlannerSet(storage_set->getSet()));
+            /// Handle storage_set as ready set.
+            planner_context.registerSet(set_key, PlannerSet(FutureSet(storage_set->getSet())));
         }
         else if (const auto * constant_node = in_second_argument->as<ConstantNode>())
         {
@@ -62,16 +64,12 @@ public:
                 constant_node->getResultType(),
                 settings);
 
-            planner_context.registerSet(set_key, PlannerSet(std::move(set)));
+            planner_context.registerSet(set_key, PlannerSet(FutureSet(std::move(set))));
         }
         else if (in_second_argument_node_type == QueryTreeNodeType::QUERY ||
                  in_second_argument_node_type == QueryTreeNodeType::UNION)
         {
-            SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
-            bool tranform_null_in = settings.transform_null_in;
-            auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
-
-            planner_context.registerSet(set_key, PlannerSet(std::move(set), in_second_argument));
+            planner_context.registerSet(set_key, PlannerSet(in_second_argument));
         }
         else
         {
diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp
index 5217d240478..d036c895fbb 100644
--- a/src/Planner/Planner.cpp
+++ b/src/Planner/Planner.cpp
@@ -890,11 +890,11 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
     for (const auto & node : actions_to_execute->getNodes())
     {
         const auto & set_key = node.result_name;
-        const auto * planner_set = planner_context->getSetOrNull(set_key);
+        auto * planner_set = planner_context->getSetOrNull(set_key);
         if (!planner_set)
             continue;
 
-        if (planner_set->getSet()->isCreated() || !planner_set->getSubqueryNode())
+        if (planner_set->getSet().isCreated() || !planner_set->getSubqueryNode())
             continue;
 
         auto subquery_options = select_query_options.subquery();
@@ -904,8 +904,16 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
             planner_context->getGlobalPlannerContext());
         subquery_planner.buildQueryPlanIfNeeded();
 
+        const auto & settings = planner_context->getQueryContext()->getSettingsRef();
+        SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
+        bool transform_null_in = settings.transform_null_in;
+        auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, transform_null_in);
+
         SubqueryForSet subquery_for_set;
+        subquery_for_set.key = set_key;
+        subquery_for_set.set_in_progress = set;
         subquery_for_set.set = planner_set->getSet();
+        subquery_for_set.promise_to_fill_set = planner_set->extractPromiseToBuildSet();
         subquery_for_set.source = std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan());
 
         subqueries_for_sets.emplace(set_key, std::move(subquery_for_set));
diff --git a/src/Planner/PlannerActionsVisitor.cpp b/src/Planner/PlannerActionsVisitor.cpp
index 1e6ef35f4ab..c64d82299ca 100644
--- a/src/Planner/PlannerActionsVisitor.cpp
+++ b/src/Planner/PlannerActionsVisitor.cpp
@@ -632,7 +632,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
     column.name = set_key;
     column.type = std::make_shared<DataTypeSet>();
 
-    bool set_is_created = planner_set.getSet()->isCreated();
+    bool set_is_created = planner_set.getSet().isCreated();
     auto column_set = ColumnSet::create(1, planner_set.getSet());
 
     if (set_is_created)
diff --git a/src/Planner/PlannerContext.cpp b/src/Planner/PlannerContext.cpp
index 59ae0f20fac..346cc6d2080 100644
--- a/src/Planner/PlannerContext.cpp
+++ b/src/Planner/PlannerContext.cpp
@@ -128,7 +128,7 @@ PlannerContext::SetKey PlannerContext::createSetKey(const QueryTreeNodePtr & set
 
 void PlannerContext::registerSet(const SetKey & key, PlannerSet planner_set)
 {
-    if (!planner_set.getSet())
+    if (!planner_set.getSet().isValid())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Set must be initialized");
 
     const auto & subquery_node = planner_set.getSubqueryNode();
@@ -162,7 +162,7 @@ const PlannerSet & PlannerContext::getSetOrThrow(const SetKey & key) const
     return it->second;
 }
 
-const PlannerSet * PlannerContext::getSetOrNull(const SetKey & key) const
+PlannerSet * PlannerContext::getSetOrNull(const SetKey & key)
 {
     auto it = set_key_to_set.find(key);
     if (it == set_key_to_set.end())
diff --git a/src/Planner/PlannerContext.h b/src/Planner/PlannerContext.h
index e47198bfe5f..ccc4ab43638 100644
--- a/src/Planner/PlannerContext.h
+++ b/src/Planner/PlannerContext.h
@@ -7,6 +7,7 @@
 #include
 #include
 
+#include
 #include
 
@@ -56,18 +57,18 @@ class PlannerSet
 {
 public:
     /// Construct planner set that is ready for execution
-    explicit PlannerSet(SetPtr set_)
+    explicit PlannerSet(FutureSet set_)
         : set(std::move(set_))
     {}
 
     /// Construct planner set with set and subquery node
-    explicit PlannerSet(SetPtr set_, QueryTreeNodePtr subquery_node_)
-        : set(std::move(set_))
+    explicit PlannerSet(QueryTreeNodePtr subquery_node_)
+        : set(promise_to_build_set.get_future())
         , subquery_node(std::move(subquery_node_))
     {}
 
-    /// Get set
-    const SetPtr & getSet() const
+    /// Get a reference to a set that might be not built yet
+    const FutureSet & getSet() const
     {
         return set;
     }
@@ -78,8 +79,15 @@ public:
         return subquery_node;
     }
 
+    /// This promise will be fulfilled when the set is built, and all FutureSet objects will become ready
+    std::promise<SetPtr> extractPromiseToBuildSet()
+    {
+        return std::move(promise_to_build_set);
+    }
+
 private:
-    SetPtr set;
+    std::promise<SetPtr> promise_to_build_set;
+    FutureSet set;
 
     QueryTreeNodePtr subquery_node;
 };
@@ -186,7 +194,7 @@ public:
     const PlannerSet & getSetOrThrow(const SetKey & key) const;
 
     /// Get set for key, if no set is registered null is returned
-    const PlannerSet * getSetOrNull(const SetKey & key) const;
+    PlannerSet * getSetOrNull(const SetKey & key);
 
     /// Get registered sets
     const SetKeyToSet & getRegisteredSets() const
diff --git a/src/Processors/QueryPlan/CreatingSetsStep.cpp b/src/Processors/QueryPlan/CreatingSetsStep.cpp
index b696b77ccfe..9eec3e90494 100644
--- a/src/Processors/QueryPlan/CreatingSetsStep.cpp
+++ b/src/Processors/QueryPlan/CreatingSetsStep.cpp
@@ -60,7 +60,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
     String prefix(settings.offset, ' ');
 
     settings.out << prefix;
-    if (subquery_for_set.set)
+    if (subquery_for_set.set_in_progress)
         settings.out << "Set: ";
 
     settings.out << description << '\n';
@@ -68,7 +68,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
 
 void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const
 {
-    if (subquery_for_set.set)
+    if (subquery_for_set.set_in_progress)
         map.add("Set", description);
 }
diff --git a/src/Processors/Transforms/CreatingSetsTransform.cpp b/src/Processors/Transforms/CreatingSetsTransform.cpp
index f65e72c2723..e3ae2d4fd4e 100644
--- a/src/Processors/Transforms/CreatingSetsTransform.cpp
+++ b/src/Processors/Transforms/CreatingSetsTransform.cpp
@@ -4,6 +4,7 @@
 #include
 
 #include
+#include
 #include
 #include
 
@@ -39,13 +40,52 @@ void CreatingSetsTransform::work()
     if (!is_initialized)
         init();
 
+    if (done_with_set && done_with_table)
+    {
+        finishConsume();
+        input.close();
+    }
+
     IAccumulatingTransform::work();
 }
 
 void CreatingSetsTransform::startSubquery()
 {
-    if (subquery.set)
-        LOG_TRACE(log, "Creating set.");
+    /// Lookup the set in the cache if we don't need to build a table.
+    auto ctx = context.lock();
+    if (ctx && ctx->getPreparedSetsCache() && !subquery.table)
+    {
+        /// Try to find the set in the cache and wait for it to be built.
+        /// Retry if the set from cache fails to be built.
+        while (true)
+        {
+            auto from_cache = ctx->getPreparedSetsCache()->findOrPromiseToBuild(subquery.key);
+            if (from_cache.index() == 0)
+            {
+                promise_to_build = std::move(std::get<0>(from_cache));
+            }
+            else
+            {
+                LOG_TRACE(log, "Waiting for set to be built by another thread, key: {}", subquery.key);
+                SharedSet set_built_by_another_thread = std::move(std::get<1>(from_cache));
+                const SetPtr & ready_set = set_built_by_another_thread.get();
+                if (!ready_set)
+                {
+                    LOG_TRACE(log, "Failed to use set from cache, key: {}", subquery.key);
+                    continue;
+                }
+
+                subquery.promise_to_fill_set.set_value(ready_set);
+                subquery.set_in_progress.reset();
+                done_with_set = true;
+                set_from_cache = true;
+            }
+            break;
+        }
+    }
+
+    if (subquery.set_in_progress)
+        LOG_TRACE(log, "Creating set, key: {}", subquery.key);
     if (subquery.table)
         LOG_TRACE(log, "Filling temporary table.");
 
@@ -53,10 +93,10 @@ void CreatingSetsTransform::startSubquery()
         /// TODO: make via port
         table_out = QueryPipeline(subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext()));
 
-    done_with_set = !subquery.set;
+    done_with_set = !subquery.set_in_progress;
     done_with_table = !subquery.table;
 
-    if (done_with_set /*&& done_with_join*/ && done_with_table)
+    if ((done_with_set && !set_from_cache) /*&& done_with_join*/ && done_with_table)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: nothing to do with subquery");
 
     if (table_out.initialized())
@@ -68,12 +108,16 @@ void CreatingSetsTransform::startSubquery()
 
 void CreatingSetsTransform::finishSubquery()
 {
-    if (read_rows != 0)
-    {
-        auto seconds = watch.elapsedNanoseconds() / 1e9;
+    auto seconds = watch.elapsedNanoseconds() / 1e9;
 
-        if (subquery.set)
-            LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), read_rows, seconds);
+    if (set_from_cache)
+    {
+        LOG_DEBUG(log, "Got set from cache in {} sec.", seconds);
+    }
+    else if (read_rows != 0)
+    {
+        if (subquery.set_in_progress)
+            LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set_in_progress->getTotalRowCount(), read_rows, seconds);
 
         if (subquery.table)
             LOG_DEBUG(log, "Created Table with {} rows in {} sec.", read_rows, seconds);
     }
@@ -87,8 +131,10 @@ void CreatingSetsTransform::init()
 {
     is_initialized = true;
 
-    if (subquery.set)
-        subquery.set->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
+    if (subquery.set_in_progress)
+    {
+        subquery.set_in_progress->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
+    }
 
     watch.restart();
     startSubquery();
@@ -101,7 +147,7 @@ void CreatingSetsTransform::consume(Chunk chunk)
 
     if (!done_with_set)
    {
-        if (!subquery.set->insertFromBlock(block.getColumnsWithTypeAndName()))
+        if (!subquery.set_in_progress->insertFromBlock(block.getColumnsWithTypeAndName()))
             done_with_set = true;
     }
 
@@ -124,8 +170,13 @@ void CreatingSetsTransform::consume(Chunk chunk)
 
 Chunk CreatingSetsTransform::generate()
 {
-    if (subquery.set)
-        subquery.set->finishInsert();
+    if (subquery.set_in_progress)
+    {
+        subquery.set_in_progress->finishInsert();
+        subquery.promise_to_fill_set.set_value(subquery.set_in_progress);
+        if (promise_to_build)
+            promise_to_build->set_value(subquery.set_in_progress);
+    }
 
     if (table_out.initialized())
     {
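The cache lookup above deliberately treats a ready future holding `nullptr` as "the previous build failed — rebuild it", which is what the retry loop in `startSubquery()` relies on. A compact standalone re-implementation of just that rule (stand-in types, simplified locking):

```cpp
#include <chrono>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <variant>

struct Set {};
using SetPtr = std::shared_ptr<Set>;
using SharedSet = std::shared_future<SetPtr>;

class MiniPreparedSetsCache
{
public:
    /// Either hand the caller the promise (it becomes the builder) or a future to wait on.
    std::variant<std::promise<SetPtr>, SharedSet> findOrPromiseToBuild(const std::string & key)
    {
        std::lock_guard lock(mutex);

        auto it = cache.find(key);
        if (it != cache.end())
        {
            const SharedSet & future = it->second;
            /// Reuse the entry unless a *finished* build published nullptr (i.e. it failed).
            if (future.valid()
                && (future.wait_for(std::chrono::seconds(0)) != std::future_status::ready || future.get() != nullptr))
                return future;
        }

        /// No usable entry: register a fresh future and give the promise to the caller.
        std::promise<SetPtr> promise;
        cache[key] = promise.get_future();
        return promise;
    }

private:
    std::mutex mutex;
    std::unordered_map<std::string, SharedSet> cache;
};
```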
a/src/Processors/Transforms/CreatingSetsTransform.h b/src/Processors/Transforms/CreatingSetsTransform.h
index ca59fb9e220..26bbc45933d 100644
--- a/src/Processors/Transforms/CreatingSetsTransform.h
+++ b/src/Processors/Transforms/CreatingSetsTransform.h
@@ -43,10 +43,12 @@ public:
 
 private:
     SubqueryForSet subquery;
+    std::optional<std::promise<SetPtr>> promise_to_build;
 
     QueryPipeline table_out;
     std::unique_ptr<PushingPipelineExecutor> executor;
     UInt64 read_rows = 0;
+    bool set_from_cache = false;
     Stopwatch watch;
 
     bool done_with_set = true;
diff --git a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp
index e283cfa8a93..04effdb8894 100644
--- a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp
+++ b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp
@@ -52,6 +52,14 @@ void MutatePlainMergeTreeTask::prepare()
             std::move(profile_counters_snapshot));
     };
 
+    if (task_context->getSettingsRef().enable_sharing_sets_for_mutations)
+    {
+        /// If we have a prepared sets cache for this mutation, we will use it.
+        auto mutation_id = future_part->part_info.mutation;
+        auto prepared_sets_cache_for_mutation = storage.getPreparedSetsCache(mutation_id);
+        task_context->setPreparedSetsCache(prepared_sets_cache_for_mutation);
+    }
+
     mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
             future_part, metadata_snapshot, merge_mutate_entry->commands, merge_list_entry.get(),
             time(nullptr), task_context, merge_mutate_entry->txn, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
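For reference, the three new APIs compose as follows. This is an illustrative sketch only — in the real code this sequence lives inside `MutatePlainMergeTreeTask::prepare()` above (which has access to the storage's private `getPreparedSetsCache`); the free function and its name are hypothetical:

```cpp
#include <Interpreters/Context.h>
#include <Storages/StorageMergeTree.h>

using namespace DB;

/// Hypothetical helper: give every task of one mutation the same set cache.
/// Tasks pass the mutation version of their future part as mutation_id, so
/// concurrent tasks of one mutation share sets, while different mutations don't.
void wirePreparedSetsCache(StorageMergeTree & storage, ContextMutablePtr task_context, Int64 mutation_id)
{
    if (!task_context->getSettingsRef().enable_sharing_sets_for_mutations)
        return;

    task_context->setPreparedSetsCache(storage.getPreparedSetsCache(mutation_id));
}
```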
context_for_reading->setSetting("force_index_by_date", false);
         context_for_reading->setSetting("force_primary_key", false);
+        /// Skip using large sets in KeyCondition
+        context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);
 
         for (const auto & command : *ctx->commands)
             if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))
diff --git a/src/Storages/MergeTree/RPNBuilder.cpp b/src/Storages/MergeTree/RPNBuilder.cpp
index dd6dbf7e02e..e49459d3d17 100644
--- a/src/Storages/MergeTree/RPNBuilder.cpp
+++ b/src/Storages/MergeTree/RPNBuilder.cpp
@@ -288,7 +288,7 @@ ConstSetPtr tryGetSetFromDAGNode(const ActionsDAG::Node * dag_node)
     {
         auto set = column_set->getData();
 
-        if (set->isCreated())
+        if (set && set->isCreated())
             return set;
     }
 
@@ -305,8 +305,8 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet() const
     {
         auto prepared_sets_with_same_hash = prepared_sets->getByTreeHash(ast_node->getTreeHash());
         for (auto & set : prepared_sets_with_same_hash)
-            if (set->isCreated())
-                return set;
+            if (set.isCreated())
+                return set.get();
     }
     else if (dag_node)
     {
@@ -368,8 +368,8 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
         auto tree_hash = ast_node->getTreeHash();
         for (const auto & set : prepared_sets->getByTreeHash(tree_hash))
         {
-            if (types_match(set))
-                return set;
+            if (set.isCreated() && types_match(set.get()))
+                return set.get();
         }
     }
     else
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index 34bf5d55270..5513603bca6 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -2181,6 +2181,35 @@ std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const
     return std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
 }
 
+PreparedSetsCachePtr StorageMergeTree::getPreparedSetsCache(Int64 mutation_id)
+{
+    auto l = std::lock_guard(mutation_prepared_sets_cache_mutex);
+
+    /// Cleanup stale entries where the shared_ptr is expired.
+    while (!mutation_prepared_sets_cache.empty())
+    {
+        auto it = mutation_prepared_sets_cache.begin();
+        if (it->second.lock())
+            break;
+        mutation_prepared_sets_cache.erase(it);
+    }
+
+    /// Look up an existing entry.
+    auto it = mutation_prepared_sets_cache.find(mutation_id);
+    if (it != mutation_prepared_sets_cache.end())
+    {
+        /// If the entry is still alive, return it.
+        auto existing_set_cache = it->second.lock();
+        if (existing_set_cache)
+            return existing_set_cache;
+    }
+
+    /// Create new entry.
+    auto cache = std::make_shared<PreparedSetsCache>();
+    mutation_prepared_sets_cache[mutation_id] = cache;
+    return cache;
+}
+
 void StorageMergeTree::fillNewPartName(MutableDataPartPtr & part, DataPartsLock &)
 {
     part->info.min_block = part->info.max_block = increment.get();
diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h
index a0629bb8d3e..6f8acf9965a 100644
--- a/src/Storages/StorageMergeTree.h
+++ b/src/Storages/StorageMergeTree.h
@@ -151,6 +151,13 @@ private:
     std::atomic<bool> shutdown_called {false};
     std::atomic<bool> flush_called {false};
 
+    /// PreparedSets cache for one executing mutation.
+    /// NOTE: we only store weak_ptr to PreparedSetsCache, so that the cache is shared between mutation tasks that are executed in parallel.
+    /// The goal is to avoid consuming a lot of memory when the same big sets are used by multiple tasks at the same time.
+    /// If the tasks are executed without time overlap, we will destroy the cache to free memory, and the next task might rebuild the same sets.
+    std::mutex mutation_prepared_sets_cache_mutex;
+    std::map<Int64, std::weak_ptr<PreparedSetsCache>> mutation_prepared_sets_cache;
+
     void loadMutations();
 
     /// Load and initialize deduplication logs. Even if deduplication setting
@@ -259,6 +266,8 @@ private:
 
     std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
 
+    PreparedSetsCachePtr getPreparedSetsCache(Int64 mutation_id);
+
     friend class MergeTreeSink;
     friend class MergeTreeData;
     friend class MergePlainMergeTreeTask;
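Because only a `std::weak_ptr` is stored, the per-mutation cache lives exactly as long as some task context holds the `shared_ptr`; non-overlapping tasks get a fresh cache and may rebuild the sets, as the header comment says. A standalone illustration of that lifetime rule (stand-in cache type, `long` keys):

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>

struct PreparedSetsCache {};
using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;

std::mutex cache_mutex;
std::map<long, std::weak_ptr<PreparedSetsCache>> caches;

PreparedSetsCachePtr getPreparedSetsCache(long mutation_id)
{
    std::lock_guard lock(cache_mutex);
    if (auto existing = caches[mutation_id].lock())
        return existing; /// another task of the same mutation is still running
    auto cache = std::make_shared<PreparedSetsCache>();
    caches[mutation_id] = cache;
    return cache;
}

int main()
{
    auto a = getPreparedSetsCache(1);
    auto b = getPreparedSetsCache(1);
    assert(a == b); /// overlapping tasks share one cache
    a.reset();
    b.reset();
    /// Non-overlapping task: the previous cache was freed, so a fresh one is built.
    auto c = getPreparedSetsCache(1);
    assert(c != nullptr);
}
```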
diff --git a/src/Storages/StorageS3Settings.cpp b/src/Storages/StorageS3Settings.cpp
index d58e043a149..17a11ba9848 100644
--- a/src/Storages/StorageS3Settings.cpp
+++ b/src/Storages/StorageS3Settings.cpp
@@ -77,12 +77,6 @@ void S3Settings::RequestSettings::PartUploadSettings::updateFromSettingsImpl(con
 
     if (!if_changed || settings.s3_max_single_part_upload_size.changed)
         max_single_part_upload_size = settings.s3_max_single_part_upload_size;
-
-    /// AWS S3 SDK library has a bug. It is using std::*stream (which is a major offense).
-    /// LLVM libc++ has a bug. It does not allow std::*stream to work for large strings.
-
-    if (max_single_part_upload_size >= 2_GiB)
-        throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Uploading parts of 2 GiB or larger is not supported due to a bug in AWS and LLVM. Lower the value of `s3_max_single_part_upload_size` setting.");
 }
 
 void S3Settings::RequestSettings::PartUploadSettings::validate()
diff --git a/tests/queries/0_stateless/02344_describe_cache.reference b/tests/queries/0_stateless/02344_describe_cache.reference
index d3bb37af5cf..c98e9d263ca 100644
--- a/tests/queries/0_stateless/02344_describe_cache.reference
+++ b/tests/queries/0_stateless/02344_describe_cache.reference
@@ -1,2 +1,2 @@
-2147483648 1048576 104857600 1 0 0 0 s3_cache/ 0
-2147483648 1048576 104857600 0 0 0 0 s3_cache_2/ 0
+2147483648 1048576 104857600 1 0 0 0 /var/lib/clickhouse/caches/s3_cache/ 0
+2147483648 1048576 104857600 0 0 0 0 /var/lib/clickhouse/caches/s3_cache_2/ 0
diff --git a/tests/queries/0_stateless/02581_share_big_sets_between_multiple_mutations_tasks_long.reference b/tests/queries/0_stateless/02581_share_big_sets_between_multiple_mutations_tasks_long.reference
new file mode 100644
index 00000000000..3a92fcf283d
--- /dev/null
+++ b/tests/queries/0_stateless/02581_share_big_sets_between_multiple_mutations_tasks_long.reference
@@ -0,0 +1,9 @@
+40000
+all_1_1_0
+all_2_2_0
+all_3_3_0
+all_4_4_0
+5000 all_1_1_0_9
+5000 all_2_2_0_9
+5000 all_3_3_0_9
+5000 all_4_4_0_9
diff --git a/tests/queries/0_stateless/02581_share_big_sets_between_multiple_mutations_tasks_long.sql b/tests/queries/0_stateless/02581_share_big_sets_between_multiple_mutations_tasks_long.sql
new file mode 100644
index 00000000000..92e372d0cdb
--- /dev/null
+++ b/tests/queries/0_stateless/02581_share_big_sets_between_multiple_mutations_tasks_long.sql
@@ -0,0 +1,27 @@
+-- Tags: long, no-debug, no-tsan, no-asan, no-ubsan, no-msan
+
+DROP TABLE IF EXISTS 02581_trips;
+
+CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
+
+-- Make multiple parts
+INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+10000000, '', number FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+20000000, '', number FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+30000000, '', number FROM numbers(10000);
+
+SELECT count() from 02581_trips WHERE description = '';
+
+SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
+
+-- Start multiple mutations simultaneously
+SYSTEM STOP MERGES 02581_trips;
+ALTER TABLE 02581_trips UPDATE description='5' WHERE id IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
+ALTER TABLE 02581_trips UPDATE description='6' WHERE id IN (SELECT (number*10 + 6)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
+ALTER TABLE 02581_trips DELETE WHERE id IN (SELECT (number*10 + 7)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
+ALTER TABLE 02581_trips UPDATE description='8' WHERE id IN (SELECT (number*10 + 8)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=0;
+SYSTEM START MERGES 02581_trips;
+DELETE FROM 02581_trips WHERE id IN (SELECT (number*10 + 9)::UInt32 FROM numbers(200000000));
+
+SELECT count(), _part from 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
+
+DROP TABLE 02581_trips;
diff --git a/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks.reference b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks.reference
new file mode 100644
index 00000000000..18e83d1244a
--- /dev/null
+++ b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks.reference
@@ -0,0 +1,19 @@
+-- { echoOn }
+SELECT count(), _part FROM 02581_trips GROUP BY _part ORDER BY _part;
+10000 all_1_1_0
+10000 all_2_2_0
+10000 all_3_3_0
+10000 all_4_4_0
+-- Run mutation with a 'IN big subquery'
+ALTER TABLE 02581_trips UPDATE description='1' WHERE id IN (SELECT (number*10+1)::UInt32 FROM numbers(10000000)) SETTINGS mutations_sync=2;
+SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
+9000 all_1_1_0_5
+9000 all_2_2_0_5
+9000 all_3_3_0_5
+9000 all_4_4_0_5
+ALTER TABLE 02581_trips UPDATE description='2' WHERE id IN (SELECT (number*10+2)::UInt32 FROM numbers(10000)) SETTINGS mutations_sync=2;
+SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
+8000 all_1_1_0_6
+8000 all_2_2_0_6
+8000 all_3_3_0_6
+8000 all_4_4_0_6
diff --git a/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks.sql b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks.sql
new file mode 100644
index 00000000000..fc90582d20e
--- /dev/null
+++ b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks.sql
@@ -0,0 +1,21 @@
+DROP TABLE IF EXISTS 02581_trips;
+
+CREATE TABLE 02581_trips(id UInt32, description String) ENGINE=MergeTree ORDER BY id;
+
+-- Make multiple parts
+INSERT INTO 02581_trips SELECT number, '' FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+10000, '' FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+20000, '' FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+30000, '' FROM numbers(10000);
+
+-- { echoOn }
+SELECT count(), _part FROM 02581_trips GROUP BY _part ORDER BY _part;
+
+-- Run mutation with a 'IN big subquery'
+ALTER TABLE 02581_trips UPDATE description='1' WHERE id IN (SELECT (number*10+1)::UInt32 FROM numbers(10000000)) SETTINGS mutations_sync=2;
+SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
+ALTER TABLE 02581_trips UPDATE description='2' WHERE id IN (SELECT (number*10+2)::UInt32 FROM numbers(10000)) SETTINGS mutations_sync=2;
+SELECT count(), _part FROM 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part;
+-- { echoOff }
+
+DROP TABLE 02581_trips;
diff --git a/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_long.reference b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_long.reference
new file mode 100644
index 00000000000..3a7410d925f
--- /dev/null
+++ b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_long.reference
@@ -0,0 +1,12 @@
+40000
+all_1_1_0
+all_2_2_0
+all_3_3_0
+all_4_4_0
+36000
+32000
+28000
+24000
+20000
+16000
+12000
diff --git a/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_long.sql b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_long.sql
new file mode 100644
index 00000000000..97cf979e80a
--- /dev/null
+++ b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_long.sql
@@ -0,0 +1,57 @@
+-- Tags: long, no-tsan, no-asan, no-ubsan, no-msan
+
+DROP TABLE IF EXISTS 02581_trips;
+
+CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
+
+-- Make multiple parts
+INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+10000000, '', number FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+20000000, '', number FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+30000000, '', number FROM numbers(10000);
+
+SELECT count() from 02581_trips WHERE description = '';
+
+
+SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
+
+-- Run mutation with `id` IN a big subquery
+ALTER TABLE 02581_trips UPDATE description='a' WHERE id IN (SELECT (number*10)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
+SELECT count() from 02581_trips WHERE description = '';
+
+ALTER TABLE 02581_trips UPDATE description='a' WHERE id IN (SELECT (number*10 + 1)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2, max_rows_in_set=1000;
+SELECT count() from 02581_trips WHERE description = '';
+
+-- Run mutation with func(`id`) IN big subquery
+ALTER TABLE 02581_trips UPDATE description='b' WHERE id::UInt64 IN (SELECT (number*10 + 2)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
+SELECT count() from 02581_trips WHERE description = '';
+
+-- Run mutation with non-PK `id2` IN big subquery
+ALTER TABLE 02581_trips UPDATE description='c' WHERE id2 IN (SELECT (number*10 + 3)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
+SELECT count() from 02581_trips WHERE description = '';
+
+-- Run mutation with PK and non-PK IN big subquery
+ALTER TABLE 02581_trips UPDATE description='c'
+WHERE
+    (id IN (SELECT (number*10 + 4)::UInt32 FROM numbers(200000000))) OR
+    (id2 IN (SELECT (number*10 + 4)::UInt32 FROM numbers(200000000)))
+SETTINGS mutations_sync=2;
+SELECT count() from 02581_trips WHERE description = '';
+
+-- Run mutation with PK and non-PK IN big subquery
+ALTER TABLE 02581_trips UPDATE description='c'
+WHERE
+    (id::UInt64 IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000))) OR
+    (id2::UInt64 IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000)))
+SETTINGS mutations_sync=2;
+SELECT count() from 02581_trips WHERE description = '';
+
+-- Run mutation with PK and non-PK IN big subquery
+ALTER TABLE 02581_trips UPDATE description='c'
+WHERE
+    (id::UInt32 IN (SELECT (number*10 + 6)::UInt32 FROM numbers(200000000))) OR
+    ((id2+1)::String IN (SELECT (number*10 + 6)::UInt32 FROM numbers(200000000)))
+SETTINGS mutations_sync=2;
+SELECT count() from 02581_trips WHERE description = '';
+
+DROP TABLE 02581_trips;
diff --git a/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_with_storage_set.reference b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_with_storage_set.reference
new file mode 100644
index 00000000000..267105947b9
--- /dev/null
+++ b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_with_storage_set.reference
@@ -0,0 +1,7 @@
+40000
+all_1_1_0
+all_2_2_0
+all_3_3_0
+all_4_4_0
+36000
+32000
diff --git a/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_with_storage_set.sql b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_with_storage_set.sql
new file mode 100644
index 00000000000..9a14f78628b
--- /dev/null
+++ b/tests/queries/0_stateless/02581_share_big_sets_between_mutation_tasks_with_storage_set.sql
@@ -0,0 +1,32 @@
+DROP TABLE IF EXISTS 02581_trips;
+
+CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
+
+-- Make multiple parts
+INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+10000000, '', number FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+20000000, '', number FROM numbers(10000);
+INSERT INTO 02581_trips SELECT number+30000000, '', number FROM numbers(10000);
+
+SELECT count() from 02581_trips WHERE description = '';
+
+
+SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
+
+CREATE TABLE 02581_set (id UInt32) ENGINE = Set;
+
+INSERT INTO 02581_set SELECT number*10+7 FROM numbers(10000000);
+
+-- Run mutation with PK `id` IN big set
+ALTER TABLE 02581_trips UPDATE description='d' WHERE id IN 02581_set SETTINGS mutations_sync=2;
+SELECT count() from 02581_trips WHERE description = '';
+
+INSERT INTO 02581_set SELECT number*10+8 FROM numbers(10000000);
+
+-- Run mutation with PK `id` IN big set after it is updated
+ALTER TABLE 02581_trips UPDATE description='d' WHERE id IN 02581_set SETTINGS mutations_sync=2;
+SELECT count() from 02581_trips WHERE description = '';
+
+
+DROP TABLE 02581_set;
+DROP TABLE 02581_trips;
diff --git a/tests/queries/0_stateless/02700_s3_part_INT_MAX.reference b/tests/queries/0_stateless/02700_s3_part_INT_MAX.reference
new file mode 100644
index 00000000000..8425fb4df60
--- /dev/null
+++ b/tests/queries/0_stateless/02700_s3_part_INT_MAX.reference
@@ -0,0 +1 @@
+2097152
diff --git a/tests/queries/0_stateless/02700_s3_part_INT_MAX.sh b/tests/queries/0_stateless/02700_s3_part_INT_MAX.sh
new file mode 100755
index 00000000000..d831c7d9806
--- /dev/null
+++ b/tests/queries/0_stateless/02700_s3_part_INT_MAX.sh
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+# Tags: no-parallel, long
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+# Regression test for a crash in case a part exceeds INT_MAX
+#
+# NOTE: .sh test is used over .sql because it needs $CLICKHOUSE_DATABASE to
+# avoid truncation, since it seems that the version of MinIO that is used on
+# CI is too slow with this.
+$CLICKHOUSE_CLIENT -nm -q "
+    INSERT INTO FUNCTION s3('http://localhost:11111/test/$CLICKHOUSE_DATABASE/test_INT_MAX.tsv', '', '', 'TSV')
+    SELECT repeat('a', 1024) FROM numbers((pow(2, 30) * 2) / 1024)
+    SETTINGS s3_max_single_part_upload_size = '10Gi';
+
+    SELECT count() FROM s3('http://localhost:11111/test/$CLICKHOUSE_DATABASE/test_INT_MAX.tsv');
+"