Refactor PreparedSets [3]

This commit is contained in:
Nikolai Kochetov 2023-05-04 17:54:08 +00:00
parent 80a2f30a0c
commit f598a39ea2
38 changed files with 585 additions and 292 deletions

View File

@ -5117,14 +5117,26 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
const auto & second_argument_constant_literal = second_argument_constant_node->getValue();
const auto & second_argument_constant_type = second_argument_constant_node->getResultType();
auto set = makeSetForConstantValue(first_argument_constant_type,
const auto & settings = scope.context->getSettingsRef();
auto result_block = makeSetForConstantValue(first_argument_constant_type,
second_argument_constant_literal,
second_argument_constant_type,
scope.context->getSettingsRef());
settings.transform_null_in);
SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
auto set = std::make_shared<Set>(size_limits_for_set, true /*fill_set_elements*/, settings.transform_null_in);
set->setHeader(result_block.cloneEmpty().getColumnsWithTypeAndName());
set->insertFromBlock(result_block.getColumnsWithTypeAndName());
set->finishInsert();
auto future_set = std::make_shared<FutureSetFromStorage>(std::move(set));
/// Create constant set column for constant folding
auto column_set = ColumnSet::create(1, FutureSet(std::move(set)));
auto column_set = ColumnSet::create(1, std::move(future_set));
argument_columns[1].column = ColumnConst::create(std::move(column_set), 1);
}

View File

@ -118,7 +118,7 @@ Block createBlockFromCollection(const Collection & collection, const DataTypes &
}
SetPtr makeSetForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, const Settings & settings)
Block makeSetForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in)
{
DataTypes set_element_types = {expression_type};
const auto * lhs_tuple_type = typeid_cast<const DataTypeTuple *>(expression_type.get());
@ -135,8 +135,8 @@ SetPtr makeSetForConstantValue(const DataTypePtr & expression_type, const Field
size_t lhs_type_depth = getCompoundTypeDepth(*expression_type);
size_t rhs_type_depth = getCompoundTypeDepth(*value_type);
SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
bool tranform_null_in = settings.transform_null_in;
// SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
// bool transform_null_in = settings.transform_null_in;
Block result_block;
@ -145,7 +145,7 @@ SetPtr makeSetForConstantValue(const DataTypePtr & expression_type, const Field
/// 1 in 1; (1, 2) in (1, 2); identity(tuple(tuple(tuple(1)))) in tuple(tuple(tuple(1))); etc.
Array array{value};
result_block = createBlockFromCollection(array, set_element_types, tranform_null_in);
result_block = createBlockFromCollection(array, set_element_types, transform_null_in);
}
else if (lhs_type_depth + 1 == rhs_type_depth)
{
@ -154,9 +154,9 @@ SetPtr makeSetForConstantValue(const DataTypePtr & expression_type, const Field
WhichDataType rhs_which_type(value_type);
if (rhs_which_type.isArray())
result_block = createBlockFromCollection(value.get<const Array &>(), set_element_types, tranform_null_in);
result_block = createBlockFromCollection(value.get<const Array &>(), set_element_types, transform_null_in);
else if (rhs_which_type.isTuple())
result_block = createBlockFromCollection(value.get<const Tuple &>(), set_element_types, tranform_null_in);
result_block = createBlockFromCollection(value.get<const Tuple &>(), set_element_types, transform_null_in);
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Unsupported type at the right-side of IN. Expected Array or Tuple. Actual {}",
@ -170,13 +170,15 @@ SetPtr makeSetForConstantValue(const DataTypePtr & expression_type, const Field
value_type->getName());
}
auto set = std::make_shared<Set>(size_limits_for_set, true /*fill_set_elements*/, tranform_null_in);
return result_block;
set->setHeader(result_block.cloneEmpty().getColumnsWithTypeAndName());
set->insertFromBlock(result_block.getColumnsWithTypeAndName());
set->finishInsert();
// auto set = std::make_shared<Set>(size_limits_for_set, true /*fill_set_elements*/, tranform_null_in);
return set;
// set->setHeader(result_block.cloneEmpty().getColumnsWithTypeAndName());
// set->insertFromBlock(result_block.getColumnsWithTypeAndName());
// set->finishInsert();
// return set;
}
}

View File

@ -21,10 +21,9 @@ using SetPtr = std::shared_ptr<Set>;
* @param expression_type - type of first argument of function IN.
* @param value - constant value of second argument of function IN.
* @param value_type - type of second argument of function IN.
* @param settings - query settings.
*
* @return SetPtr for constant value.
*/
SetPtr makeSetForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, const Settings & settings);
Block makeSetForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in);
}

View File

@ -29,7 +29,7 @@ public:
TypeIndex getDataType() const override { return TypeIndex::Set; }
MutableColumnPtr cloneDummy(size_t s_) const override { return ColumnSet::create(s_, data); }
ConstSetPtr getData() const { if (!data.isReady()) return nullptr; return data.get(); }
FutureSetPtr getData() const { return data; }
// Used only for debugging, making it DUMPABLE
Field operator[](size_t) const override { return {}; }

View File

@ -20,7 +20,7 @@ public:
bool isParametric() const override { return true; }
// Used for expressions analysis.
MutableColumnPtr createColumn() const override { return ColumnSet::create(0, FutureSet{}); }
MutableColumnPtr createColumn() const override { return ColumnSet::create(0, nullptr); }
// Used only for debugging, making it DUMPABLE
Field getDefault() const override { return Tuple(); }

View File

@ -55,9 +55,13 @@ public:
/// It is needed to perform type analysis without creation of set.
static constexpr auto name = FunctionInName<negative, global, null_is_skipped, ignore_set>::name;
static FunctionPtr create(ContextPtr)
FunctionIn(SizeLimits size_limits_, bool transform_null_in_)
: size_limits(std::move(size_limits_)), transform_null_in(transform_null_in_) {}
static FunctionPtr create(ContextPtr context)
{
return std::make_shared<FunctionIn>();
const auto & settings = context->getSettingsRef();
return std::make_shared<FunctionIn>(FutureSet::getSizeLimitsForSet(settings, false), settings.transform_null_in);
}
String getName() const override
@ -122,10 +126,15 @@ public:
tuple = typeid_cast<const ColumnTuple *>(materialized_tuple.get());
}
auto set = column_set->getData();
if (!set)
auto future_set = column_set->getData();
if (!future_set || !future_set->isFilled())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not-ready Set passed as the second argument for function '{}'", getName());
if (auto * for_tuple = typeid_cast<FutureSetFromTuple *>(future_set.get()))
if (!for_tuple->isReady())
for_tuple->buildForTuple(size_limits, transform_null_in);
auto set = future_set->get();
auto set_types = set->getDataTypes();
if (tuple && set_types.size() != 1 && set_types.size() == tuple->tupleSize())
@ -173,6 +182,10 @@ public:
return res;
}
private:
SizeLimits size_limits;
bool transform_null_in;
};
template<bool ignore_set>

View File

@ -446,7 +446,7 @@ FutureSetPtr makeExplicitSet(
if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(element_type.get()))
element_type = low_cardinality_type->getDictionaryType();
auto set_key = PreparedSetKey::forLiteral(*right_arg, set_element_types);
auto set_key = PreparedSetKey::forLiteral(right_arg->getTreeHash(), set_element_types);
if (auto set = prepared_sets.getFuture(set_key))
return set; /// Already prepared.
@ -1384,7 +1384,7 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
{
if (no_subqueries)
return {};
auto set_key = PreparedSetKey::forSubquery(*right_in_operand);
auto set_key = PreparedSetKey::forSubquery(right_in_operand->getTreeHash());
if (auto set = data.prepared_sets->getFuture(set_key))
return set;

View File

@ -941,7 +941,7 @@ bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) con
if (const auto * column_set = checkAndGetColumn<const ColumnSet>(action.node->column.get()))
{
auto set = column_set->getData();
if (set && set->isCreated() && set->getTotalRowCount() == 0)
if (set && set->isReady() && set->get()->getTotalRowCount() == 0)
return true;
}
}

