Fixes for new analyzer

This commit is contained in:
Alexander Gololobov 2023-04-04 23:47:05 +02:00
parent adbe87e9d3
commit fbf09a1115
12 changed files with 94 additions and 69 deletions

View File

@ -1,19 +1,15 @@
#pragma once
#include <Interpreters/PreparedSets.h>
#include <Columns/IColumnDummy.h>
#include <Core/Field.h>
#include "Common/Exception.h"
#include <chrono>
#include <future>
#include <stdexcept>
namespace DB
{
class Set;
using SetPtr = std::shared_ptr<Set>;
using ConstSetPtr = std::shared_ptr<const Set>;
using FutureSet = std::shared_future<SetPtr>;
/** A column containing multiple values in the `IN` section.
@ -33,7 +29,7 @@ public:
TypeIndex getDataType() const override { return TypeIndex::Set; }
MutableColumnPtr cloneDummy(size_t s_) const override { return ColumnSet::create(s_, data); }
ConstSetPtr getData() const { if (!data.valid() || data.wait_for(std::chrono::seconds(0)) != std::future_status::ready ) return nullptr; return data.get(); }
/// Returns the set when it is already available through the future, nullptr otherwise.
ConstSetPtr getData() const
{
    if (data.isReady())
        return data.get();

    return nullptr;
}
// Used only for debugging, making it DUMPABLE
Field operator[](size_t) const override { return {}; }

View File

@ -1181,7 +1181,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
/// If the argument is a set given by an enumeration of values (so, the set was already built), give it a unique name,
/// so that sets with the same literal representation do not fuse together (they can have different types).
const bool is_constant_set = prepared_set.wait_for(std::chrono::seconds(0)) == std::future_status::ready && prepared_set.get()->isCreated();
const bool is_constant_set = prepared_set.isCreated();
if (is_constant_set) /// TODO: if the set is from prepared_sets_cache, it might be not empty already but we should not handle it as const!!!
column.name = data.getUniqueName("__set");
else

View File

@ -1,4 +1,3 @@
#include <cstddef>
#include <memory>
#include <Core/Block.h>
@ -57,8 +56,7 @@
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Common/logger_useful.h>
#include "Interpreters/PreparedSets.h"
#include "QueryPipeline/SizeLimits.h"
#include <QueryPipeline/SizeLimits.h>
#include <DataTypes/DataTypesNumber.h>
@ -71,6 +69,7 @@
#include <Interpreters/interpretSubquery.h>
#include <Interpreters/JoinUtils.h>
#include <Interpreters/misc.h>
#include <Interpreters/PreparedSets.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
@ -493,7 +492,7 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
auto set_cache = getContext()->getPreparedSetsCache();
if (set_cache)
{
auto from_cache = set_cache->findOrPromiseToBuild(set_key);
auto from_cache = set_cache->findOrPromiseToBuild(toString(set_key));
if (from_cache.index() == 0)
{
LOG_TRACE(getLogger(), "Building set, key: {}:{}", set_key.ast_hash.first, set_key.ast_hash.second);

View File

@ -58,12 +58,12 @@ SubqueryForSet & PreparedSets::createOrGetSubquery(const String & subquery_id, c
else
{
subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
sets[key] = subquery.promise_to_fill_set.get_future();
sets[key] = FutureSet(subquery.promise_to_fill_set.get_future());
}
if (!subquery.set_in_progress)
{
subquery.key = key;
subquery.key = toString(key);
subquery.set_in_progress = std::make_shared<Set>(set_size_limit, false, transform_null_in);
}
@ -132,8 +132,19 @@ QueryPlanPtr SubqueryForSet::detachSource()
return res;
}
/// Non-blocking readiness probe.
/// Returns true only if this future refers to a shared state and its result is already available.
bool FutureSet::isReady() const
{
    /// A future without shared state must not be waited on, so it is reported as not ready.
    if (!valid())
        return false;

    /// Zero timeout: poll the state instead of waiting for it.
    const auto status = wait_for(std::chrono::seconds(0));
    return status == std::future_status::ready;
}
std::variant<std::promise<SetPtr>, FutureSet> PreparedSetsCache::findOrPromiseToBuild(const PreparedSetKey & key)
/// True when the future is ready, holds a non-null set, and that set reports itself as created.
/// An unfinished or empty future yields false.
bool FutureSet::isCreated() const
{
    if (!isReady())
        return false;

    /// The future is ready at this point, so get() does not have to wait.
    const auto & set_ptr = get();
    return set_ptr != nullptr && set_ptr->isCreated();
}
std::variant<std::promise<SetPtr>, FutureSet> PreparedSetsCache::findOrPromiseToBuild(const String & key)
{
// auto* log = &Poco::Logger::get("PreparedSetsCache");
@ -167,7 +178,7 @@ FutureSet makeReadyFutureSet(SetPtr set)
{
std::promise<SetPtr> promise;
promise.set_value(set);
return promise.get_future();
return FutureSet(promise.get_future());
}
};

View File

@ -18,10 +18,46 @@ class QueryPlan;
class Set;
using SetPtr = std::shared_ptr<Set>;
using ConstSetPtr = std::shared_ptr<const Set>;
using FutureSet = std::shared_future<SetPtr>;
class InterpreterSelectWithUnionQuery;
class FutureSet final : public std::shared_future<SetPtr>
{
public:
FutureSet() = default;
FutureSet(const std::shared_future<SetPtr> & future) : std::shared_future<SetPtr>(future) {}
bool isReady() const;
bool isCreated() const;
};
/// Information on how to build set for the [GLOBAL] IN section.
/// Bundles the identifier of the set, the set being filled, the promise/future pair
/// through which the set is handed over, and the query plan that produces the data.
class SubqueryForSet
{
public:
/// NOTE(review): defined out of line; presumably fills `source` (and `table` when table_ is given) — confirm in the .cpp.
void createSource(InterpreterSelectWithUnionQuery & interpreter, StoragePtr table_ = nullptr);
/// NOTE(review): defined out of line; presumably reports whether `source` is present — confirm in the .cpp.
bool hasSource() const;
/// Returns query plan for the set's source
/// and removes it from SubqueryForSet because we need to build it only once.
std::unique_ptr<QueryPlan> detachSource();
/// Build this set from the result of the subquery.
/// String identifier of the set (it is what gets printed in the "Creating set" trace message).
String key;
/// The Set instance associated with this subquery while it is being built.
SetPtr set_in_progress;
/// Promise side of `set`. It must stay declared before `set`: members are initialized
/// in declaration order and the initializer of `set` calls get_future() on it.
std::promise<SetPtr> promise_to_fill_set;
/// Future side of `promise_to_fill_set`.
FutureSet set = {promise_to_fill_set.get_future()};
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
/// The source is obtained using the InterpreterSelectQuery subquery.
std::unique_ptr<QueryPlan> source;
};
struct PreparedSetKey
{
/// Prepared sets for tuple literals are indexed by the hash of the tree contents and by the desired
@ -44,33 +80,10 @@ struct PreparedSetKey
};
};
/// Information on how to build set for the [GLOBAL] IN section.
class SubqueryForSet
inline String toString(const PreparedSetKey & key)
{
public:
void createSource(InterpreterSelectWithUnionQuery & interpreter, StoragePtr table_ = nullptr);
bool hasSource() const;
/// Returns query plan for the set's source
/// and removes it from SubqueryForSet because we need to build it only once.
std::unique_ptr<QueryPlan> detachSource();
/// Build this set from the result of the subquery.
PreparedSetKey key;
SetPtr set_in_progress;
std::promise<SetPtr> promise_to_fill_set;
FutureSet set = promise_to_fill_set.get_future();
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
/// The source is obtained using the InterpreterSelectQuery subquery.
std::unique_ptr<QueryPlan> source;
};
return "__set_" + std::to_string(key.ast_hash.first) + "_" + std::to_string(key.ast_hash.second);
}
class PreparedSets
{
@ -115,11 +128,7 @@ using PreparedSetsPtr = std::shared_ptr<PreparedSets>;
class PreparedSetsCache
{
public:
/// Returns the set from the cache or builds it using the provided function.
/// If the set is already being built by another task, then this call will wait for the set to be built.
FutureSet findOrBuild(const PreparedSetKey & key, const std::function<FutureSet()> & build_set);
std::variant<std::promise<SetPtr>, FutureSet> findOrPromiseToBuild(const PreparedSetKey & key);
std::variant<std::promise<SetPtr>, FutureSet> findOrPromiseToBuild(const String & key);
private:
struct Entry
@ -132,7 +141,7 @@ private:
/// Protects only updates to the cache. Once an EntryPtr has been obtained from the cache, it can be accessed without locking.
std::mutex cache_mutex;
std::unordered_map<PreparedSetKey, EntryPtr, PreparedSetKey::Hash> cache;
std::unordered_map<String, EntryPtr> cache;
};
using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;

View File

@ -1,6 +1,7 @@
#include <Planner/CollectSets.h>
#include <Interpreters/Context.h>
#include <Interpreters/PreparedSets.h>
#include <Storages/StorageSet.h>
@ -52,7 +53,8 @@ public:
if (storage_set)
{
planner_context.registerSet(set_key, PlannerSet(storage_set->getSet()));
/// TODO: need to handle storage_set as not-yet-built set?
planner_context.registerSet(set_key, PlannerSet(makeReadyFutureSet(storage_set->getSet())));
}
else if (const auto * constant_node = in_second_argument->as<ConstantNode>())
{
@ -62,16 +64,12 @@ public:
constant_node->getResultType(),
settings);
planner_context.registerSet(set_key, PlannerSet(std::move(set)));
planner_context.registerSet(set_key, PlannerSet(makeReadyFutureSet(std::move(set))));
}
else if (in_second_argument_node_type == QueryTreeNodeType::QUERY ||
in_second_argument_node_type == QueryTreeNodeType::UNION)
{
SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
bool tranform_null_in = settings.transform_null_in;
auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
planner_context.registerSet(set_key, PlannerSet(std::move(set), in_second_argument));
planner_context.registerSet(set_key, PlannerSet(in_second_argument));
}
else
{

View File

@ -894,7 +894,7 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
if (!planner_set)
continue;
if (planner_set->getSet()->isCreated() || !planner_set->getSubqueryNode())
if (planner_set->getSet().isCreated() || !planner_set->getSubqueryNode())
continue;
auto subquery_options = select_query_options.subquery();
@ -904,8 +904,17 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
planner_context->getGlobalPlannerContext());
subquery_planner.buildQueryPlanIfNeeded();
const auto & settings = planner_context->getQueryContext()->getSettingsRef();
SizeLimits size_limits_for_set = {settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode};
bool tranform_null_in = settings.transform_null_in;
auto set = std::make_shared<Set>(size_limits_for_set, false /*fill_set_elements*/, tranform_null_in);
/// TODO: cleanup this initialization
SubqueryForSet subquery_for_set;
subquery_for_set.set_in_progress = planner_set->getSet();
subquery_for_set.key = set_key;
subquery_for_set.set_in_progress = set;
subquery_for_set.promise_to_fill_set = planner_set->getPromiseToBuildSet();
subquery_for_set.set = planner_set->getSet();
subquery_for_set.source = std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan());
subqueries_for_sets.emplace(set_key, std::move(subquery_for_set));

