Merge pull request #66018 from ClickHouse/readyornot

Fix 'Not-ready Set is passed' in system tables
This commit is contained in:
Michael Kolupaev 2024-08-06 08:13:03 +00:00 committed by GitHub
commit 9e3bc9bc53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 177 additions and 28 deletions

View File

@ -102,7 +102,7 @@ public:
///
/// @param allow_duplicates_in_input - actions are allowed to have
/// duplicated input (that will refer into the block). This is needed for
/// preliminary query filtering (filterBlockWithDAG()), because they just
/// preliminary query filtering (filterBlockWithExpression()), because they just
/// pass available virtual columns, which cannot be moved in case they are
/// used multiple times.
void execute(Block & block, size_t & num_rows, bool dry_run = false, bool allow_duplicates_in_input = false) const;

View File

@ -90,9 +90,18 @@ private:
using FutureSetFromTuplePtr = std::shared_ptr<FutureSetFromTuple>;
/// Set from subquery can be built inplace for PK or in CreatingSet step.
/// If use_index_for_in_with_subqueries_max_values is reached, set for PK won't be created,
/// but ordinary set would be created instead.
/// Set from subquery can be filled (by running the subquery) in one of two ways:
/// 1. During query analysis. Specifically, inside `SourceStepWithFilter::applyFilters()`.
/// Useful if the query plan depends on the set contents, e.g. to determine which files to read.
/// 2. During query execution. This is the preferred way.
/// Sets are created by CreatingSetStep, which runs before other steps.
/// Be careful: to build the set during query analysis, the `buildSetInplace()` call must happen
/// inside `SourceStepWithFilter::applyFilters()`. Calling it later, e.g. from `initializePipeline()`
/// will result in LOGICAL_ERROR "Not-ready Set is passed" (because a CreatingSetStep was already
/// added to pipeline but hasn't executed yet).
///
/// If use_index_for_in_with_subqueries_max_values is reached, the built set won't be suitable for
/// key analysis, but will work with function IN (the set will contain only hashes of elements).
class FutureSetFromSubquery final : public FutureSet
{
public:

View File

@ -1099,7 +1099,7 @@ void addBuildSubqueriesForSetsStepIfNeeded(
auto query_tree = subquery->detachQueryTree();
auto subquery_options = select_query_options.subquery();
/// I don't know if this is a good decision,
/// But for now it is done in the same way as in old analyzer.
/// but for now it is done in the same way as in old analyzer.
/// This would not ignore limits for subqueries (affects mutations only).
/// See test_build_sets_from_multiple_threads-analyzer.
subquery_options.ignore_limits = false;

View File

@ -41,6 +41,14 @@ ColumnsDescription StorageSystemRocksDB::getColumnsDescription()
}
Block StorageSystemRocksDB::getFilterSampleBlock() const
{
return {
{ {}, std::make_shared<DataTypeString>(), "database" },
{ {}, std::make_shared<DataTypeString>(), "table" },
};
}
void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
{
const auto access = context->getAccess();

View File

@ -22,6 +22,7 @@ protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const override;
Block getFilterSampleBlock() const override;
};
}

View File