View File

@ -181,6 +181,18 @@ public:
// auto & subquery_for_set = prepared_sets->getSubquery(external_table_name);
// subquery_for_set.createSource(*interpreter, external_storage);
auto key = subquery_or_table_name->getColumnName();
auto set_key = PreparedSetKey::forSubquery(subquery_or_table_name->getTreeHash());
if (!prepared_sets->getFuture(set_key))
{
SubqueryForSet subquery_for_set;
subquery_for_set.key = std::move(key);
subquery_for_set.table = std::move(external_storage);
subquery_for_set.createSource(*interpreter);
prepared_sets->addFromSubquery(set_key, std::move(subquery_for_set));
}
else
prepared_sets->addStorageToSubquery(key, std::move(external_storage));
}

View File

@ -905,8 +905,8 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
if (storage && !options.only_analyze)
{
query_analyzer->makeSetsForIndex(select_query.where());
query_analyzer->makeSetsForIndex(select_query.prewhere());
// query_analyzer->makeSetsForIndex(select_query.where());
// query_analyzer->makeSetsForIndex(select_query.prewhere());
query_info.prepared_sets = query_analyzer->getPreparedSets();
from_stage = storage->getQueryProcessingStage(context, options.to_stage, storage_snapshot, query_info);
@ -3088,7 +3088,12 @@ void InterpreterSelectQuery::executeExtremes(QueryPlan & query_plan)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPlan & query_plan)
{
addCreatingSetsStep(query_plan, prepared_sets, context);
auto step = std::make_unique<DelayedCreatingSetsStep>(
query_plan.getCurrentDataStream(),
prepared_sets->detachSubqueries(context),
context);
query_plan.addStep(std::move(step));
}

View File

