Fixing more tests

This commit is contained in:
Nikolai Kochetov 2023-05-25 19:18:11 +00:00
parent 30ff5113d9
commit 7ebe19f5fb
21 changed files with 132 additions and 65 deletions

View File

@ -1405,6 +1405,8 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
if (auto set = data.prepared_sets->getFuture(set_key))
return set;
FutureSetPtr external_table_set;
/// A special case is if the name of the table is specified on the right side of the IN statement,
/// and the table has the type Set (a previously prepared set).
if (identifier)
@ -1417,6 +1419,9 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
if (StorageSet * storage_set = dynamic_cast<StorageSet *>(table.get()))
return data.prepared_sets->addFromStorage(set_key, storage_set->getSet());
}
if (auto tmp_table = data.getContext()->findExternalTable(table_id.getShortName()))
external_table_set = tmp_table->future_set;
}
/// We get the stream of blocks for the subquery. Create Set and put it in place of the subquery.
@ -1438,7 +1443,7 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
subquery_for_set.createSource(*interpreter);
}
return data.prepared_sets->addFromSubquery(set_key, std::move(subquery_for_set));
return data.prepared_sets->addFromSubquery(set_key, std::move(subquery_for_set), std::move(external_table_set));
}
else
{

View File

@ -1319,6 +1319,21 @@ void Context::addExternalTable(const String & table_name, TemporaryTableHolder &
external_tables_mapping.emplace(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
}
std::shared_ptr<TemporaryTableHolder> Context::findExternalTable(const String & table_name) const
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
std::shared_ptr<TemporaryTableHolder> holder;
{
auto lock = getLock();
auto iter = external_tables_mapping.find(table_name);
if (iter == external_tables_mapping.end())
return {};
holder = iter->second;
}
return holder;
}
std::shared_ptr<TemporaryTableHolder> Context::removeExternalTable(const String & table_name)
{

View File

@ -611,6 +611,7 @@ public:
Tables getExternalTables() const;
void addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table);
std::shared_ptr<TemporaryTableHolder> findExternalTable(const String & table_name) const;
std::shared_ptr<TemporaryTableHolder> removeExternalTable(const String & table_name);
const Scalars & getScalars() const;

View File

@ -79,6 +79,8 @@ private:
using DDLGuardPtr = std::unique_ptr<DDLGuard>;
class FutureSet;
using FutureSetPtr = std::shared_ptr<FutureSet>;
/// Creates temporary table in `_temporary_and_external_tables` with randomly generated unique StorageID.
/// Such table can be accessed from everywhere by its ID.
@ -111,6 +113,7 @@ struct TemporaryTableHolder : boost::noncopyable, WithContext
IDatabase * temporary_tables = nullptr;
UUID id = UUIDHelpers::Nil;
FutureSetPtr future_set;
};
///TODO maybe remove shared_ptr from here?

View File