@ -275,7 +275,7 @@ public:
private:
std::shared_ptr<StorageMergeTreeIndex> storage;
Poco::Logger * log;
const ActionsDAG::Node * predicate = nullptr;
ExpressionActionsPtr virtual_columns_filter;
};
void ReadFromMergeTreeIndex::applyFilters(ActionDAGNodes added_filter_nodes)
@ -283,7 +283,16 @@ void ReadFromMergeTreeIndex::applyFilters(ActionDAGNodes added_filter_nodes)
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
{
Block block_to_filter
{
{ {}, std::make_shared<DataTypeString>(), StorageMergeTreeIndex::part_name_column.name },
};
auto dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_actions_dag->getOutputs().at(0), &block_to_filter);
if (dag)
virtual_columns_filter = VirtualColumnUtils::buildFilterExpression(std::move(*dag), context);
}
}
void StorageMergeTreeIndex::read(
@ -335,7 +344,7 @@ void StorageMergeTreeIndex::read(
void ReadFromMergeTreeIndex::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto filtered_parts = storage->getFilteredDataParts(predicate, context);
auto filtered_parts = storage->getFilteredDataParts(virtual_columns_filter);
LOG_DEBUG(log, "Reading index{}from {} parts of table {}",
storage->with_marks ? " with marks " : " ",
@ -345,9 +354,9 @@ void ReadFromMergeTreeIndex::initializePipeline(QueryPipelineBuilder & pipeline,
pipeline.init(Pipe(std::make_shared<MergeTreeIndexSource>(getOutputStream().header, storage->key_sample_block, std::move(filtered_parts), context, storage->with_marks)));
}
MergeTreeData::DataPartsVector StorageMergeTreeIndex::getFilteredDataParts(const ActionsDAG::Node * predicate, const ContextPtr & context) const
MergeTreeData::DataPartsVector StorageMergeTreeIndex::getFilteredDataParts(const ExpressionActionsPtr & virtual_columns_filter) const
{
if (!predicate)
if (!virtual_columns_filter)
return data_parts;
auto all_part_names = ColumnString::create();
@ -355,7 +364,7 @@ MergeTreeData::DataPartsVector StorageMergeTreeIndex::getFilteredDataParts(const
all_part_names->insert(part->name);
Block filtered_block{{std::move(all_part_names), std::make_shared<DataTypeString>(), part_name_column.name}};
VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context);
VirtualColumnUtils::filterBlockWithExpression(virtual_columns_filter, filtered_block);
if (!filtered_block.rows())
return {};

View File

@ -36,7 +36,7 @@ public:
private:
friend class ReadFromMergeTreeIndex;
MergeTreeData::DataPartsVector getFilteredDataParts(const ActionsDAG::Node * predicate, const ContextPtr & context) const;
MergeTreeData::DataPartsVector getFilteredDataParts(const ExpressionActionsPtr & virtual_columns_filter) const;
StoragePtr source_table;
bool with_marks;

View File

@ -5,6 +5,7 @@
// #include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Storages/VirtualColumnUtils.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
@ -44,7 +45,7 @@ public:
private:
std::shared_ptr<IStorageSystemOneBlock> storage;
std::vector<UInt8> columns_mask;
const ActionsDAG::Node * predicate = nullptr;
std::optional<ActionsDAG> filter;
};
void IStorageSystemOneBlock::read(
@ -79,8 +80,9 @@ void IStorageSystemOneBlock::read(
void ReadFromSystemOneBlock::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
const auto & sample_block = getOutputStream().header;
const Block & sample_block = getOutputStream().header;
MutableColumns res_columns = sample_block.cloneEmptyColumns();
const ActionsDAG::Node * predicate = filter ? filter->getOutputs().at(0) : nullptr;
storage->fillData(res_columns, context, predicate, std::move(columns_mask));
UInt64 num_rows = res_columns.at(0)->size();
@ -93,8 +95,18 @@ void ReadFromSystemOneBlock::applyFilters(ActionDAGNodes added_filter_nodes)
{
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
if (!filter_actions_dag)
return;
Block sample = storage->getFilterSampleBlock();
if (sample.columns() == 0)
return;
filter = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_actions_dag->getOutputs().at(0), &sample);
/// Must prepare sets here, initializePipeline() would be too late, see comment on FutureSetFromSubquery.
if (filter)
VirtualColumnUtils::buildSetsForDAG(*filter, context);
}
}

View File

@ -22,8 +22,16 @@ class Context;
class IStorageSystemOneBlock : public IStorage
{
protected:
/// If this method uses `predicate`, getFilterSampleBlock() must list all columns to which
/// it's applied. (Otherwise there'll be a LOGICAL_ERROR "Not-ready Set is passed" on subqueries.)
virtual void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8> columns_mask) const = 0;
/// Columns to which fillData() applies the `predicate`.
virtual Block getFilterSampleBlock() const
{
return {};
}
virtual bool supportsColumnsMask() const { return false; }
friend class ReadFromSystemOneBlock;

View File

@ -338,7 +338,7 @@ private:
std::shared_ptr<StorageSystemColumns> storage;
std::vector<UInt8> columns_mask;
const size_t max_block_size;
const ActionsDAG::Node * predicate = nullptr;
std::optional<ActionsDAG> virtual_columns_filter;
};
void ReadFromSystemColumns::applyFilters(ActionDAGNodes added_filter_nodes)
@ -346,7 +346,17 @@ void ReadFromSystemColumns::applyFilters(ActionDAGNodes added_filter_nodes)
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
{
Block block_to_filter;
block_to_filter.insert(ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "database"));
block_to_filter.insert(ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "table"));
virtual_columns_filter = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_actions_dag->getOutputs().at(0), &block_to_filter);
/// Must prepare sets here, initializePipeline() would be too late, see comment on FutureSetFromSubquery.
if (virtual_columns_filter)
VirtualColumnUtils::buildSetsForDAG(*virtual_columns_filter, context);
}
}
void StorageSystemColumns::read(
@ -408,7 +418,8 @@ void ReadFromSystemColumns::initializePipeline(QueryPipelineBuilder & pipeline,
block_to_filter.insert(ColumnWithTypeAndName(std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
/// Filter block with `database` column.
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
if (virtual_columns_filter)
VirtualColumnUtils::filterBlockWithPredicate(virtual_columns_filter->getOutputs().at(0), block_to_filter, context);
if (!block_to_filter.rows())
{
@ -456,7 +467,8 @@ void ReadFromSystemColumns::initializePipeline(QueryPipelineBuilder & pipeline,
}
/// Filter block with `database` and `table` columns.
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
if (virtual_columns_filter)
VirtualColumnUtils::filterBlockWithPredicate(virtual_columns_filter->getOutputs().at(0), block_to_filter, context);
if (!block_to_filter.rows())
{

View File

@ -214,7 +214,7 @@ private:
std::shared_ptr<StorageSystemDataSkippingIndices> storage;
std::vector<UInt8> columns_mask;
const size_t max_block_size;
const ActionsDAG::Node * predicate = nullptr;
ExpressionActionsPtr virtual_columns_filter;
};
void ReadFromSystemDataSkippingIndices::applyFilters(ActionDAGNodes added_filter_nodes)
@ -222,7 +222,16 @@ void ReadFromSystemDataSkippingIndices::applyFilters(ActionDAGNodes added_filter
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
{
Block block_to_filter
{
{ ColumnString::create(), std::make_shared<DataTypeString>(), "database" },
};
auto dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_actions_dag->getOutputs().at(0), &block_to_filter);
if (dag)
virtual_columns_filter = VirtualColumnUtils::buildFilterExpression(std::move(*dag), context);
}
}
void StorageSystemDataSkippingIndices::read(
@ -268,7 +277,8 @@ void ReadFromSystemDataSkippingIndices::initializePipeline(QueryPipelineBuilder
/// Condition on "database" in a query acts like an index.
Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database") };
VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context);
if (virtual_columns_filter)
VirtualColumnUtils::filterBlockWithExpression(virtual_columns_filter, block);
ColumnPtr & filtered_databases = block.getByPosition(0).column;
pipeline.init(Pipe(std::make_shared<DataSkippingIndicesSource>(

View File

@ -73,6 +73,14 @@ static String getEngineFull(const ContextPtr & ctx, const DatabasePtr & database
return engine_full;
}
Block StorageSystemDatabases::getFilterSampleBlock() const
{
return {
{ {}, std::make_shared<DataTypeString>(), "engine" },
{ {}, std::make_shared<DataTypeUUID>(), "uuid" },
};
}
static ColumnPtr getFilteredDatabases(const Databases & databases, const ActionsDAG::Node * predicate, ContextPtr context)
{
MutableColumnPtr name_column = ColumnString::create();

View File

@ -27,6 +27,7 @@ protected:
bool supportsColumnsMask() const override { return true; }
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8> columns_mask) const override;
Block getFilterSampleBlock() const override;
};
}

View File

@ -107,6 +107,13 @@ ColumnsDescription StorageSystemDistributionQueue::getColumnsDescription()
};
}
Block StorageSystemDistributionQueue::getFilterSampleBlock() const
{
return {
{ {}, std::make_shared<DataTypeString>(), "database" },
{ {}, std::make_shared<DataTypeString>(), "table" },
};
}
void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
{

View File

@ -22,6 +22,7 @@ protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const override;
Block getFilterSampleBlock() const override;
};
}

View File

@ -46,6 +46,13 @@ ColumnsDescription StorageSystemMutations::getColumnsDescription()
};
}
Block StorageSystemMutations::getFilterSampleBlock() const
{
return {
{ {}, std::make_shared<DataTypeString>(), "database" },
{ {}, std::make_shared<DataTypeString>(), "table" },
};
}
void StorageSystemMutations::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
{

View File

@ -22,6 +22,7 @@ protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const override;
Block getFilterSampleBlock() const override;
};
}