@ -11,7 +11,7 @@
namespace DB
{
PreparedSetKey PreparedSetKey::forLiteral(const IAST & ast, DataTypes types_)
PreparedSetKey PreparedSetKey::forLiteral(Hash hash, DataTypes types_)
{
/// Remove LowCardinality types from type list because Set doesn't support LowCardinality keys now,
/// just converts LowCardinality to ordinary types.
@ -19,15 +19,15 @@ PreparedSetKey PreparedSetKey::forLiteral(const IAST & ast, DataTypes types_)
type = recursiveRemoveLowCardinality(type);
PreparedSetKey key;
key.ast_hash = ast.getTreeHash();
key.ast_hash = hash;
key.types = std::move(types_);
return key;
}
PreparedSetKey PreparedSetKey::forSubquery(const IAST & ast)
PreparedSetKey PreparedSetKey::forSubquery(Hash hash)
{
PreparedSetKey key;
key.ast_hash = ast.getTreeHash();
key.ast_hash = hash;
return key;
}
@ -155,9 +155,9 @@ FutureSetPtr PreparedSets::getFuture(const PreparedSetKey & key) const
// return it->second.get();
// }
// std::vector<FutureSet> PreparedSets::getByTreeHash(IAST::Hash ast_hash) const
// std::vector<FutureSetPtr> PreparedSets::getByTreeHash(IAST::Hash ast_hash) const
// {
// std::vector<FutureSet> res;
// std::vector<FutureSetPtr> res;
// for (const auto & it : this->sets)
// {
// if (it.first.ast_hash == ast_hash)
@ -166,7 +166,7 @@ FutureSetPtr PreparedSets::getFuture(const PreparedSetKey & key) const
// return res;
// }
PreparedSets::SubqueriesForSets PreparedSets::detachSubqueries()
PreparedSets::SubqueriesForSets PreparedSets::detachSubqueries(const ContextPtr &)
{
auto res = std::move(subqueries);
subqueries = SubqueriesForSets();
@ -221,6 +221,8 @@ std::unique_ptr<QueryPlan> FutureSetFromSubquery::buildPlan(const ContextPtr & c
if (set)
return nullptr;
std::cerr << StackTrace().toString() << std::endl;
auto set_cache = context->getPreparedSetsCache();
if (set_cache)
{
@ -277,4 +279,10 @@ SizeLimits FutureSet::getSizeLimitsForSet(const Settings & settings, bool ordere
return ordered_set ? getSizeLimitsForOrderedSet(settings) : getSizeLimitsForUnorderedSet(settings);
}
FutureSetFromTuple::FutureSetFromTuple(Block block_) : block(std::move(block_)) {}
FutureSetFromSubquery::FutureSetFromSubquery(SubqueryForSet subquery_) : subquery(std::move(subquery_)) {}
FutureSetFromStorage::FutureSetFromStorage(SetPtr set_) : set(std::move(set_)) {}
};

View File

@ -65,6 +65,7 @@ public:
virtual ~FutureSet() = default;
virtual bool isReady() const = 0;
virtual bool isFilled() const = 0;
virtual SetPtr get() const = 0;
virtual SetPtr buildOrderedSetInplace(const ContextPtr & context) = 0;
@ -81,34 +82,41 @@ public:
FutureSetFromTuple(Block block_);
bool isReady() const override { return set != nullptr; }
bool isFilled() const override { return true; }
SetPtr get() const override { return set; }
SetPtr buildOrderedSetInplace(const ContextPtr & context) override
{
fill(context, true);
const auto & settings = context->getSettingsRef();
auto size_limits = getSizeLimitsForSet(settings, true);
fill(size_limits, settings.transform_null_in, true);
return set;
}
std::unique_ptr<QueryPlan> build(const ContextPtr & context) override
{
fill(context, false);
const auto & settings = context->getSettingsRef();
auto size_limits = getSizeLimitsForSet(settings, false);
fill(size_limits, settings.transform_null_in, false);
return nullptr;
}
void buildForTuple(SizeLimits size_limits, bool transform_null_in)
{
fill(size_limits, transform_null_in, false);
}
private:
Block block;
SetPtr set;
void fill(const ContextPtr & context, bool create_ordered_set)
void fill(SizeLimits size_limits, bool transform_null_in, bool create_ordered_set)
{
if (set)
return;
const auto & settings = context->getSettingsRef();
auto size_limits = getSizeLimitsForSet(settings, create_ordered_set);
set = std::make_shared<Set>(size_limits, create_ordered_set, settings.transform_null_in);
set = std::make_shared<Set>(size_limits, create_ordered_set, transform_null_in);
set->setHeader(block.cloneEmpty().getColumnsWithTypeAndName());
set->insertFromBlock(block.getColumnsWithTypeAndName());
set->finishInsert();
@ -151,6 +159,7 @@ public:
FutureSetFromSubquery(SubqueryForSet subquery_);
bool isReady() const override { return set != nullptr; }
bool isFilled() const override { return isReady(); }
SetPtr get() const override { return set; }
SetPtr buildOrderedSetInplace(const ContextPtr & context) override
@ -190,6 +199,7 @@ public:
FutureSetFromStorage(SetPtr set_); // : set(std::move(set_) {}
bool isReady() const override { return set != nullptr; }
bool isFilled() const override { return isReady(); }
SetPtr get() const override { return set; }
SetPtr buildOrderedSetInplace(const ContextPtr &) override
@ -229,23 +239,25 @@ private:
struct PreparedSetKey
{
using Hash = std::pair<UInt64, UInt64>;
/// Prepared sets for tuple literals are indexed by the hash of the tree contents and by the desired
/// data types of set elements (two different Sets can be required for two tuples with the same contents
/// if left hand sides of the IN operators have different types).
static PreparedSetKey forLiteral(const IAST & ast, DataTypes types_);
static PreparedSetKey forLiteral(Hash hash, DataTypes types_);
/// Prepared sets for subqueries are indexed only by the AST contents because the type of the resulting
/// set is fully determined by the subquery.
static PreparedSetKey forSubquery(const IAST & ast);
static PreparedSetKey forSubquery(Hash hash);
IAST::Hash ast_hash;
Hash ast_hash;
DataTypes types; /// Empty for subqueries.
bool operator==(const PreparedSetKey & other) const;
String toString() const;
struct Hash
struct Hashing
{
UInt64 operator()(const PreparedSetKey & key) const { return key.ast_hash.first; }
};
@ -272,16 +284,18 @@ public:
/// Get subqueries and clear them.
/// We need to build a plan for subqueries just once. That's why we can clear them after accessing them.
/// SetPtr would still be available for consumers of PreparedSets.
SubqueriesForSets detachSubqueries();
SubqueriesForSets detachSubqueries(const ContextPtr &);
/// Returns all sets that match the given ast hash not checking types
/// Used in KeyCondition and MergeTreeIndexConditionBloomFilter to make non exact match for types in PreparedSetKey
std::vector<FutureSet> getByTreeHash(IAST::Hash ast_hash) const;
//std::vector<FutureSetPtr> getByTreeHash(IAST::Hash ast_hash) const;
const std::unordered_map<PreparedSetKey, FutureSetPtr, PreparedSetKey::Hashing> & getSets() const { return sets; }
bool empty() const;
private:
std::unordered_map<PreparedSetKey, FutureSetPtr, PreparedSetKey::Hash> sets;
std::unordered_map<PreparedSetKey, FutureSetPtr, PreparedSetKey::Hashing> sets;
/// This is the information required for building sets
SubqueriesForSets subqueries;

View File

@ -11,6 +11,8 @@
#include <Analyzer/ConstantNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/TableNode.h>
#include <DataTypes/DataTypeTuple.h>
#include <Planner/Planner.h>
namespace DB
{
@ -26,8 +28,9 @@ namespace
class CollectSetsVisitor : public ConstInDepthQueryTreeVisitor<CollectSetsVisitor>
{
public:
explicit CollectSetsVisitor(PlannerContext & planner_context_)
explicit CollectSetsVisitor(PlannerContext & planner_context_, const SelectQueryOptions & select_query_options_)
: planner_context(planner_context_)
, select_query_options(select_query_options_)
{}
void visitImpl(const QueryTreeNodePtr & node)
@ -42,10 +45,12 @@ public:
const auto & settings = planner_context.getQueryContext()->getSettingsRef();
String set_key = planner_context.createSetKey(in_second_argument);
// String set_key = planner_context.createSetKey(in_second_argument);
if (planner_context.hasSet(set_key))
return;
// if (planner_context.hasSet(set_key))
// return;
auto & sets = planner_context.getPreparedSets();
/// Tables and table functions are replaced with subquery at Analysis stage, except special Set table.
auto * second_argument_table = in_second_argument->as<TableNode>();
@ -54,7 +59,9 @@ public:
if (storage_set)
{
/// Handle storage_set as ready set.
planner_context.registerSet(set_key, PlannerSet(FutureSet(storage_set->getSet())));
auto set_key = PreparedSetKey::forSubquery(in_second_argument->getTreeHash());
sets.addFromStorage(set_key, storage_set->getSet());
//planner_context.registerSet(set_key, PlannerSet(FutureSet(storage_set->getSet())));
}
else if (const auto * constant_node = in_second_argument->as<ConstantNode>())
{
@ -62,14 +69,47 @@ public:
in_first_argument->getResultType(),
constant_node->getValue(),
constant_node->getResultType(),
settings);
settings.transform_null_in);
planner_context.registerSet(set_key, PlannerSet(FutureSet(std::move(set))));
DataTypes set_element_types = {in_first_argument->getResultType()};
const auto * left_tuple_type = typeid_cast<const DataTypeTuple *>(set_element_types.front().get());
if (left_tuple_type && left_tuple_type->getElements().size() != 1)
set_element_types = left_tuple_type->getElements();
for (auto & element_type : set_element_types)
if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(element_type.get()))
element_type = low_cardinality_type->getDictionaryType();
auto set_key = PreparedSetKey::forLiteral(in_second_argument->getTreeHash(), set_element_types);
sets.addFromTuple(set_key, std::move(set));
//planner_context.registerSet(set_key, PlannerSet(FutureSet(std::move(set))));
}
else if (in_second_argument_node_type == QueryTreeNodeType::QUERY ||
in_second_argument_node_type == QueryTreeNodeType::UNION)
{
planner_context.registerSet(set_key, PlannerSet(in_second_argument));
auto set_key = PreparedSetKey::forSubquery(in_second_argument->getTreeHash());
auto subquery_options = select_query_options.subquery();
Planner subquery_planner(
in_second_argument,
subquery_options,
planner_context.getGlobalPlannerContext());
subquery_planner.buildQueryPlanIfNeeded();
// const auto & settings = planner_context.getQueryContext()->getSettingsRef();
// SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
// bool tranform_null_in = settings.transform_null_in;
// auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
SubqueryForSet subquery_for_set;
subquery_for_set.key = planner_context.createSetKey(in_second_argument);
subquery_for_set.source = std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan());
sets.addFromSubquery(set_key, std::move(subquery_for_set));
//planner_context.registerSet(set_key, PlannerSet(in_second_argument));
}
else
{
@ -87,13 +127,14 @@ public:
private:
PlannerContext & planner_context;
const SelectQueryOptions & select_query_options;
};
}
void collectSets(const QueryTreeNodePtr & node, PlannerContext & planner_context)
void collectSets(const QueryTreeNodePtr & node, PlannerContext & planner_context, const SelectQueryOptions & select_query_options)
{
CollectSetsVisitor visitor(planner_context);
CollectSetsVisitor visitor(planner_context, select_query_options);
visitor.visit(node);
}

View File

@ -7,9 +7,11 @@
namespace DB
{
struct SelectQueryOptions;
/** Collect prepared sets and sets for subqueries that are necessary to execute IN function and its variations.
* Collected sets are registered in planner context.
*/
void collectSets(const QueryTreeNodePtr & node, PlannerContext & planner_context);
void collectSets(const QueryTreeNodePtr & node, PlannerContext & planner_context, const SelectQueryOptions & select_query_options);
}

View File

@ -878,50 +878,50 @@ void addOffsetStep(QueryPlan & query_plan, const QueryAnalysisResult & query_ana
query_plan.addStep(std::move(offsets_step));
}
void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
const SelectQueryOptions & select_query_options,
const PlannerContextPtr & planner_context,
const std::vector<ActionsDAGPtr> & result_actions_to_execute)
{
PreparedSets::SubqueriesForSets subqueries_for_sets;
// void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
// const SelectQueryOptions & select_query_options,
// const PlannerContextPtr & planner_context,
// const std::vector<ActionsDAGPtr> & result_actions_to_execute)
// {
// PreparedSets::SubqueriesForSets subqueries_for_sets;
for (const auto & actions_to_execute : result_actions_to_execute)
{
for (const auto & node : actions_to_execute->getNodes())
{
const auto & set_key = node.result_name;
auto * planner_set = planner_context->getSetOrNull(set_key);
if (!planner_set)
continue;
// for (const auto & actions_to_execute : result_actions_to_execute)
// {
// for (const auto & node : actions_to_execute->getNodes())
// {
// const auto & set_key = node.result_name;
// auto * planner_set = planner_context->getSetOrNull(set_key);
// if (!planner_set)
// continue;
if (planner_set->getSet().isCreated() || !planner_set->getSubqueryNode())
continue;
// if (planner_set->getSet().isCreated() || !planner_set->getSubqueryNode())
// continue;
auto subquery_options = select_query_options.subquery();
Planner subquery_planner(
planner_set->getSubqueryNode(),
subquery_options,
planner_context->getGlobalPlannerContext());
subquery_planner.buildQueryPlanIfNeeded();
// auto subquery_options = select_query_options.subquery();
// Planner subquery_planner(
// planner_set->getSubqueryNode(),
// subquery_options,
// planner_context->getGlobalPlannerContext());
// subquery_planner.buildQueryPlanIfNeeded();
const auto & settings = planner_context->getQueryContext()->getSettingsRef();
SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
bool tranform_null_in = settings.transform_null_in;
auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
// const auto & settings = planner_context->getQueryContext()->getSettingsRef();
// SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
// bool tranform_null_in = settings.transform_null_in;
// auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
SubqueryForSet subquery_for_set;
subquery_for_set.key = set_key;
subquery_for_set.set_in_progress = set;
subquery_for_set.set = planner_set->getSet();
subquery_for_set.promise_to_fill_set = planner_set->extractPromiseToBuildSet();
subquery_for_set.source = std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan());
// SubqueryForSet subquery_for_set;
// subquery_for_set.key = set_key;
// subquery_for_set.set_in_progress = set;
// subquery_for_set.set = planner_set->getSet();
// subquery_for_set.promise_to_fill_set = planner_set->extractPromiseToBuildSet();
// subquery_for_set.source = std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan());
subqueries_for_sets.emplace(set_key, std::move(subquery_for_set));
}
}
// subqueries_for_sets.emplace(set_key, std::move(subquery_for_set));
// }
// }
addCreatingSetsStep(query_plan, std::move(subqueries_for_sets), planner_context->getQueryContext());
}
// addCreatingSetsStep(query_plan, std::move(subqueries_for_sets), planner_context->getQueryContext());
// }
/// Support for `additional_result_filter` setting
void addAdditionalFilterStepIfNeeded(QueryPlan & query_plan,
@ -951,7 +951,7 @@ void addAdditionalFilterStepIfNeeded(QueryPlan & query_plan,
auto storage = std::make_shared<StorageDummy>(StorageID{"dummy", "dummy"}, fake_column_descriptions);
auto fake_table_expression = std::make_shared<TableNode>(std::move(storage), query_context);
auto filter_info = buildFilterInfo(additional_result_filter_ast, fake_table_expression, planner_context, std::move(fake_name_set));
auto filter_info = buildFilterInfo(additional_result_filter_ast, fake_table_expression, planner_context, select_query_options, std::move(fake_name_set));
if (!filter_info.actions || !query_plan.isInitialized())
return;
@ -1179,7 +1179,7 @@ void Planner::buildPlanForQueryNode()
}
checkStoragesSupportTransactions(planner_context);
collectSets(query_tree, *planner_context);
collectSets(query_tree, *planner_context, select_query_options);
collectTableExpressionData(query_tree, planner_context);
const auto & settings = query_context->getSettingsRef();
@ -1467,7 +1467,17 @@ void Planner::buildPlanForQueryNode()
}
if (!select_query_options.only_analyze)
addBuildSubqueriesForSetsStepIfNeeded(query_plan, select_query_options, planner_context, result_actions_to_execute);
{
auto step = std::make_unique<DelayedCreatingSetsStep>(
query_plan.getCurrentDataStream(),
planner_context->getPreparedSets().detachSubqueries(planner_context->getQueryContext()),
planner_context->getQueryContext());
query_plan.addStep(std::move(step));
//addCreatingSetsStep(query_plan, planner_context->getPreparedSets().detachSubqueries(planner_context->getQueryContext()), planner_context->getQueryContext());
//addBuildSubqueriesForSetsStepIfNeeded(query_plan, select_query_options, planner_context, result_actions_to_execute);
}
}
SelectQueryInfo Planner::buildSelectQueryInfo() const