@ -64,6 +64,7 @@ public:
void addExternalStorage(ASTPtr & ast, bool set_alias = false)
{
// std::cerr << "=============== addExternalStorage is remote " << is_remote << std::endl;
/// With nondistributed queries, creating temporary tables does not make sense.
if (!is_remote)
return;
@ -163,10 +164,10 @@ public:
/// We need to materialize external tables immediately because reading from distributed
/// tables might generate local plans which can refer to external tables during index
/// analysis. It's too late to populate the external table via CreatingSetsTransform.
if (is_explain)
{
/// Do not materialize external tables if it's explain statement.
}
// if (is_explain)
// {
// /// Do not materialize external tables if it's explain statement.
// }
// else if (getContext()->getSettingsRef().use_index_for_in_with_subqueries)
// {
// auto external_table = external_storage_holder->getTable();
@ -176,13 +177,15 @@ public:
// CompletedPipelineExecutor executor(io.pipeline);
// executor.execute();
// }
else
// else
{
// auto & subquery_for_set = prepared_sets->getSubquery(external_table_name);
// subquery_for_set.createSource(*interpreter, external_storage);
auto key = subquery_or_table_name->getColumnName();
auto set_key = PreparedSetKey::forSubquery(database_and_table_name->getTreeHash());
// std::cerr << "====== Adding key " << set_key.toString() << std::endl;
if (!prepared_sets->getFuture(set_key))
{
SubqueryForSet subquery_for_set;
@ -191,10 +194,12 @@ public:
subquery_for_set.createSource(*interpreter);
//std::cerr << reinterpret_cast<const void *>(prepared_sets.get()) << std::endl;
prepared_sets->addFromSubquery(set_key, std::move(subquery_for_set));
auto future_set = prepared_sets->addFromSubquery(set_key, std::move(subquery_for_set), nullptr);
external_storage_holder->future_set = std::move(future_set);
}
else
prepared_sets->addStorageToSubquery(key, std::move(external_storage));
throw Exception(ErrorCodes::LOGICAL_ERROR, "!!!!!!!!");
//prepared_sets->addStorageToSubquery(key, std::move(external_storage));
}
/** NOTE If it was written IN tmp_table - the existing temporary (but not external) table,

View File

@ -3088,7 +3088,7 @@ void InterpreterSelectQuery::executeExtremes(QueryPlan & query_plan)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPlan & query_plan)
{
auto subqueries = prepared_sets->detachSubqueries(context);
auto subqueries = prepared_sets->detachSubqueries();
if (!subqueries.empty())
{

View File

@ -101,14 +101,14 @@ String PreparedSetKey::toString() const
/// If the subquery is not associated with any set, create default-constructed SubqueryForSet.
/// It's aimed to fill external table passed to SubqueryForSet::createSource.
void PreparedSets::addStorageToSubquery(const String & subquery_id, StoragePtr storage)
{
auto it = subqueries.find(subquery_id);
if (it == subqueries.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find subquery {}", subquery_id);
// void PreparedSets::addStorageToSubquery(const String & subquery_id, StoragePtr storage)
// {
// auto it = subqueries.find(subquery_id);
// if (it == subqueries.end())
// throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find subquery {}", subquery_id);
it->second->addStorage(std::move(storage));
}
// it->second->addStorage(std::move(storage));
// }
FutureSetPtr PreparedSets::addFromStorage(const PreparedSetKey & key, SetPtr set_)
{
@ -132,10 +132,10 @@ FutureSetPtr PreparedSets::addFromTuple(const PreparedSetKey & key, Block block,
return it->second;
}
FutureSetPtr PreparedSets::addFromSubquery(const PreparedSetKey & key, SubqueryForSet subquery)
FutureSetPtr PreparedSets::addFromSubquery(const PreparedSetKey & key, SubqueryForSet subquery, FutureSetPtr external_table_set)
{
auto id = subquery.key;
auto from_subquery = std::make_shared<FutureSetFromSubquery>(std::move(subquery));
auto from_subquery = std::make_shared<FutureSetFromSubquery>(std::move(subquery), std::move(external_table_set));
auto [it, inserted] = sets.emplace(key, from_subquery);
if (!inserted)
@ -145,7 +145,7 @@ FutureSetPtr PreparedSets::addFromSubquery(const PreparedSetKey & key, SubqueryF
// std::cerr << "========= PreparedSets::addFromSubquery\n";
// std::cerr << StackTrace().toString() << std::endl;
subqueries.emplace(id, std::move(from_subquery));
subqueries.emplace_back(SetAndName{.name = id, .set = std::move(from_subquery)});
return it->second;
}
@ -176,7 +176,7 @@ FutureSetPtr PreparedSets::getFuture(const PreparedSetKey & key) const
// return res;
// }
PreparedSets::SubqueriesForSets PreparedSets::detachSubqueries(const ContextPtr &)
PreparedSets::SubqueriesForSets PreparedSets::detachSubqueries()
{
auto res = std::move(subqueries);
subqueries = SubqueriesForSets();
@ -226,6 +226,36 @@ std::variant<std::promise<SetPtr>, SharedSet> PreparedSetsCache::findOrPromiseTo
return promise_to_fill_set;
}
SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
{
if (!context->getSettingsRef().use_index_for_in_with_subqueries)
return nullptr;
if (set)
{
if (set->hasExplicitSetElements())
return set;
return nullptr;
}
if (external_table_set)
return set = external_table_set->buildOrderedSetInplace(context);
auto plan = buildPlan(context, true);
if (!plan)
return nullptr;
auto builder = plan->buildQueryPipeline(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
pipeline.complete(std::make_shared<EmptySink>(Block()));
CompletedPipelineExecutor executor(pipeline);
executor.execute();
return set;
}
std::unique_ptr<QueryPlan> FutureSetFromSubquery::buildPlan(const ContextPtr & context, bool create_ordered_set)
{
if (set)
@ -313,7 +343,8 @@ FutureSetFromTuple::FutureSetFromTuple(Block block, const Settings & settings)
//block(std::move(block_))
}
FutureSetFromSubquery::FutureSetFromSubquery(SubqueryForSet subquery_) : subquery(std::move(subquery_)) {}
FutureSetFromSubquery::FutureSetFromSubquery(SubqueryForSet subquery_, FutureSetPtr external_table_set_)
: subquery(std::move(subquery_)), external_table_set(std::move(external_table_set_)) {}
FutureSetFromStorage::FutureSetFromStorage(SetPtr set_) : set(std::move(set_)) {}

View File

@ -132,30 +132,13 @@ public:
class FutureSetFromSubquery : public FutureSet
{
public:
FutureSetFromSubquery(SubqueryForSet subquery_);
FutureSetFromSubquery(SubqueryForSet subquery_, FutureSetPtr external_table_set_);
bool isReady() const override { return set != nullptr; }
bool isFilled() const override { return isReady(); }
SetPtr get() const override { return set; }
SetPtr buildOrderedSetInplace(const ContextPtr & context) override
{
if (!context->getSettingsRef().use_index_for_in_with_subqueries)
return nullptr;
auto plan = buildPlan(context, true);
if (!plan)
return nullptr;
auto builder = plan->buildQueryPipeline(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
pipeline.complete(std::make_shared<EmptySink>(Block()));
CompletedPipelineExecutor executor(pipeline);
executor.execute();
return set;
}
SetPtr buildOrderedSetInplace(const ContextPtr & context) override;
std::unique_ptr<QueryPlan> build(const ContextPtr & context) override
{
@ -167,6 +150,7 @@ public:
private:
SetPtr set;
SubqueryForSet subquery;
FutureSetPtr external_table_set;
std::unique_ptr<QueryPlan> buildPlan(const ContextPtr & context, bool create_ordered_set);
};
@ -244,16 +228,21 @@ struct PreparedSetKey
class PreparedSets
{
public:
using SubqueriesForSets = std::unordered_map<String, std::shared_ptr<FutureSetFromSubquery>>;
struct SetAndName
{
String name;
std::shared_ptr<FutureSetFromSubquery> set;
};
using SubqueriesForSets = std::vector<SetAndName>;
// SubqueryForSet & createOrGetSubquery(const String & subquery_id, const PreparedSetKey & key,
// SizeLimits set_size_limit, bool transform_null_in);
FutureSetPtr addFromStorage(const PreparedSetKey & key, SetPtr set_);
FutureSetPtr addFromTuple(const PreparedSetKey & key, Block block, const Settings & settings);
FutureSetPtr addFromSubquery(const PreparedSetKey & key, SubqueryForSet subquery);
FutureSetPtr addFromSubquery(const PreparedSetKey & key, SubqueryForSet subquery, FutureSetPtr external_table_set);
void addStorageToSubquery(const String & subquery_id, StoragePtr external_storage);
//void addStorageToSubquery(const String & subquery_id, StoragePtr external_storage);
FutureSetPtr getFuture(const PreparedSetKey & key) const;
//SubqueryForSet & getSubquery(const String & subquery_id);
@ -262,7 +251,7 @@ public:
/// Get subqueries and clear them.
/// We need to build a plan for subqueries just once. That's why we can clear them after accessing them.
/// SetPtr would still be available for consumers of PreparedSets.
SubqueriesForSets detachSubqueries(const ContextPtr &);
SubqueriesForSets detachSubqueries();
/// Returns all sets that match the given ast hash not checking types
/// Used in KeyCondition and MergeTreeIndexConditionBloomFilter to make non exact match for types in PreparedSetKey

View File

@ -107,7 +107,8 @@ public:
subquery_for_set.key = planner_context.createSetKey(in_second_argument);
subquery_for_set.source = std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan());
sets.addFromSubquery(set_key, std::move(subquery_for_set));
/// TODO
sets.addFromSubquery(set_key, std::move(subquery_for_set), nullptr);
//planner_context.registerSet(set_key, PlannerSet(in_second_argument));
}

View File

@ -1468,7 +1468,7 @@ void Planner::buildPlanForQueryNode()
if (!select_query_options.only_analyze)
{
auto subqueries = planner_context->getPreparedSets().detachSubqueries(planner_context->getQueryContext());
auto subqueries = planner_context->getPreparedSets().detachSubqueries();
if (!subqueries.empty())
{

View File

@ -632,7 +632,13 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::ma
DataTypes set_element_types;
auto in_second_argument_node_type = in_second_argument->getNodeType();
if (!(in_second_argument_node_type == QueryTreeNodeType::QUERY || in_second_argument_node_type == QueryTreeNodeType::UNION))
// std::cerr << "=========== " << in_second_argument->getNodeTypeName() << std::endl;
bool subquery_or_table =
in_second_argument_node_type == QueryTreeNodeType::QUERY ||
in_second_argument_node_type == QueryTreeNodeType::UNION ||
in_second_argument_node_type == QueryTreeNodeType::TABLE;
if (!subquery_or_table)
{
set_element_types = {in_first_argument->getResultType()};
const auto * left_tuple_type = typeid_cast<const DataTypeTuple *>(set_element_types.front().get());

View File

@ -48,7 +48,7 @@ CreatingSetStep::CreatingSetStep(
void CreatingSetStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
pipeline.addCreatingSetsTransform(getOutputStream().header, std::move(subquery_for_set), network_transfer_limits, context);
pipeline.addCreatingSetsTransform(getOutputStream().header, std::move(subquery_for_set), network_transfer_limits, context->getPreparedSetsCache());
}
void CreatingSetStep::updateOutputStream()
@ -189,7 +189,7 @@ void addCreatingSetsStep(QueryPlan & query_plan, PreparedSetsPtr prepared_sets,
if (!prepared_sets || prepared_sets->empty())
return;
addCreatingSetsStep(query_plan, prepared_sets->detachSubqueries(context), context);
addCreatingSetsStep(query_plan, prepared_sets->detachSubqueries(), context);
}
DelayedCreatingSetsStep::DelayedCreatingSetsStep(

View File

@ -43,7 +43,7 @@ std::unique_ptr<QueryPlan> createLocalPlan(
const Block & header,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
PreparedSetsPtr prepared_sets,
[[maybe_unused]] PreparedSetsPtr prepared_sets,
size_t shard_num,
size_t shard_count,
size_t replica_num,
@ -99,7 +99,7 @@ std::unique_ptr<QueryPlan> createLocalPlan(
}
else
{
auto interpreter = InterpreterSelectQuery(query_ast, new_context, select_query_options, prepared_sets);
auto interpreter = InterpreterSelectQuery(query_ast, new_context, select_query_options); //, prepared_sets);
interpreter.buildQueryPlan(*query_plan);
}

View File

@ -96,11 +96,13 @@ private:
};
ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(const Names & columns_to_read_,
StoragePtr storage_,
const StorageSnapshotPtr & storage_snapshot_,
const size_t num_streams_,
const bool delay_read_for_global_sub_queries_) :
SourceStepWithFilter(DataStream{.header=storage_snapshot_->getSampleBlockForColumns(columns_to_read_)}),
columns_to_read(columns_to_read_),
storage(std::move(storage_)),
storage_snapshot(storage_snapshot_),
num_streams(num_streams_),
delay_read_for_global_sub_queries(delay_read_for_global_sub_queries_)
@ -142,9 +144,9 @@ Pipe ReadFromMemoryStorageStep::makePipe()
storage_snapshot,
nullptr /* data */,
nullptr /* parallel execution index */,
[current_data](std::shared_ptr<const Blocks> & data_to_initialize)
[storage = storage](std::shared_ptr<const Blocks> & data_to_initialize)
{
data_to_initialize = current_data;
data_to_initialize = static_cast<const StorageMemory &>(*storage).data.get();
}));
}