View File

@ -43,6 +43,14 @@ ColumnsDescription StorageSystemPartMovesBetweenShards::getColumnsDescription()
}
Block StorageSystemPartMovesBetweenShards::getFilterSampleBlock() const
{
return {
{ {}, std::make_shared<DataTypeString>(), "database" },
{ {}, std::make_shared<DataTypeString>(), "table" },
};
}
void StorageSystemPartMovesBetweenShards::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
{
const auto access = context->getAccess();

View File

@ -20,6 +20,7 @@ protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const override;
Block getFilterSampleBlock() const override;
};
}

View File

@ -285,7 +285,7 @@ private:
const bool with_zk_fields;
const size_t max_block_size;
std::shared_ptr<StorageSystemReplicasImpl> impl;
const ActionsDAG::Node * predicate = nullptr;
ExpressionActionsPtr virtual_columns_filter;
};
void ReadFromSystemReplicas::applyFilters(ActionDAGNodes added_filter_nodes)
@ -293,7 +293,18 @@ void ReadFromSystemReplicas::applyFilters(ActionDAGNodes added_filter_nodes)
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
{
Block block_to_filter
{
{ ColumnString::create(), std::make_shared<DataTypeString>(), "database" },
{ ColumnString::create(), std::make_shared<DataTypeString>(), "table" },
{ ColumnString::create(), std::make_shared<DataTypeString>(), "engine" },
};
auto dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_actions_dag->getOutputs().at(0), &block_to_filter);
if (dag)
virtual_columns_filter = VirtualColumnUtils::buildFilterExpression(std::move(*dag), context);
}
}
void StorageSystemReplicas::read(
@ -430,7 +441,8 @@ void ReadFromSystemReplicas::initializePipeline(QueryPipelineBuilder & pipeline,
{ col_engine, std::make_shared<DataTypeString>(), "engine" },
};
VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context);
if (virtual_columns_filter)
VirtualColumnUtils::filterBlockWithExpression(virtual_columns_filter, filtered_block);
if (!filtered_block.rows())
{

View File

@ -62,6 +62,14 @@ ColumnsDescription StorageSystemReplicationQueue::getColumnsDescription()
}
Block StorageSystemReplicationQueue::getFilterSampleBlock() const
{
return {
{ {}, std::make_shared<DataTypeString>(), "database" },
{ {}, std::make_shared<DataTypeString>(), "table" },
};
}
void StorageSystemReplicationQueue::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
{
const auto access = context->getAccess();

View File

@ -21,6 +21,7 @@ public:
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const override;
Block getFilterSampleBlock() const override;
};
}