View File

@ -16,6 +16,7 @@
#include <DataTypes/DataTypeSet.h>
#include <Common/FieldVisitorToString.h>
#include <DataTypes/DataTypeTuple.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
@ -623,33 +624,51 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::makeSetForInFunction(const QueryTreeNodePtr & node)
{
const auto & function_node = node->as<FunctionNode &>();
auto in_first_argument = function_node.getArguments().getNodes().at(0);
auto in_second_argument = function_node.getArguments().getNodes().at(1);
auto set_key = planner_context->createSetKey(in_second_argument);
const auto & planner_set = planner_context->getSetOrThrow(set_key);
//auto set_key = planner_context->createSetKey(in_second_argument);
DataTypes set_element_types = {in_first_argument->getResultType()};
const auto * left_tuple_type = typeid_cast<const DataTypeTuple *>(set_element_types.front().get());
if (left_tuple_type && left_tuple_type->getElements().size() != 1)
set_element_types = left_tuple_type->getElements();
for (auto & element_type : set_element_types)
if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(element_type.get()))
element_type = low_cardinality_type->getDictionaryType();
auto set_key = PreparedSetKey::forLiteral(in_second_argument->getTreeHash(), set_element_types);
auto set = planner_context->getPreparedSets().getFuture(set_key);
if (!set)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"No set is registered for key {}",
set_key.toString());
ColumnWithTypeAndName column;
column.name = set_key;
column.name = planner_context->createSetKey(in_second_argument);
column.type = std::make_shared<DataTypeSet>();
bool set_is_created = planner_set.getSet().isCreated();
auto column_set = ColumnSet::create(1, planner_set.getSet());
bool set_is_created = set->isFilled();
auto column_set = ColumnSet::create(1, std::move(set));
if (set_is_created)
column.column = ColumnConst::create(std::move(column_set), 1);
else
column.column = std::move(column_set);
actions_stack[0].addConstantIfNecessary(set_key, column);
actions_stack[0].addConstantIfNecessary(column.name, column);
size_t actions_stack_size = actions_stack.size();
for (size_t i = 1; i < actions_stack_size; ++i)
{
auto & actions_stack_node = actions_stack[i];
actions_stack_node.addInputConstantColumnIfNecessary(set_key, column);
actions_stack_node.addInputConstantColumnIfNecessary(column.name, column);
}
return {set_key, 0};
return {column.name, 0};
}
PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::visitIndexHintFunction(const QueryTreeNodePtr & node)

View File

