Merge pull request #61666 from ClickHouse/fix-not-ready-set-system-parts

Fix Non-ready set for system.parts.
This commit is contained in:
Nikolai Kochetov 2024-03-22 11:49:26 +01:00 committed by GitHub
commit 3ed30959fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 61 additions and 20 deletions

View File

@ -4,10 +4,12 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/System/StorageSystemPartsBase.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Storages/VirtualColumnUtils.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <IO/SharedThreadPools.h>
@ -320,7 +322,7 @@ protected:
std::shared_ptr<StorageSystemDetachedParts> storage;
std::vector<UInt8> columns_mask;
const ActionsDAG::Node * predicate = nullptr;
ActionsDAGPtr filter;
const size_t max_block_size;
const size_t num_streams;
};
@ -329,7 +331,20 @@ void ReadFromSystemDetachedParts::applyFilters(ActionDAGNodes added_filter_nodes
{
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
{
const auto * predicate = filter_actions_dag->getOutputs().at(0);
Block block;
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "database"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "table"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "engine"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUInt8>(), "active"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUUID>(), "uuid"));
filter = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block);
if (filter)
VirtualColumnUtils::buildSetsForDAG(filter, context);
}
}
void StorageSystemDetachedParts::read(
@ -358,7 +373,7 @@ void StorageSystemDetachedParts::read(
void ReadFromSystemDetachedParts::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto state = std::make_shared<SourceState>(StoragesInfoStream(predicate, context));
auto state = std::make_shared<SourceState>(StoragesInfoStream(nullptr, filter, context));
Pipe pipe;

View File

@ -11,7 +11,7 @@ namespace DB
{
StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAG::Node * predicate, ContextPtr context)
StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAGPtr & filter, ContextPtr context)
: StoragesInfoStreamBase(context)
{
/// Will apply WHERE to subset of columns and then add more columns.
@ -74,7 +74,8 @@ StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAG::Node * pr
if (block_to_filter.rows())
{
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'.
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
if (filter)
VirtualColumnUtils::filterBlockWithDAG(filter, block_to_filter, context);
rows = block_to_filter.rows();
}

View File

@ -9,7 +9,7 @@ namespace DB
class StoragesDroppedInfoStream : public StoragesInfoStreamBase
{
public:
StoragesDroppedInfoStream(const ActionsDAG::Node * predicate, ContextPtr context);
StoragesDroppedInfoStream(const ActionsDAGPtr & filter, ContextPtr context);
protected:
bool tryLockTable(StoragesInfo &) override
{
@ -30,9 +30,9 @@ public:
std::string getName() const override { return "SystemDroppedTablesParts"; }
protected:
std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context) override
std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAGPtr &, const ActionsDAGPtr & filter, ContextPtr context) override
{
return std::make_unique<StoragesDroppedInfoStream>(predicate, context);
return std::make_unique<StoragesDroppedInfoStream>(filter, context);
}
};

View File

@ -83,7 +83,7 @@ StoragesInfo::getProjectionParts(MergeTreeData::DataPartStateVector & state, boo
return data->getProjectionPartsVectorForInternalUsage({State::Active}, &state);
}
StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context)
StoragesInfoStream::StoragesInfoStream(const ActionsDAGPtr & filter_by_database, const ActionsDAGPtr & filter_by_other_columns, ContextPtr context)
: StoragesInfoStreamBase(context)
{
/// Will apply WHERE to subset of columns and then add more columns.
@ -115,7 +115,8 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte
std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
/// Filter block_to_filter with column 'database'.
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
if (filter_by_database)
VirtualColumnUtils::filterBlockWithDAG(filter_by_database, block_to_filter, context);
rows = block_to_filter.rows();
/// Block contains new columns, update database_column.
@ -194,7 +195,8 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, Conte
if (rows)
{
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'.
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
if (filter_by_other_columns)
VirtualColumnUtils::filterBlockWithDAG(filter_by_other_columns, block_to_filter, context);
rows = block_to_filter.rows();
}
@ -226,7 +228,8 @@ protected:
std::shared_ptr<StorageSystemPartsBase> storage;
std::vector<UInt8> columns_mask;
const bool has_state_column;
const ActionsDAG::Node * predicate = nullptr;
ActionsDAGPtr filter_by_database;
ActionsDAGPtr filter_by_other_columns;
};
ReadFromSystemPartsBase::ReadFromSystemPartsBase(
@ -254,7 +257,25 @@ void ReadFromSystemPartsBase::applyFilters(ActionDAGNodes added_filter_nodes)
{
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
{
const auto * predicate = filter_actions_dag->getOutputs().at(0);
Block block;
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "database"));
filter_by_database = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block);
if (filter_by_database)
VirtualColumnUtils::buildSetsForDAG(filter_by_database, context);
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "table"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeString>(), "engine"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUInt8>(), "active"));
block.insert(ColumnWithTypeAndName({}, std::make_shared<DataTypeUUID>(), "uuid"));
filter_by_other_columns = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &block);
if (filter_by_other_columns)
VirtualColumnUtils::buildSetsForDAG(filter_by_other_columns, context);
}
}
void StorageSystemPartsBase::read(
@ -288,7 +309,7 @@ void StorageSystemPartsBase::read(
void ReadFromSystemPartsBase::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto stream = storage->getStoragesInfoStream(predicate, context);
auto stream = storage->getStoragesInfoStream(filter_by_database, filter_by_other_columns, context);
auto header = getOutputStream().header;
MutableColumns res_columns = header.cloneEmptyColumns();

View File

@ -115,7 +115,7 @@ protected:
class StoragesInfoStream : public StoragesInfoStreamBase
{
public:
StoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context);
StoragesInfoStream(const ActionsDAGPtr & filter_by_database, const ActionsDAGPtr & filter_by_other_columns, ContextPtr context);
};
/** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family.
@ -145,9 +145,9 @@ protected:
StorageSystemPartsBase(const StorageID & table_id_, ColumnsDescription && columns);
virtual std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context)
virtual std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAGPtr & filter_by_database, const ActionsDAGPtr & filter_by_other_columns, ContextPtr context)
{
return std::make_unique<StoragesInfoStream>(predicate, context);
return std::make_unique<StoragesInfoStream>(filter_by_database, filter_by_other_columns, context);
}
virtual void

View File

@ -53,9 +53,9 @@ namespace DB
namespace VirtualColumnUtils
{
static void makeSets(const ExpressionActionsPtr & actions, const ContextPtr & context)
void buildSetsForDAG(const ActionsDAGPtr & dag, const ContextPtr & context)
{
for (const auto & node : actions->getNodes())
for (const auto & node : dag->getNodes())
{
if (node.type == ActionsDAG::ActionType::COLUMN)
{
@ -78,8 +78,8 @@ static void makeSets(const ExpressionActionsPtr & actions, const ContextPtr & co
void filterBlockWithDAG(ActionsDAGPtr dag, Block & block, ContextPtr context)
{
buildSetsForDAG(dag, context);
auto actions = std::make_shared<ExpressionActions>(dag);
makeSets(actions, context);
Block block_with_filter = block;
actions->execute(block_with_filter, /*dry_run=*/ false, /*allow_duplicates_in_input=*/ true);