View File

@ -16,6 +16,7 @@ class ReadFromMemoryStorageStep final : public SourceStepWithFilter
{
public:
ReadFromMemoryStorageStep(const Names & columns_to_read_,
StoragePtr storage_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
bool delay_read_for_global_sub_queries_);
@ -35,6 +36,7 @@ private:
static constexpr auto name = "ReadFromMemoryStorage";
Names columns_to_read;
StoragePtr storage;
StorageSnapshotPtr storage_snapshot;
size_t num_streams;
bool delay_read_for_global_sub_queries;

View File

@ -27,11 +27,11 @@ CreatingSetsTransform::CreatingSetsTransform(
Block out_header_,
SubqueryForSet subquery_for_set_,
SizeLimits network_transfer_limits_,
ContextPtr context_)
PreparedSetsCachePtr prepared_sets_cache_)
: IAccumulatingTransform(std::move(in_header_), std::move(out_header_))
, WithContext(context_)
, subquery(std::move(subquery_for_set_))
, network_transfer_limits(std::move(network_transfer_limits_))
, prepared_sets_cache(std::move(prepared_sets_cache_))
{
}
@ -52,14 +52,13 @@ void CreatingSetsTransform::work()
void CreatingSetsTransform::startSubquery()
{
/// Lookup the set in the cache if we don't need to build table.
auto ctx = context.lock();
if (ctx && ctx->getPreparedSetsCache() && !subquery.table)
if (prepared_sets_cache && !subquery.table)
{
/// Try to find the set in the cache and wait for it to be built.
/// Retry if the set from cache fails to be built.
while (true)
{
auto from_cache = ctx->getPreparedSetsCache()->findOrPromiseToBuild(subquery.key);
auto from_cache = prepared_sets_cache->findOrPromiseToBuild(subquery.key);
if (from_cache.index() == 0)
{
promise_to_build = std::move(std::get<0>(from_cache));
@ -89,9 +88,11 @@ void CreatingSetsTransform::startSubquery()
if (subquery.table)
LOG_TRACE(log, "Filling temporary table.");
// std::cerr << StackTrace().toString() << std::endl;
if (subquery.table)
/// TODO: make via port
table_out = QueryPipeline(subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext()));
table_out = QueryPipeline(subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), nullptr));
done_with_set = !subquery.set;
done_with_table = !subquery.table;

View File

@ -23,7 +23,7 @@ class PushingPipelineExecutor;
/// Don't return any data. Sets are created when Finish status is returned.
/// In general, several work() methods need to be called to finish.
/// Independent processors is created for each subquery.
class CreatingSetsTransform : public IAccumulatingTransform, WithContext
class CreatingSetsTransform : public IAccumulatingTransform
{
public:
CreatingSetsTransform(
@ -31,7 +31,7 @@ public:
Block out_header_,
SubqueryForSet subquery_for_set_,
SizeLimits network_transfer_limits_,
ContextPtr context_);
PreparedSetsCachePtr prepared_sets_cache_);
~CreatingSetsTransform() override;
@ -55,6 +55,7 @@ private:
bool done_with_table = true;
SizeLimits network_transfer_limits;
PreparedSetsCachePtr prepared_sets_cache;
size_t rows_to_transfer = 0;
size_t bytes_to_transfer = 0;

View File

@ -569,7 +569,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
return left;
}
void QueryPipelineBuilder::addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, ContextPtr context)
void QueryPipelineBuilder::addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, PreparedSetsCachePtr prepared_sets_cache)
{
resize(1);
@ -578,7 +578,7 @@ void QueryPipelineBuilder::addCreatingSetsTransform(const Block & res_header, Su
res_header,
std::move(subquery_for_set),
limits,
context);
std::move(prepared_sets_cache));
InputPort * totals_port = nullptr;