@ -126,49 +126,49 @@ PlannerContext::SetKey PlannerContext::createSetKey(const QueryTreeNodePtr & set
return "__set_" + toString(set_source_hash.first) + '_' + toString(set_source_hash.second);
}
void PlannerContext::registerSet(const SetKey & key, PlannerSet planner_set)
{
if (!planner_set.getSet().isValid())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Set must be initialized");
// void PlannerContext::registerSet(const SetKey & key, PlannerSet planner_set)
// {
// if (!planner_set.getSet().isValid())
// throw Exception(ErrorCodes::LOGICAL_ERROR, "Set must be initialized");
const auto & subquery_node = planner_set.getSubqueryNode();
if (subquery_node)
{
auto node_type = subquery_node->getNodeType();
// const auto & subquery_node = planner_set.getSubqueryNode();
// if (subquery_node)
// {
// auto node_type = subquery_node->getNodeType();
if (node_type != QueryTreeNodeType::QUERY &&
node_type != QueryTreeNodeType::UNION)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid node for set table expression. Expected query or union. Actual {}",
subquery_node->formatASTForErrorMessage());
}
// if (node_type != QueryTreeNodeType::QUERY &&
// node_type != QueryTreeNodeType::UNION)
// throw Exception(ErrorCodes::LOGICAL_ERROR,
// "Invalid node for set table expression. Expected query or union. Actual {}",
// subquery_node->formatASTForErrorMessage());
// }
set_key_to_set.emplace(key, std::move(planner_set));
}
// set_key_to_set.emplace(key, std::move(planner_set));
// }
bool PlannerContext::hasSet(const SetKey & key) const
{
return set_key_to_set.contains(key);
}
// bool PlannerContext::hasSet(const SetKey & key) const
// {
// return set_key_to_set.contains(key);
// }
const PlannerSet & PlannerContext::getSetOrThrow(const SetKey & key) const
{
auto it = set_key_to_set.find(key);
if (it == set_key_to_set.end())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"No set is registered for key {}",
key);
// const PlannerSet & PlannerContext::getSetOrThrow(const SetKey & key) const
// {
// auto it = set_key_to_set.find(key);
// if (it == set_key_to_set.end())
// throw Exception(ErrorCodes::LOGICAL_ERROR,
// "No set is registered for key {}",
// key);
return it->second;
}
// return it->second;
// }
PlannerSet * PlannerContext::getSetOrNull(const SetKey & key)
{
auto it = set_key_to_set.find(key);
if (it == set_key_to_set.end())
return nullptr;
// PlannerSet * PlannerContext::getSetOrNull(const SetKey & key)
// {
// auto it = set_key_to_set.find(key);
// if (it == set_key_to_set.end())
// return nullptr;
return &it->second;
}
// return &it->second;
// }
}

View File

@ -55,43 +55,47 @@ using GlobalPlannerContextPtr = std::shared_ptr<GlobalPlannerContext>;
*/
class PlannerSet
{
public:
/// Construct planner set that is ready for execution
explicit PlannerSet(FutureSetPtr set_)
: set(std::move(set_))
{}
/// Construct planner set with set and subquery node
explicit PlannerSet(QueryTreeNodePtr subquery_node_)
//: set(promise_to_build_set.get_future())
: subquery_node(std::move(subquery_node_))
{}
/// Get a reference to a set that might be not built yet
const FutureSetPtr & getSet() const
{
return set;
}
/// Get subquery node
const QueryTreeNodePtr & getSubqueryNode() const
{
return subquery_node;
}
/// This promise will be fulfilled when set is built and all FutureSet objects will become ready
// std::promise<SetPtr> extractPromiseToBuildSet()
// {
// return std::move(promise_to_build_set);
// }
private:
//std::promise<SetPtr> promise_to_build_set;
FutureSetPtr set;
QueryTreeNodePtr subquery_node;
};
// {
// public:
// /// Construct planner set that is ready for execution
// explicit PlannerSet(FutureSetPtr set_)
// : set(std::move(set_))
// {}
// /// Construct planner set with set and subquery node
// explicit PlannerSet(QueryTreeNodePtr subquery_node_)
// //: set(promise_to_build_set.get_future())
// : subquery_node(std::move(subquery_node_))
// {}
// /// Get a reference to a set that might be not built yet
// const FutureSetPtr & getSet() const
// {
// return set;
// }
// /// Get subquery node
// const QueryTreeNodePtr & getSubqueryNode() const
// {
// return subquery_node;
// }
// /// This promise will be fulfilled when set is built and all FutureSet objects will become ready
// // std::promise<SetPtr> extractPromiseToBuildSet()
// // {
// // return std::move(promise_to_build_set);
// // }
// private:
// //std::promise<SetPtr> promise_to_build_set;
// FutureSetPtr set;
// QueryTreeNodePtr subquery_node;
// };
class PlannerContext
{
public:
@ -179,28 +183,30 @@ public:
using SetKey = std::string;
using SetKeyToSet = std::unordered_map<String, PlannerSet>;
// using SetKeyToSet = std::unordered_map<String, PlannerSet>;
/// Create set key for set source node
// /// Create set key for set source node
static SetKey createSetKey(const QueryTreeNodePtr & set_source_node);
/// Register set for set key
void registerSet(const SetKey & key, PlannerSet planner_set);
// /// Register set for set key
// void registerSet(const SetKey & key, PlannerSet planner_set);
/// Returns true if set is registered for key, false otherwise
bool hasSet(const SetKey & key) const;
// /// Returns true if set is registered for key, false otherwise
// bool hasSet(const SetKey & key) const;
/// Get set for key, if no set is registered logical exception is thrown
const PlannerSet & getSetOrThrow(const SetKey & key) const;
// /// Get set for key, if no set is registered logical exception is thrown
// const PlannerSet & getSetOrThrow(const SetKey & key) const;
/// Get set for key, if no set is registered null is returned
PlannerSet * getSetOrNull(const SetKey & key);
// /// Get set for key, if no set is registered null is returned
// PlannerSet * getSetOrNull(const SetKey & key);
/// Get registered sets
const SetKeyToSet & getRegisteredSets() const
{
return set_key_to_set;
}
// /// Get registered sets
// const SetKeyToSet & getRegisteredSets() const
// {
// return set_key_to_set;
// }
PreparedSets & getPreparedSets() { return prepared_sets; }
private:
/// Query context
@ -216,8 +222,7 @@ private:
std::unordered_map<QueryTreeNodePtr, TableExpressionData> table_expression_node_to_data;
/// Set key to set
SetKeyToSet set_key_to_set;
PreparedSets prepared_sets;
};
using PlannerContextPtr = std::shared_ptr<PlannerContext>;

View File

@ -388,7 +388,8 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
PlannerContextPtr & planner_context,
const SelectQueryOptions & select_query_options)
{
auto storage_id = storage->getStorageID();
const auto & query_context = planner_context->getQueryContext();
@ -397,12 +398,13 @@ FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
if (!row_policy_filter)
return {};
return buildFilterInfo(row_policy_filter->expression, table_expression_query_info.table_expression, planner_context);
return buildFilterInfo(row_policy_filter->expression, table_expression_query_info.table_expression, planner_context, select_query_options);
}
FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
PlannerContextPtr & planner_context,
const SelectQueryOptions & select_query_options)
{
const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef();
@ -428,14 +430,15 @@ FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
*storage,
query_context);
return buildFilterInfo(parallel_replicas_custom_filter_ast, table_expression_query_info.table_expression, planner_context);
return buildFilterInfo(parallel_replicas_custom_filter_ast, table_expression_query_info.table_expression, planner_context, select_query_options);
}
/// Apply filters from additional_table_filters setting
FilterDAGInfo buildAdditionalFiltersIfNeeded(const StoragePtr & storage,
const String & table_expression_alias,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
PlannerContextPtr & planner_context,
const SelectQueryOptions & select_query_options)
{
const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef();
@ -469,7 +472,7 @@ FilterDAGInfo buildAdditionalFiltersIfNeeded(const StoragePtr & storage,
return {};
table_expression_query_info.additional_filter_ast = additional_filter_ast;
return buildFilterInfo(additional_filter_ast, table_expression_query_info.table_expression, planner_context);
return buildFilterInfo(additional_filter_ast, table_expression_query_info.table_expression, planner_context, select_query_options);
}
JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
@ -679,14 +682,14 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
}
};
auto row_policy_filter_info = buildRowPolicyFilterIfNeeded(storage, table_expression_query_info, planner_context);
auto row_policy_filter_info = buildRowPolicyFilterIfNeeded(storage, table_expression_query_info, planner_context, select_query_options);
add_filter(row_policy_filter_info, "Row-level security filter");
if (query_context->getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY)
{
if (settings.parallel_replicas_count > 1)
{
auto parallel_replicas_custom_key_filter_info = buildCustomKeyFilterIfNeeded(storage, table_expression_query_info, planner_context);
auto parallel_replicas_custom_key_filter_info = buildCustomKeyFilterIfNeeded(storage, table_expression_query_info, planner_context, select_query_options);
add_filter(parallel_replicas_custom_key_filter_info, "Parallel replicas custom key filter");
}
else
@ -701,7 +704,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
}
const auto & table_expression_alias = table_expression->getAlias();
auto additional_filters_info = buildAdditionalFiltersIfNeeded(storage, table_expression_alias, table_expression_query_info, planner_context);
auto additional_filters_info = buildAdditionalFiltersIfNeeded(storage, table_expression_alias, table_expression_query_info, planner_context, select_query_options);
add_filter(additional_filters_info, "additional filter");
from_stage = storage->getQueryProcessingStage(query_context, select_query_options.to_stage, storage_snapshot, table_expression_query_info);

