Reworked, testing

This commit is contained in:
Alexander Gololobov 2023-04-04 12:01:01 +02:00
parent 3884f89f5c
commit 0b9579bfb8
19 changed files with 306 additions and 136 deletions

View File

@ -5124,7 +5124,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
/// Create constant set column for constant folding
auto column_set = ColumnSet::create(1, std::move(set));
auto column_set = ColumnSet::create(1, makeReadyFutureSet(std::move(set)));
argument_columns[1].column = ColumnConst::create(std::move(column_set), 1);
}

View File

@ -2,13 +2,18 @@
#include <Columns/IColumnDummy.h>
#include <Core/Field.h>
#include "Common/Exception.h"
#include <chrono>
#include <future>
#include <stdexcept>
namespace DB
{
class Set;
using SetPtr = std::shared_ptr<Set>;
using ConstSetPtr = std::shared_ptr<const Set>;
using FutureSet = std::shared_future<SetPtr>;
/** A column containing multiple values in the `IN` section.
@ -20,7 +25,7 @@ class ColumnSet final : public COWHelper<IColumnDummy, ColumnSet>
private:
friend class COWHelper<IColumnDummy, ColumnSet>;
ColumnSet(size_t s_, const ConstSetPtr & data_) : data(data_) { s = s_; }
ColumnSet(size_t s_, FutureSet data_) : data(std::move(data_)) { s = s_; }
ColumnSet(const ColumnSet &) = default;
public:
@ -28,13 +33,13 @@ public:
TypeIndex getDataType() const override { return TypeIndex::Set; }
MutableColumnPtr cloneDummy(size_t s_) const override { return ColumnSet::create(s_, data); }
ConstSetPtr getData() const { return data; }
ConstSetPtr getData() const { if (!data.valid() || data.wait_for(std::chrono::seconds(0)) != std::future_status::ready ) return nullptr; return data.get(); }
// Used only for debugging, making it DUMPABLE
Field operator[](size_t) const override { return {}; }
private:
ConstSetPtr data;
FutureSet data;
};
}

View File

@ -20,7 +20,7 @@ public:
bool isParametric() const override { return true; }
// Used for expressions analysis.
MutableColumnPtr createColumn() const override { return ColumnSet::create(0, nullptr); }
MutableColumnPtr createColumn() const override { return ColumnSet::create(0, FutureSet{}); }
// Used only for debugging, making it DUMPABLE
Field getDefault() const override { return Tuple(); }

View File

@ -544,7 +544,6 @@ ActionsMatcher::Data::Data(
, subquery_depth(subquery_depth_)
, source_columns(source_columns_)
, prepared_sets(prepared_sets_)
, prepared_sets_cache(context_->getPreparedSetsCache())
, no_subqueries(no_subqueries_)
, no_makeset(no_makeset_)
, only_consts(only_consts_)
@ -953,14 +952,16 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
return;
}
SetPtr prepared_set;
FutureSet prepared_set;
if (checkFunctionIsInOrGlobalInOperator(node))
{
/// Let's find the type of the first argument (then getActionsImpl will be called again and will not affect anything).
visit(node.arguments->children.at(0), data);
if (!data.no_makeset && !(data.is_create_parameterized_view && !analyzeReceiveQueryParams(ast).empty())
&& (prepared_set = makeSet(node, data, data.no_subqueries)))
if (!data.no_makeset && !(data.is_create_parameterized_view && !analyzeReceiveQueryParams(ast).empty()))
prepared_set = makeSet(node, data, data.no_subqueries);
if (prepared_set.valid())
{
/// Transform tuple or subquery into a set.
}
@ -1173,14 +1174,15 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
num_arguments += columns.size() - 1;
arg += columns.size() - 1;
}
else if (checkFunctionIsInOrGlobalInOperator(node) && arg == 1 && prepared_set)
else if (checkFunctionIsInOrGlobalInOperator(node) && arg == 1 && prepared_set.valid())
{
ColumnWithTypeAndName column;
column.type = std::make_shared<DataTypeSet>();
/// If the argument is a set given by an enumeration of values (so, the set was already built), give it a unique name,
/// so that sets with the same literal representation do not fuse together (they can have different types).
if (!prepared_set->empty())
const bool is_constant_set = prepared_set.wait_for(std::chrono::seconds(0)) == std::future_status::ready && prepared_set.get()->isCreated();
if (is_constant_set) /// TODO: if the set is from prepared_sets_cache, it might be not empty already but we should not handle it as const!!!
column.name = data.getUniqueName("__set");
else
column.name = child->getColumnName();
@ -1190,13 +1192,16 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
auto column_set = ColumnSet::create(1, prepared_set);
/// If prepared_set is not empty, we have a set made with literals.
/// Create a const ColumnSet to make constant folding work
if (!prepared_set->empty())
column.column = ColumnConst::create(std::move(column_set), 1);
if (is_constant_set)
column.column = ColumnConst::create(std::move(column_set), 1); /// TODO: and here we alos must not handle set form cahce as const!!!
else
column.column = std::move(column_set);
data.addColumn(column);
}
// TODO: if we added an empty set it means that it has not been built yet.
// We should add a wait for it to be filled somewhere before we access it
argument_types.push_back(column.type);
argument_names.push_back(column.name);
}
@ -1371,10 +1376,10 @@ void ActionsMatcher::visit(const ASTLiteral & literal, const ASTPtr & /* ast */,
data.addColumn(std::move(column));
}
SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_subqueries)
FutureSet ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_subqueries)
{
if (!data.prepared_sets)
return nullptr;
return {};//nullptr;
/** You need to convert the right argument to a set.
* This can be a table name, a value, a value enumeration, or a subquery.
@ -1391,16 +1396,11 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
if (no_subqueries)
return {};
auto set_key = PreparedSetKey::forSubquery(*right_in_operand);
if (SetPtr set = data.prepared_sets->get(set_key))
return set;
if (data.prepared_sets_cache)
{
if (auto set = data.prepared_sets_cache->findOrBuild(set_key, nullptr))
{
data.prepared_sets->set(set_key, set);
auto set = data.prepared_sets->getFuture(set_key);
if (set.valid())
return set;
}
}
/// A special case is if the name of the table is specified on the right side of the IN statement,
@ -1417,7 +1417,7 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
{
SetPtr set = storage_set->getSet();
data.prepared_sets->set(set_key, set);
return set;
return makeReadyFutureSet(set);
}
}
}
@ -1449,7 +1449,8 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
const auto & index = data.actions_stack.getLastActionsIndex();
if (data.prepared_sets && index.contains(left_in_operand->getColumnName()))
/// An explicit enumeration of values in parentheses.
return makeExplicitSet(&node, last_actions, false, data.getContext(), data.set_size_limit, *data.prepared_sets);
return makeReadyFutureSet(
makeExplicitSet(&node, last_actions, false, data.getContext(), data.set_size_limit, *data.prepared_sets));
else
return {};
}

View File

@ -126,7 +126,6 @@ public:
size_t subquery_depth;
const NamesAndTypesList & source_columns;
PreparedSetsPtr prepared_sets;
PreparedSetsCachePtr prepared_sets_cache;
bool no_subqueries;
bool no_makeset;
bool only_consts;
@ -220,7 +219,7 @@ private:
static void visit(const ASTLiteral & literal, const ASTPtr & ast, Data & data);
static void visit(ASTExpressionList & expression_list, const ASTPtr & ast, Data & data);
static SetPtr makeSet(const ASTFunction & node, Data & data, bool no_subqueries);
static FutureSet makeSet(const ASTFunction & node, Data & data, bool no_subqueries);
static ASTs doUntuple(const ASTFunction * function, ActionsMatcher::Data & data);
static std::optional<NameAndTypePair> getNameAndTypeFromAST(const ASTPtr & ast, Data & data);
};

View File

@ -1,3 +1,4 @@
#include <cstddef>
#include <memory>
#include <Core/Block.h>
@ -56,6 +57,8 @@
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Common/logger_useful.h>
#include "Interpreters/PreparedSets.h"
#include "QueryPipeline/SizeLimits.h"
#include <DataTypes/DataTypesNumber.h>
@ -450,7 +453,7 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
if (prepared_sets->get(set_key))
if (prepared_sets->getFuture(set_key).valid())
return; /// Already prepared.
if (auto set_ptr_from_storage_set = isPlainStorageSetInSubquery(subquery_or_table_name))
@ -465,7 +468,8 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
auto io = interpreter_subquery->execute();
PullingAsyncPipelineExecutor executor(io.pipeline);
SetPtr set = std::make_shared<Set>(settings.size_limits_for_set, true, getContext()->getSettingsRef().transform_null_in);
SizeLimits size_limits_for_key_condition_sets(0, 50*1000*1000, OverflowMode::BREAK);
SetPtr set = std::make_shared<Set>(size_limits_for_key_condition_sets, true, getContext()->getSettingsRef().transform_null_in);
set->setHeader(executor.getHeader().getColumnsWithTypeAndName());
Block block;
@ -484,9 +488,28 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
return set;
};
auto set_cache = getContext()->getPreparedSetsCache();
SetPtr set;
auto set = set_cache ? set_cache->findOrBuild(set_key, build_set) : build_set();
auto set_cache = getContext()->getPreparedSetsCache();
if (set_cache)
{
auto from_cache = set_cache->findOrPromiseToBuild(set_key);
if (from_cache.index() == 0)
{
LOG_TRACE(getLogger(), "Building set, key: {}:{}", set_key.ast_hash.first, set_key.ast_hash.second);
set = build_set();
std::get<0>(from_cache).set_value(set);
}
else
{
LOG_TRACE(getLogger(), "Waiting for set, key: {}:{}", set_key.ast_hash.first, set_key.ast_hash.second);
set = std::get<1>(from_cache).get();
}
}
else
{
set = build_set();
}
if (!set)
return;

View File

@ -1,3 +1,5 @@
#include <chrono>
#include <variant>
#include <Interpreters/PreparedSets.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
@ -51,10 +53,20 @@ SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, c
/// If you already created a Set with the same subquery / table for another ast
/// In that case several PreparedSetKey would share same subquery and set
/// Not sure if it's really possible case (maybe for distributed query when set was filled by external table?)
if (subquery.set)
sets[key] = subquery.set;
if (subquery.set.valid())
sets[key] = subquery.set; // TODO:
else
sets[key] = subquery.set = std::make_shared<Set>(set_size_limit, false, transform_null_in);
{
subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
sets[key] = subquery.promise_to_fill_set.get_future();
}
if (!subquery.set_in_progress)
{
subquery.key = key;
subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
}
return subquery;
}
@ -62,17 +74,31 @@ SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, c
/// It's aimed to fill external table passed to SubqueryForSet::createSource.
SubqueryForSet & PreparedSets::getSubquery(const String & subquery_id) { return subqueries[subquery_id]; }
void PreparedSets::set(const PreparedSetKey & key, SetPtr set_) { sets[key] = set_; }
void PreparedSets::set(const PreparedSetKey & key, SetPtr set_) { sets[key] = makeReadyFutureSet(set_); }
SetPtr & PreparedSets::get(const PreparedSetKey & key) { return sets[key]; }
FutureSet PreparedSets::getFuture(const PreparedSetKey & key) const
{
auto it = sets.find(key);
if (it == sets.end())// || it->second.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
return {};
return it->second;
}
std::vector<SetPtr> PreparedSets::getByTreeHash(IAST::Hash ast_hash)
SetPtr PreparedSets::get(const PreparedSetKey & key) const
{
auto it = sets.find(key);
if (it == sets.end() || it->second.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
return nullptr;
return it->second.get();
}
std::vector<SetPtr> PreparedSets::getByTreeHash(IAST::Hash ast_hash) const
{
std::vector<SetPtr> res;
for (const auto & it : this->sets)
{
if (it.first.ast_hash == ast_hash)
res.push_back(it.second);
res.push_back(it.second.get());
}
return res;
}
@ -106,12 +132,10 @@ QueryPlanPtr SubqueryForSet::detachSource()
return res;
}
SetPtr PreparedSetsCache::findOrBuild(const PreparedSetKey & key, const std::function<SetPtr()> & build_set)
{
auto* log = &Poco::Logger::get("PreparedSetsCache");
EntryPtr entry;
bool need_to_build_set = false;
std::variant<std::promise<SetPtr>, FutureSet> PreparedSetsCache::findOrPromiseToBuild(const PreparedSetKey & key)
{
// auto* log = &Poco::Logger::get("PreparedSetsCache");
/// Look for existing entry in the cache.
{
@ -120,48 +144,30 @@ SetPtr PreparedSetsCache::findOrBuild(const PreparedSetKey & key, const std::fun
auto it = cache.find(key);
if (it != cache.end())
{
entry = it->second;
/// If the set is being built, return its future, but if it's ready and is nullptr then we should retry building it.
/// TODO: consider moving retry logic outside of the cache.
if (it->second->future.valid() &&
(it->second->future.wait_for(std::chrono::seconds(0)) != std::future_status::ready || it->second->future.get() != nullptr))
return it->second->future;
}
else
{
if (build_set == nullptr)
return nullptr;
{
/// Insert the entry into the cache so that other threads can find it and start waiting for the set.
entry = std::make_shared<Entry>();
entry->filled_set = entry->promise.get_future();
std::promise<SetPtr> promise_to_fill_set;
auto entry = std::make_shared<Entry>();
entry->future = promise_to_fill_set.get_future();
cache[key] = entry;
need_to_build_set = true;
return promise_to_fill_set;
}
}
if (need_to_build_set)
{
LOG_DEBUG(log, "Building set for key {}:{}", key.ast_hash.first, key.ast_hash.second);
try
{
auto set = build_set();
entry->promise.set_value(set);
}
catch (...)
{
entry->promise.set_exception(std::current_exception());
throw;
}
return entry->filled_set.get();
}
if (entry->filled_set.valid() && entry->filled_set.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
LOG_DEBUG(log, "Found set for key {}:{} without wait", key.ast_hash.first, key.ast_hash.second);
}
else
{
LOG_DEBUG(log, "Found set for key {}:{} with wait", key.ast_hash.first, key.ast_hash.second);
}
return entry->filled_set.get();
}
FutureSet makeReadyFutureSet(SetPtr set)
{
std::promise<SetPtr> promise;
promise.set_value(set);
return promise.get_future();
}
};

View File

@ -18,32 +18,10 @@ class QueryPlan;
class Set;
using SetPtr = std::shared_ptr<Set>;
using ConstSetPtr = std::shared_ptr<const Set>;
using FutureSet = std::shared_future<SetPtr>;
class InterpreterSelectWithUnionQuery;
/// Information on how to build set for the [GLOBAL] IN section.
class SubqueryForSet
{
public:
void createSource(InterpreterSelectWithUnionQuery & interpreter, StoragePtr table_ = nullptr);
bool hasSource() const;
/// Returns query plan for the set's source
/// and removes it from SubqueryForSet because we need to build it only once.
std::unique_ptr<QueryPlan> detachSource();
/// Build this set from the result of the subquery.
SetPtr set;
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
/// The source is obtained using the InterpreterSelectQuery subquery.
std::unique_ptr<QueryPlan> source;
};
struct PreparedSetKey
{
/// Prepared sets for tuple literals are indexed by the hash of the tree contents and by the desired
@ -66,9 +44,39 @@ struct PreparedSetKey
};
};
/// Information on how to build set for the [GLOBAL] IN section.
class SubqueryForSet
{
public:
void createSource(InterpreterSelectWithUnionQuery & interpreter, StoragePtr table_ = nullptr);
bool hasSource() const;
/// Returns query plan for the set's source
/// and removes it from SubqueryForSet because we need to build it only once.
std::unique_ptr<QueryPlan> detachSource();
/// Build this set from the result of the subquery.
PreparedSetKey key;
SetPtr set_in_progress;
std::promise<SetPtr> promise_to_fill_set;
FutureSet set = promise_to_fill_set.get_future();
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
/// The source is obtained using the InterpreterSelectQuery subquery.
std::unique_ptr<QueryPlan> source;
};
class PreparedSets
{
public:
// explicit PreparedSets(PreparedSetsCachePtr cache_ = {}) : cache(cache_) {}
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
SubqueryForSet & createOrGetSubquery(const String & subquery_id, const PreparedSetKey & key,
@ -76,7 +84,8 @@ public:
SubqueryForSet & getSubquery(const String & subquery_id);
void set(const PreparedSetKey & key, SetPtr set_);
SetPtr & get(const PreparedSetKey & key);
FutureSet getFuture(const PreparedSetKey & key) const;
SetPtr get(const PreparedSetKey & key) const;
/// Get subqueries and clear them.
/// We need to build a plan for subqueries just once. That's why we can clear them after accessing them.
@ -85,12 +94,12 @@ public:
/// Returns all sets that match the given ast hash not checking types
/// Used in KeyCondition and MergeTreeIndexConditionBloomFilter to make non exact match for types in PreparedSetKey
std::vector<SetPtr> getByTreeHash(IAST::Hash ast_hash);
std::vector<SetPtr> getByTreeHash(IAST::Hash ast_hash) const;
bool empty() const;
private:
std::unordered_map<PreparedSetKey, SetPtr, PreparedSetKey::Hash> sets;
std::unordered_map<PreparedSetKey, FutureSet, PreparedSetKey::Hash> sets;
/// This is the information required for building sets
SubqueriesForSets subqueries;
@ -98,22 +107,25 @@ private:
using PreparedSetsPtr = std::shared_ptr<PreparedSets>;
/// This set cache is used to avoid building the same set multiple times. It is different from PreparedSets in way that
/// it can be used across multiple queries. One use case is when we execute the same mutation on multiple parts. In this
/// case each part is processed by a separate mutation task but they can share the same set.
/// TODO: need to distinguish between sets with and w/o set_elements!!!!
class PreparedSetsCache
{
public:
/// Returns the set from the cache or builds it using the provided function.
/// If the set is already being built by another task, then this call will wait for the set to be built.
SetPtr findOrBuild(const PreparedSetKey & key, const std::function<SetPtr()> & build_set);
FutureSet findOrBuild(const PreparedSetKey & key, const std::function<FutureSet()> & build_set);
std::variant<std::promise<SetPtr>, FutureSet> findOrPromiseToBuild(const PreparedSetKey & key);
private:
struct Entry
{
std::promise<SetPtr> promise; /// The promise is set when the set is built by the first task.
std::shared_future<SetPtr> filled_set; /// Other tasks can wait for the set to be built.
// std::promise<SetPtr> promise; /// The promise is set when the set is built by the first task.
std::shared_future<SetPtr> future; /// Other tasks can wait for the set to be built.
};
using EntryPtr = std::shared_ptr<Entry>;
@ -125,4 +137,8 @@ private:
using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;
FutureSet makeReadyFutureSet(SetPtr set);
}

View File

@ -1,3 +1,5 @@
#include <memory>
#include <mutex>
#include <optional>
#include <Core/Field.h>
@ -236,6 +238,26 @@ bool Set::insertFromBlock(const Columns & columns)
return limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "IN-set", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
void Set::finishInsert()
{
is_created = true;
is_created_promise.set_value();
}
void Set::waitForIsCreated() const
{
if (is_created.load())
return;
// FIXME: each thread must wait on its own copy of the future
std::shared_future<void> local_is_created_future;
{
std::lock_guard<std::mutex> lock(is_created_future_mutex);
local_is_created_future = is_created_future;
}
local_is_created_future.wait();
}
ColumnPtr Set::execute(const ColumnsWithTypeAndName & columns, bool negative) const
{
@ -244,6 +266,8 @@ ColumnPtr Set::execute(const ColumnsWithTypeAndName & columns, bool negative) co
if (0 == num_key_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: no columns passed to Set::execute method.");
waitForIsCreated();
auto res = ColumnUInt8::create();
ColumnUInt8::Container & vec_res = res->getData();
vec_res.resize(columns.at(0).column->size());

View File

@ -34,6 +34,7 @@ public:
: log(&Poco::Logger::get("Set")),
limits(limits_), fill_set_elements(fill_set_elements_), transform_null_in(transform_null_in_)
{
is_created_future = is_created_promise.get_future();
}
/** Set can be created either from AST or from a stream of data (subquery result).
@ -49,11 +50,13 @@ public:
bool insertFromBlock(const ColumnsWithTypeAndName & columns);
/// Call after all blocks were inserted. To get the information that set is already created.
void finishInsert() { is_created = true; }
void finishInsert();
/// finishInsert and isCreated are thread-safe
bool isCreated() const { return is_created.load(); }
void waitForIsCreated() const;
/** For columns of 'block', check belonging of corresponding rows to the set.
* Return UInt8 column with the result.
*/
@ -67,7 +70,7 @@ public:
const DataTypes & getElementsTypes() const { return set_elements_types; }
bool hasExplicitSetElements() const { return fill_set_elements; }
Columns getSetElements() const { return { set_elements.begin(), set_elements.end() }; }
Columns getSetElements() const { waitForIsCreated(); return { set_elements.begin(), set_elements.end() }; }
void checkColumnsNumber(size_t num_key_columns) const;
bool areTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const;
@ -115,6 +118,9 @@ private:
/// Check if set contains all the data.
std::atomic<bool> is_created = false;
std::promise<void> is_created_promise;
mutable std::mutex is_created_future_mutex;
mutable std::shared_future<void> is_created_future TSA_GUARDED_BY(is_created_future_mutex);
/// If in the left part columns contains the same types as the elements of the set.
void executeOrdinary(

View File

@ -905,7 +905,7 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
subquery_planner.buildQueryPlanIfNeeded();
SubqueryForSet subquery_for_set;
subquery_for_set.set = planner_set->getSet();
subquery_for_set.set = makeReadyFutureSet(planner_set->getSet()); // TODO: make it lazy?
subquery_for_set.source = std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan());
subqueries_for_sets.emplace(set_key, std::move(subquery_for_set));

View File

@ -633,7 +633,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
column.type = std::make_shared<DataTypeSet>();
bool set_is_created = planner_set.getSet()->isCreated();
auto column_set = ColumnSet::create(1, planner_set.getSet());
auto column_set = ColumnSet::create(1, FutureSet{});//planner_set.getSet());
if (set_is_created)
column.column = ColumnConst::create(std::move(column_set), 1);

View File

@ -60,7 +60,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
String prefix(settings.offset, ' ');
settings.out << prefix;
if (subquery_for_set.set)
if (subquery_for_set.set_in_progress)
settings.out << "Set: ";
settings.out << description << '\n';
@ -68,7 +68,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const
{
if (subquery_for_set.set)
// if (subquery_for_set.set_in_progress)
map.add("Set", description);
}

View File

@ -4,6 +4,7 @@
#include <Interpreters/Set.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
#include <Common/logger_useful.h>
@ -39,13 +40,37 @@ void CreatingSetsTransform::work()
if (!is_initialized)
init();
if (done_with_set && done_with_table)
{
finishConsume();
input.close(); // TODO: what is the proper way to finish the input? why input.close() was not called before my changes?
}
IAccumulatingTransform::work();
}
void CreatingSetsTransform::startSubquery()
{
if (subquery.set)
LOG_TRACE(log, "Creating set.");
// TODO: lookup the set in the context->prepared_sets_cache
auto ctx = context.lock();
if (ctx && ctx->getPreparedSetsCache())
{
auto from_cache = ctx->getPreparedSetsCache()->findOrPromiseToBuild(subquery.key);
if (from_cache.index() == 0)
promise_to_build = std::move(std::get<0>(from_cache));
else
{
LOG_TRACE(log, "Waiting for set to be build by another thread.");
FutureSet set_built_by_another_thread = std::move(std::get<1>(from_cache));
SetPtr ready_set = set_built_by_another_thread.get();
subquery.promise_to_fill_set.set_value(ready_set);
done_with_set = true;
subquery.set_in_progress.reset();
}
}
if (subquery.set_in_progress)
LOG_TRACE(log, "Creating set, key: {}:{}", subquery.key.ast_hash.first, subquery.key.ast_hash.second);
if (subquery.table)
LOG_TRACE(log, "Filling temporary table.");
@ -53,11 +78,12 @@ void CreatingSetsTransform::startSubquery()
/// TODO: make via port
table_out = QueryPipeline(subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext()));
done_with_set = !subquery.set;
done_with_set = !subquery.set_in_progress;
done_with_table = !subquery.table;
if (done_with_set /*&& done_with_join*/ && done_with_table)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: nothing to do with subquery");
// TODO: properly do this check
// if (done_with_set /*&& done_with_join*/ && done_with_table)
// throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: nothing to do with subquery");
if (table_out.initialized())
{
@ -72,8 +98,8 @@ void CreatingSetsTransform::finishSubquery()
{
auto seconds = watch.elapsedNanoseconds() / 1e9;
if (subquery.set)
LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), read_rows, seconds);
if (subquery.set_in_progress)
LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set_in_progress->getTotalRowCount(), read_rows, seconds);
if (subquery.table)
LOG_DEBUG(log, "Created Table with {} rows in {} sec.", read_rows, seconds);
}
@ -87,8 +113,10 @@ void CreatingSetsTransform::init()
{
is_initialized = true;
if (subquery.set)
subquery.set->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
if (subquery.set_in_progress)
{
subquery.set_in_progress->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
}
watch.restart();
startSubquery();
@ -101,7 +129,7 @@ void CreatingSetsTransform::consume(Chunk chunk)
if (!done_with_set)
{
if (!subquery.set->insertFromBlock(block.getColumnsWithTypeAndName()))
if (!subquery.set_in_progress->insertFromBlock(block.getColumnsWithTypeAndName()))
done_with_set = true;
}
@ -124,8 +152,13 @@ void CreatingSetsTransform::consume(Chunk chunk)
Chunk CreatingSetsTransform::generate()
{
if (subquery.set)
subquery.set->finishInsert();
if (subquery.set_in_progress)
{
subquery.set_in_progress->finishInsert();
subquery.promise_to_fill_set.set_value(subquery.set_in_progress);
if (promise_to_build)
promise_to_build->set_value(subquery.set_in_progress);
}
if (table_out.initialized())
{

View File

@ -1,5 +1,6 @@
#pragma once
#include <future>
#include <QueryPipeline/SizeLimits.h>
#include <Interpreters/Context_fwd.h>
#include <Processors/IAccumulatingTransform.h>
@ -43,6 +44,7 @@ public:
private:
SubqueryForSet subquery;
std::optional<std::promise<SetPtr>> promise_to_build;
QueryPipeline table_out;
std::unique_ptr<PushingPipelineExecutor> executor;

View File

@ -1725,6 +1725,10 @@ bool MutateTask::prepare()
context_for_reading->setSetting("force_index_by_date", false);
context_for_reading->setSetting("force_primary_key", false);
// /// Skip using large sets in keyCondition
// context_for_reading->setSetting("max_bytes_in_set", 50*1000*1000);
// context_for_reading->setSetting("set_overflow_mode", String("break"));
for (const auto & command : *ctx->commands)
if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))
ctx->commands_for_part.emplace_back(command);
@ -1773,6 +1777,9 @@ bool MutateTask::prepare()
context_for_reading->setSetting("max_threads", 1);
context_for_reading->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0));
/// Restore settings for large sets.
context_for_reading->setSetting("max_bytes_in_set", ctx->context->getSettings().max_bytes_in_set.value);
context_for_reading->setSetting("set_overflow_mode", ctx->context->getSettings().set_overflow_mode.toString());
MutationHelpers::splitAndModifyMutationCommands(ctx->source_part, ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames);