View File

@ -33,6 +33,9 @@ class TableJoin;
class QueryPipelineBuilder;
using QueryPipelineBuilderPtr = std::unique_ptr<QueryPipelineBuilder>;
class PreparedSetsCache;
using PreparedSetsCachePtr = std::shared_ptr<PreparedSetsCache>;
class QueryPipelineBuilder
{
public:
@ -138,7 +141,7 @@ public:
/// This is used for CreatingSets.
void addPipelineBefore(QueryPipelineBuilder pipeline);
void addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, ContextPtr context);
void addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, PreparedSetsCachePtr prepared_sets_cache);
PipelineExecutorPtr execute();

View File

@ -155,7 +155,7 @@ void StorageMemory::read(
size_t /*max_block_size*/,
size_t num_streams)
{
query_plan.addStep(std::make_unique<ReadFromMemoryStorageStep>(column_names, storage_snapshot, num_streams, delay_read_for_global_subqueries));
query_plan.addStep(std::make_unique<ReadFromMemoryStorageStep>(column_names, shared_from_this(), storage_snapshot, num_streams, delay_read_for_global_subqueries));
}

View File

@ -132,6 +132,8 @@ private:
std::atomic<size_t> total_size_rows = 0;
bool compress;
friend class ReadFromMemoryStorageStep;
};
}