View File

@ -426,6 +426,7 @@ SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
const QueryTreeNodePtr & table_expression,
PlannerContextPtr & planner_context,
const SelectQueryOptions & select_query_options,
NameSet table_expression_required_names_without_filter)
{
const auto & query_context = planner_context->getQueryContext();
@ -443,7 +444,7 @@ FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
}
collectSourceColumns(filter_query_tree, planner_context);
collectSets(filter_query_tree, *planner_context);
collectSets(filter_query_tree, *planner_context, select_query_options);
auto filter_actions_dag = std::make_shared<ActionsDAG>();

View File

@ -82,6 +82,7 @@ SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
const QueryTreeNodePtr & table_expression,
PlannerContextPtr & planner_context,
const SelectQueryOptions & select_query_options,
NameSet table_expression_required_names_without_filter = {});
ASTPtr parseAdditionalResultFilter(const Settings & settings);

View File

@ -38,16 +38,16 @@ CreatingSetStep::CreatingSetStep(
SizeLimits network_transfer_limits_,
ContextPtr context_)
: ITransformingStep(input_stream_, Block{}, getTraits())
, WithContext(context_)
, description(std::move(description_))
, subquery_for_set(std::move(subquery_for_set_))
, network_transfer_limits(std::move(network_transfer_limits_))
, context(std::move(context_))
{
}
void CreatingSetStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
pipeline.addCreatingSetsTransform(getOutputStream().header, std::move(subquery_for_set), network_transfer_limits, getContext());
pipeline.addCreatingSetsTransform(getOutputStream().header, std::move(subquery_for_set), network_transfer_limits, context);
}
void CreatingSetStep::updateOutputStream()
@ -60,7 +60,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
String prefix(settings.offset, ' ');
settings.out << prefix;
if (subquery_for_set.set_in_progress)
if (subquery_for_set.set)
settings.out << "Set: ";
settings.out << description << '\n';
@ -68,7 +68,7 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const
{
if (subquery_for_set.set_in_progress)
if (subquery_for_set.set)
map.add("Set", description);
}
@ -130,22 +130,14 @@ void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::SubqueriesForSets
plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
query_plan = QueryPlan();
for (auto & [description, subquery_for_set] : subqueries_for_sets)
for (auto & [description, future_set] : subqueries_for_sets)
{
if (!subquery_for_set.hasSource())
if (future_set->isReady())
continue;
auto plan = subquery_for_set.detachSource();
const Settings & settings = context->getSettingsRef();
auto creating_set = std::make_unique<CreatingSetStep>(
plan->getCurrentDataStream(),
description,
std::move(subquery_for_set),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
context);
creating_set->setStepDescription("Create set for subquery");
plan->addStep(std::move(creating_set));
auto plan = future_set->build(context);
if (!plan)
continue;
input_streams.emplace_back(plan->getCurrentDataStream());
plans.emplace_back(std::move(plan));
@ -162,12 +154,56 @@ void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::SubqueriesForSets
query_plan.unitePlans(std::move(creating_sets), std::move(plans));
}
//void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::SubqueriesForSets subqueries_for_sets, ContextPtr context)
std::vector<std::unique_ptr<QueryPlan>> DelayedCreatingSetsStep::makePlansForSets(DelayedCreatingSetsStep && step)
{
// DataStreams input_streams;
// input_streams.emplace_back(query_plan.getCurrentDataStream());
std::vector<std::unique_ptr<QueryPlan>> plans;
// plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
// query_plan = QueryPlan();
for (auto & [description, future_set] : step.subqueries_for_sets)
{
if (future_set->isReady())
continue;
auto plan = future_set->build(step.context);
if (!plan)
continue;
plan->optimize(QueryPlanOptimizationSettings::fromContext(step.context));
//input_streams.emplace_back(plan->getCurrentDataStream());
plans.emplace_back(std::move(plan));
}
return plans;
}
void addCreatingSetsStep(QueryPlan & query_plan, PreparedSetsPtr prepared_sets, ContextPtr context)
{
if (!prepared_sets || prepared_sets->empty())
return;
addCreatingSetsStep(query_plan, prepared_sets->detachSubqueries(), context);
addCreatingSetsStep(query_plan, prepared_sets->detachSubqueries(context), context);
}
DelayedCreatingSetsStep::DelayedCreatingSetsStep(
DataStream input_stream, PreparedSets::SubqueriesForSets subqueries_for_sets_, ContextPtr context_)
: subqueries_for_sets(std::move(subqueries_for_sets_)), context(std::move(context_))
{
input_streams = {input_stream};
output_stream = std::move(input_stream);
}
QueryPipelineBuilderPtr DelayedCreatingSetsStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings &)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot build pipeline in DelayedCreatingSets. This step should be optimized out.");
}
}

View File

@ -9,7 +9,7 @@ namespace DB
{
/// Creates sets for subqueries and JOIN. See CreatingSetsTransform.
class CreatingSetStep : public ITransformingStep, WithContext
class CreatingSetStep : public ITransformingStep
{
public:
CreatingSetStep(
@ -32,6 +32,7 @@ private:
String description;
SubqueryForSet subquery_for_set;
SizeLimits network_transfer_limits;
ContextPtr context;
};
class CreatingSetsStep : public IQueryPlanStep
@ -46,6 +47,22 @@ public:
void describePipeline(FormatSettings & settings) const override;
};
class DelayedCreatingSetsStep final : public IQueryPlanStep
{
public:
DelayedCreatingSetsStep(DataStream input_stream, PreparedSets::SubqueriesForSets subqueries_for_sets_, ContextPtr context_);
String getName() const override { return "DelayedCreatingSets"; }
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings &) override;
static std::vector<std::unique_ptr<QueryPlan>> makePlansForSets(DelayedCreatingSetsStep && step);
private:
PreparedSets::SubqueriesForSets subqueries_for_sets;
ContextPtr context;
};
void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::SubqueriesForSets subqueries_for_sets, ContextPtr context);
void addCreatingSetsStep(QueryPlan & query_plan, PreparedSetsPtr prepared_sets, ContextPtr context);

View File