View File

@ -18,8 +18,16 @@ class NamesAndTypesList;
namespace VirtualColumnUtils
{
/// Similar to filterBlockWithQuery, but uses ActionsDAG as a predicate.
/// Basically it is filterBlockWithDAG(splitFilterDagForAllowedInputs).
/// The filtering functions are tricky to use correctly.
/// There are 2 ways:
/// 1. Call filterBlockWithPredicate() or filterBlockWithExpression() inside SourceStepWithFilter::applyFilters().
/// 2. Call splitFilterDagForAllowedInputs() and buildSetsForDAG() inside SourceStepWithFilter::applyFilters().
/// Then call filterBlockWithPredicate() or filterBlockWithExpression() in initializePipeline().
///
/// Otherwise calling filter*() outside applyFilters() will throw "Not-ready Set is passed"
/// if there are subqueries.
/// Similar to filterBlockWithExpression(buildFilterExpression(splitFilterDagForAllowedInputs(...))).
void filterBlockWithPredicate(const ActionsDAG::Node * predicate, Block & block, ContextPtr context);
/// Just filters block. Block should contain all the required columns.

View File

@ -11,3 +11,20 @@ $CLICKHOUSE_CLIENT --max_threads=2 --max_result_rows=1 --result_overflow_mode=br
$CLICKHOUSE_CLIENT -q "SELECT * FROM system.tables WHERE 1 in (SELECT number from numbers(2)) AND database = currentDatabase() format Null"
$CLICKHOUSE_CLIENT -q "SELECT xor(1, 0) FROM system.parts WHERE 1 IN (SELECT 1) FORMAT Null"
# (Not all of these tests are effective because some of these tables are empty.)
$CLICKHOUSE_CLIENT -nq "
select * from system.columns where table in (select '123');
select * from system.replicas where database in (select '123');
select * from system.data_skipping_indices where database in (select '123');
select * from system.databases where name in (select '123');
select * from system.mutations where table in (select '123');
select * from system.part_moves_between_shards where database in (select '123');
select * from system.replication_queue where database in (select '123');
select * from system.distribution_queue where database in (select '123');
"
$CLICKHOUSE_CLIENT -nq "
create table a (x Int8) engine MergeTree order by x;
insert into a values (1);
select * from mergeTreeIndex(currentDatabase(), 'a') where part_name in (select '123');
"