View File

@ -288,7 +288,7 @@ ConstSetPtr tryGetSetFromDAGNode(const ActionsDAG::Node * dag_node)
{
auto set = column_set->getData();
if (set->isCreated())
if (set && set->isCreated())
return set;
}

View File

@ -3,3 +3,10 @@ all_1_1_0
all_2_2_0
all_3_3_0
all_4_4_0
36000
32000
28000
24000
20000
16000
12000

View File

@ -2,19 +2,60 @@
DROP TABLE IF EXISTS 02581_trips;
CREATE TABLE 02581_trips(id UInt32, description String) ENGINE=MergeTree ORDER BY id;
CREATE TABLE 02581_trips(id UInt32, description String, id2 UInt32, PRIMARY KEY id) ENGINE=MergeTree ORDER BY id;
-- Make multiple parts
INSERT INTO 02581_trips SELECT number, '' FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+100000, '' FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+200000, '' FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+300000, '' FROM numbers(10000);
INSERT INTO 02581_trips SELECT number, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+100000, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+200000, '', number FROM numbers(10000);
INSERT INTO 02581_trips SELECT number+300000, '', number FROM numbers(10000);
SELECT count() from 02581_trips WHERE description = '';
SELECT count() from 02581_trips;
SELECT name FROM system.parts WHERE database=currentDatabase() AND table = '02581_trips' AND active ORDER BY name;
-- Run mutation with a 'IN big subquery'
ALTER TABLE 02581_trips UPDATE description='' WHERE id IN (SELECT (number+5)::UInt32 FROM numbers(100000000)) SETTINGS mutations_sync=2;
-- Run mutation with `id` a 'IN big subquery'
ALTER TABLE 02581_trips UPDATE description='a' WHERE id IN (SELECT (number*10)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
SELECT count() from 02581_trips WHERE description = '';
ALTER TABLE 02581_trips UPDATE description='a' WHERE id IN (SELECT (number*10 + 1)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2, max_rows_in_set=1000;
SELECT count() from 02581_trips WHERE description = '';
-- Run mutation with func(`id`) IN big subquery
ALTER TABLE 02581_trips UPDATE description='b' WHERE id::UInt64 IN (SELECT (number*10 + 2)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
SELECT count() from 02581_trips WHERE description = '';
-- Run mutation with non-PK `id2` IN big subquery
ALTER TABLE 02581_trips UPDATE description='c' WHERE id2 IN (SELECT (number*10 + 3)::UInt32 FROM numbers(200000000)) SETTINGS mutations_sync=2;
SELECT count() from 02581_trips WHERE description = '';
-- Run mutation with PK and non-PK IN big subquery
ALTER TABLE 02581_trips UPDATE description='c'
WHERE
(id IN (SELECT (number*10 + 4)::UInt32 FROM numbers(200000000))) OR
(id2 IN (SELECT (number*10 + 4)::UInt32 FROM numbers(200000000)))
SETTINGS mutations_sync=2;
SELECT count() from 02581_trips WHERE description = '';
-- Run mutation with PK and non-PK IN big subquery
ALTER TABLE 02581_trips UPDATE description='c'
WHERE
(id::UInt64 IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000))) OR
(id2::UInt64 IN (SELECT (number*10 + 5)::UInt32 FROM numbers(200000000)))
SETTINGS mutations_sync=2;
SELECT count() from 02581_trips WHERE description = '';
CREATE TABLE 02581_set (id UInt32) ENGINE = Set;
INSERT INTO 02581_set SELECT number*10+6 FROM numbers(200000000);
-- Run mutation with non-PK `id2` IN big subquery
ALTER TABLE 02581_trips UPDATE description='d' WHERE id IN 02581_set SETTINGS mutations_sync=2;
SELECT count() from 02581_trips WHERE description = '';
DROP TABLE 02581_set;
DROP TABLE 02581_trips;