@ -110,6 +110,7 @@ void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
bool addPlansForSets(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
/// Enable memory bound merging of aggregation states for remote queries
/// in case it was enabled for local plan

View File

@ -0,0 +1,35 @@
#include <memory>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Common/typeid_cast.h>
namespace DB::QueryPlanOptimizations
{
bool addPlansForSets(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
auto * delayed = typeid_cast<DelayedCreatingSetsStep *>(node.step.get());
if (!delayed)
return false;
auto plans = DelayedCreatingSetsStep::makePlansForSets(std::move(*delayed));
node.children.reserve(1 + plans.size());
DataStreams input_streams;
input_streams.reserve(1 + plans.size());
input_streams.push_back(node.children.front()->step->getOutputStream());
for (const auto & plan : plans)
{
input_streams.push_back(plan->getCurrentDataStream());
node.children.push_back(plan->getRootNode());
nodes.splice(nodes.end(), QueryPlan::detachNodes(std::move(*plan)));
}
auto creating_sets = std::make_unique<CreatingSetsStep>(std::move(input_streams));
creating_sets->setStepDescription("Create sets before main query execution");
node.step = std::move(creating_sets);
return true;
}
}

View File

@ -163,6 +163,7 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
optimizePrewhere(stack, nodes);
optimizePrimaryKeyCondition(stack);
enableMemoryBoundMerging(*frame.node, nodes);
addPlansForSets(*frame.node, nodes);
stack.pop_back();
}

View File

@ -541,4 +541,9 @@ void QueryPlan::explainEstimate(MutableColumns & columns)
}
}
QueryPlan::Nodes QueryPlan::detachNodes(QueryPlan && plan)
{
return std::move(plan.nodes);
}
}

View File

@ -105,10 +105,11 @@ public:
std::vector<Node *> children = {};
};
const Node * getRootNode() const { return root; }
using Nodes = std::list<Node>;
Node * getRootNode() const { return root; }
static Nodes detachNodes(QueryPlan && plan);
private:
QueryPlanResourceHolder resources;
Nodes nodes;

View File