View File

@ -633,8 +633,8 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
column.type = std::make_shared<DataTypeSet>();
/// TODO: Properly fix this to use FutureSet for not-yet-created sets.
bool set_is_created = planner_set.getSet()->isCreated();
auto column_set = ColumnSet::create(1, makeReadyFutureSet(planner_set.getSet()));
bool set_is_created = planner_set.getSet().isCreated();
auto column_set = ColumnSet::create(1, planner_set.getSet());
if (set_is_created)
column.column = ColumnConst::create(std::move(column_set), 1);

View File

@ -128,7 +128,7 @@ PlannerContext::SetKey PlannerContext::createSetKey(const QueryTreeNodePtr & set
void PlannerContext::registerSet(const SetKey & key, PlannerSet planner_set)
{
if (!planner_set.getSet())
if (!planner_set.getSet().valid()) // TODO
throw Exception(ErrorCodes::LOGICAL_ERROR, "Set must be initialized");
const auto & subquery_node = planner_set.getSubqueryNode();

View File

@ -7,6 +7,7 @@
#include <Interpreters/Context_fwd.h>
#include <Interpreters/Set.h>
#include <Interpreters/PreparedSets.h>
#include <Analyzer/IQueryTreeNode.h>
@ -56,18 +57,18 @@ class PlannerSet
{
public:
/// Construct planner set that is ready for execution
explicit PlannerSet(SetPtr set_)
explicit PlannerSet(FutureSet set_)
: set(std::move(set_))
{}
/// Construct planner set with set and subquery node
explicit PlannerSet(SetPtr set_, QueryTreeNodePtr subquery_node_)
: set(std::move(set_))
explicit PlannerSet(QueryTreeNodePtr subquery_node_)
: set(promise_to_build_set.get_future())
, subquery_node(std::move(subquery_node_))
{}
/// Get set
const SetPtr & getSet() const
const FutureSet & getSet() const
{
return set;
}
@ -78,8 +79,11 @@ public:
return subquery_node;
}
std::promise<SetPtr> getPromiseToBuildSet() const { return std::move(promise_to_build_set); }
private:
SetPtr set;
mutable std::promise<SetPtr> promise_to_build_set; // FIXME: mutable is a hack
FutureSet set;
QueryTreeNodePtr subquery_node;
};

View File

@ -70,7 +70,7 @@ void CreatingSetsTransform::startSubquery()
}
if (subquery.set_in_progress)
LOG_TRACE(log, "Creating set, key: {}:{}", subquery.key.ast_hash.first, subquery.key.ast_hash.second);
LOG_TRACE(log, "Creating set, key: {}", subquery.key);
if (subquery.table)
LOG_TRACE(log, "Filling temporary table.");

View File

@ -1,6 +1,5 @@
#pragma once
#include <future>
#include <QueryPipeline/SizeLimits.h>
#include <Interpreters/Context_fwd.h>
#include <Processors/IAccumulatingTransform.h>