View File

@ -25,6 +25,9 @@ void filterBlockWithPredicate(const ActionsDAG::Node * predicate, Block & block,
/// Just filters block. Block should contain all the required columns.
void filterBlockWithDAG(ActionsDAGPtr dag, Block & block, ContextPtr context);
/// Builds sets used by ActionsDAG inplace.
void buildSetsForDAG(const ActionsDAGPtr & dag, const ContextPtr & context);
/// Recursively checks if all functions used in DAG are deterministic in scope of query.
bool isDeterministicInScopeOfQuery(const ActionsDAG::Node * node);

View File

@ -10,3 +10,4 @@ $CLICKHOUSE_CLIENT -q "insert into t1 select number from numbers(10);"
$CLICKHOUSE_CLIENT --max_threads=2 --max_result_rows=1 --result_overflow_mode=break -q "with tab as (select min(number) from t1 prewhere number in (select number from view(select number, row_number() OVER (partition by number % 2 ORDER BY number DESC) from numbers_mt(1e4)) where number != 2 order by number)) select number from t1 union all select * from tab;" > /dev/null
$CLICKHOUSE_CLIENT -q "SELECT * FROM system.tables WHERE 1 in (SELECT number from numbers(2)) AND database = currentDatabase() format Null"
$CLICKHOUSE_CLIENT -q "SELECT xor(1, 0) FROM system.parts WHERE 1 IN (SELECT 1) FORMAT Null"