@ -76,7 +76,7 @@ void CreatingSetsTransform::startSubquery()
}
subquery.promise_to_fill_set.set_value(ready_set);
subquery.set_in_progress.reset();
subquery.set.reset();
done_with_set = true;
set_from_cache = true;
}
@ -84,7 +84,7 @@ void CreatingSetsTransform::startSubquery()
}
}
if (subquery.set_in_progress)
if (subquery.set)
LOG_TRACE(log, "Creating set, key: {}", subquery.key);
if (subquery.table)
LOG_TRACE(log, "Filling temporary table.");
@ -93,7 +93,7 @@ void CreatingSetsTransform::startSubquery()
/// TODO: make via port
table_out = QueryPipeline(subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext()));
done_with_set = !subquery.set_in_progress;
done_with_set = !subquery.set;
done_with_table = !subquery.table;
if ((done_with_set && !set_from_cache) /*&& done_with_join*/ && done_with_table)
@ -116,8 +116,8 @@ void CreatingSetsTransform::finishSubquery()
}
else if (read_rows != 0)
{
if (subquery.set_in_progress)
LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set_in_progress->getTotalRowCount(), read_rows, seconds);
if (subquery.set)
LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), read_rows, seconds);
if (subquery.table)
LOG_DEBUG(log, "Created Table with {} rows in {} sec.", read_rows, seconds);
}
@ -131,9 +131,9 @@ void CreatingSetsTransform::init()
{
is_initialized = true;
if (subquery.set_in_progress)
if (subquery.set)
{
subquery.set_in_progress->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
subquery.set->setHeader(getInputPort().getHeader().getColumnsWithTypeAndName());
}
watch.restart();
@ -147,7 +147,7 @@ void CreatingSetsTransform::consume(Chunk chunk)
if (!done_with_set)
{
if (!subquery.set_in_progress->insertFromBlock(block.getColumnsWithTypeAndName()))
if (!subquery.set->insertFromBlock(block.getColumnsWithTypeAndName()))
done_with_set = true;
}
@ -170,12 +170,12 @@ void CreatingSetsTransform::consume(Chunk chunk)
Chunk CreatingSetsTransform::generate()
{
if (subquery.set_in_progress)
if (subquery.set)
{
subquery.set_in_progress->finishInsert();
subquery.promise_to_fill_set.set_value(subquery.set_in_progress);
subquery.set->finishInsert();
subquery.promise_to_fill_set.set_value(subquery.set);
if (promise_to_build)
promise_to_build->set_value(subquery.set_in_progress);
promise_to_build->set_value(subquery.set);
}
if (table_out.initialized())

View File

@ -68,11 +68,18 @@ bool traverseASTFilter(
PreparedSetKey set_key;
if ((value->as<ASTSubquery>() || value->as<ASTIdentifier>()))
set_key = PreparedSetKey::forSubquery(*value);
set_key = PreparedSetKey::forSubquery(value->getTreeHash());
else
set_key = PreparedSetKey::forLiteral(*value, {primary_key_type});
set_key = PreparedSetKey::forLiteral(value->getTreeHash(), {primary_key_type});
SetPtr set = prepared_sets->get(set_key);
FutureSetPtr future_set = prepared_sets->getFuture(set_key);
if (!future_set)
return false;
if (!future_set->isReady())
future_set->buildOrderedSetInplace(context);
auto set = future_set->get();
if (!set)
return false;

View File

@ -1204,14 +1204,32 @@ bool KeyCondition::tryPrepareSetIndex(
const auto right_arg = func.getArgumentAt(1);
auto prepared_set = right_arg.tryGetPreparedSet(indexes_mapping, data_types);
LOG_TRACE(&Poco::Logger::get("KK"), "Trying to get set for {}", right_arg.getColumnName());
auto future_set = right_arg.tryGetPreparedSet(indexes_mapping, data_types);
if (!future_set)
return false;
LOG_TRACE(&Poco::Logger::get("KK"), "Found set for {}", right_arg.getColumnName());
if (!future_set->isReady())
{
LOG_TRACE(&Poco::Logger::get("KK"), "Building set inplace for {}", right_arg.getColumnName());
future_set->buildOrderedSetInplace(right_arg.getTreeContext().getQueryContext());
}
auto prepared_set = future_set->get();
if (!prepared_set)
return false;
LOG_TRACE(&Poco::Logger::get("KK"), "Set if ready for {}", right_arg.getColumnName());
/// The index can be prepared if the elements of the set were saved in advance.
if (!prepared_set->hasExplicitSetElements())
return false;
LOG_TRACE(&Poco::Logger::get("KK"), "Has explicit elements for {}", right_arg.getColumnName());
prepared_set->checkColumnsNumber(left_args_count);
for (size_t i = 0; i < indexes_mapping.size(); ++i)
prepared_set->checkTypesEqual(indexes_mapping[i].tuple_index, data_types[i]);

View File

@ -310,7 +310,13 @@ bool MergeTreeIndexConditionBloomFilter::traverseFunction(const RPNBuilderTreeNo
if (functionIsInOrGlobalInOperator(function_name))
{
ConstSetPtr prepared_set = rhs_argument.tryGetPreparedSet();
auto future_set = rhs_argument.tryGetPreparedSet();
if (future_set && !future_set->isReady())
future_set->buildOrderedSetInplace(rhs_argument.getTreeContext().getQueryContext());
ConstSetPtr prepared_set;
if (future_set)
prepared_set = future_set->get();
if (prepared_set && prepared_set->hasExplicitSetElements())
{

View File

@ -624,7 +624,14 @@ bool MergeTreeConditionFullText::tryPrepareSetBloomFilter(
if (key_tuple_mapping.empty())
return false;
auto prepared_set = right_argument.tryGetPreparedSet(data_types);
auto future_set = right_argument.tryGetPreparedSet(data_types);
if (future_set && !future_set->isReady())
future_set->buildOrderedSetInplace(right_argument.getTreeContext().getQueryContext());
ConstSetPtr prepared_set;
if (future_set)
prepared_set = future_set->get();
if (!prepared_set || !prepared_set->hasExplicitSetElements())
return false;

View File

@ -655,7 +655,14 @@ bool MergeTreeConditionInverted::tryPrepareSetGinFilter(
if (key_tuple_mapping.empty())
return false;
ConstSetPtr prepared_set = rhs.tryGetPreparedSet();
auto future_set = rhs.tryGetPreparedSet();
if (future_set && !future_set->isReady())
future_set->buildOrderedSetInplace(rhs.getTreeContext().getQueryContext());
ConstSetPtr prepared_set;
if (future_set)
prepared_set = future_set->get();
if (!prepared_set || !prepared_set->hasExplicitSetElements())
return false;

View File

@ -275,7 +275,7 @@ bool RPNBuilderTreeNode::tryGetConstant(Field & output_value, DataTypePtr & outp
namespace
{
ConstSetPtr tryGetSetFromDAGNode(const ActionsDAG::Node * dag_node)
FutureSetPtr tryGetSetFromDAGNode(const ActionsDAG::Node * dag_node)
{
if (!dag_node->column)
return {};
@ -285,28 +285,20 @@ ConstSetPtr tryGetSetFromDAGNode(const ActionsDAG::Node * dag_node)
column = &column_const->getDataColumn();
if (const auto * column_set = typeid_cast<const ColumnSet *>(column))
{
auto set = column_set->getData();
if (set && set->isCreated())
return set;
}
return column_set->getData();
return {};
}
}
ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet() const
FutureSetPtr RPNBuilderTreeNode::tryGetPreparedSet() const
{
const auto & prepared_sets = getTreeContext().getPreparedSets();
if (ast_node && prepared_sets)
{
auto prepared_sets_with_same_hash = prepared_sets->getByTreeHash(ast_node->getTreeHash());
for (auto & set : prepared_sets_with_same_hash)
if (set.isCreated())
return set.get();
return prepared_sets->getFuture(PreparedSetKey::forSubquery(ast_node->getTreeHash()));
}
else if (dag_node)
{
@ -317,16 +309,16 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet() const
return {};
}
ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(const DataTypes & data_types) const
FutureSetPtr RPNBuilderTreeNode::tryGetPreparedSet(const DataTypes & data_types) const
{
const auto & prepared_sets = getTreeContext().getPreparedSets();
if (prepared_sets && ast_node)
{
if (ast_node->as<ASTSubquery>() || ast_node->as<ASTTableIdentifier>())
return prepared_sets->get(PreparedSetKey::forSubquery(*ast_node));
return prepared_sets->getFuture(PreparedSetKey::forSubquery(ast_node->getTreeHash()));
return prepared_sets->get(PreparedSetKey::forLiteral(*ast_node, data_types));
return prepared_sets->getFuture(PreparedSetKey::forLiteral(ast_node->getTreeHash(), data_types));
}
else if (dag_node)
{
@ -337,7 +329,7 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(const DataTypes & data_types)
return nullptr;
}
ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
FutureSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
const std::vector<MergeTreeSetIndex::KeyTuplePositionMapping> & indexes_mapping,
const DataTypes & data_types) const
{
@ -346,19 +338,25 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
if (prepared_sets && ast_node)
{
if (ast_node->as<ASTSubquery>() || ast_node->as<ASTTableIdentifier>())
return prepared_sets->get(PreparedSetKey::forSubquery(*ast_node));
return prepared_sets->getFuture(PreparedSetKey::forSubquery(ast_node->getTreeHash()));
/// We have `PreparedSetKey::forLiteral` but it is useless here as we don't have enough information
/// about types in left argument of the IN operator. Instead, we manually iterate through all the sets
/// and find the one for the right arg based on the AST structure (getTreeHash), after that we check
/// that the types it was prepared with are compatible with the types of the primary key.
auto types_match = [&indexes_mapping, &data_types](const SetPtr & candidate_set)
auto types_match = [&indexes_mapping, &data_types](const DataTypes & set_types)
{
assert(indexes_mapping.size() == data_types.size());
for (size_t i = 0; i < indexes_mapping.size(); ++i)
{
if (!candidate_set->areTypesEqual(indexes_mapping[i].tuple_index, data_types[i]))
if (indexes_mapping[i].tuple_index >= set_types.size())
return false;
auto lhs = recursiveRemoveLowCardinality(data_types[i]);
auto rhs = recursiveRemoveLowCardinality(set_types[indexes_mapping[i].tuple_index]);
if (!lhs->equals(*rhs))
return false;
}
@ -366,10 +364,10 @@ ConstSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
};
auto tree_hash = ast_node->getTreeHash();
for (const auto & set : prepared_sets->getByTreeHash(tree_hash))
for (const auto & [key, future_set] : prepared_sets->getSets())
{
if (set.isCreated() && types_match(set.get()))
return set.get();
if (key.ast_hash == tree_hash && types_match(key.types))
return future_set;
}
}
else

View File

@ -109,13 +109,13 @@ public:
bool tryGetConstant(Field & output_value, DataTypePtr & output_type) const;
/// Try get prepared set from node
ConstSetPtr tryGetPreparedSet() const;
FutureSetPtr tryGetPreparedSet() const;
/// Try get prepared set from node that match data types
ConstSetPtr tryGetPreparedSet(const DataTypes & data_types) const;
FutureSetPtr tryGetPreparedSet(const DataTypes & data_types) const;
/// Try get prepared set from node that match indexes mapping and data types
ConstSetPtr tryGetPreparedSet(
FutureSetPtr tryGetPreparedSet(
const std::vector<MergeTreeSetIndex::KeyTuplePositionMapping> & indexes_mapping,
const DataTypes & data_types) const;

View File

@ -313,8 +313,15 @@ static void extractPathImpl(const ActionsDAG::Node & node, Paths & res, ContextP
if (!column_set)
return;
auto set = column_set->getData();
if (!set || !set->isCreated())
auto future_set = column_set->getData();
if (!future_set)
return;
if (!future_set->isReady())
future_set->buildOrderedSetInplace(context);
auto set = future_set->get();
if (!set)
return;
if (!set->hasExplicitSetElements())

View File

@ -80,24 +80,24 @@ ASTPtr buildWhereExpression(const ASTs & functions)
return makeASTFunction("and", functions);
}
void buildSets(const ASTPtr & expression, ExpressionAnalyzer & analyzer)
{
const auto * func = expression->as<ASTFunction>();
if (func && functionIsInOrGlobalInOperator(func->name))
{
const IAST & args = *func->arguments;
const ASTPtr & arg = args.children.at(1);
if (arg->as<ASTSubquery>() || arg->as<ASTTableIdentifier>())
{
analyzer.tryMakeSetForIndexFromSubquery(arg);
}
}
else
{
for (const auto & child : expression->children)
buildSets(child, analyzer);
}
}
// void buildSets(const ASTPtr & expression, ExpressionAnalyzer & analyzer)
// {
// const auto * func = expression->as<ASTFunction>();
// if (func && functionIsInOrGlobalInOperator(func->name))
// {
// const IAST & args = *func->arguments;
// const ASTPtr & arg = args.children.at(1);
// if (arg->as<ASTSubquery>() || arg->as<ASTTableIdentifier>())
// {
// analyzer.tryMakeSetForIndexFromSubquery(arg);
// }
// }
// else
// {
// for (const auto & child : expression->children)
// buildSets(child, analyzer);
// }
// }
}
@ -199,7 +199,7 @@ void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr contex
/// Let's analyze and calculate the prepared expression.
auto syntax_result = TreeRewriter(context).analyze(expression_ast, block.getNamesAndTypesList());
ExpressionAnalyzer analyzer(expression_ast, syntax_result, context);
buildSets(expression_ast, analyzer);
//buildSets(expression_ast, analyzer);
ExpressionActionsPtr actions = analyzer.getActions(false /* add alises */, true /* project result */, CompileExpressions::yes);
Block block_with_filter = block;